diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java index 230fb31f..74175b2f 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java @@ -1,13 +1,17 @@ package com.suisung.mall.im.common.websocket.service; import com.suisung.mall.im.pojo.vo.SendVO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.web.socket.WebSocketSession; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.IM_FANOUT_EXCHANGE; @@ -15,6 +19,7 @@ import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.IM_FANO @Service public class DistributedMessageService { + private static final Logger log = LoggerFactory.getLogger(DistributedMessageService.class); @Autowired private RabbitTemplate rabbitTemplate; @@ -30,7 +35,8 @@ public class DistributedMessageService { /** * 发送消息给用户 */ - public boolean sendToUser(String targetUserId, SendVO message) { + public boolean sendToUser(String targetUserId, SendVO message, ConcurrentHashMap> userSessions) { + log.info("targetUserId{},message:{}", targetUserId, message); String targetServer = sessionService.getUserServer(targetUserId); if (targetServer == null) { @@ -40,7 +46,7 @@ public class DistributedMessageService { if (isCurrentServer(targetServer)) { // 用户在当前服务器,直接发送 - return localSessionManager.sendToUser(targetUserId, message); + return localSessionManager.sendToUser(targetUserId, message,userSessions); } else { // 用户在其他服务器,通过RabbitMQ转发 forwardToUser(targetServer, targetUserId, message); diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java index 4e7ab0e6..c9f845b6 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java @@ -42,9 +42,10 @@ public class LocalSessionManager { if (existingSessions == null) { existingSessions = new CopyOnWriteArrayList<>(); } - if (!existingSessions.contains(session)) { - existingSessions.add(session); - } +// if (!existingSessions.contains(session)) { +// existingSessions.add(session); +// } + existingSessions.add(session); return existingSessions; }); @@ -68,12 +69,15 @@ public class LocalSessionManager { /** * 发送消息给用户 */ - public boolean sendToUser(String userId, SendVO message) { + public boolean sendToUser(String userId, SendVO message,ConcurrentHashMap> userSession) { if (userId == null || message == null) { log.info("发送消息失败: 参数为空"); return false; } - + if(null!=userSession){ + this.userSessions=userSession; + } + log.info("userSessions={}", userSession); List sessions = userSessions.get(userId); if (sessions == null || sessions.isEmpty()) { log.info("用户没有活跃会话: {}", userId); diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java index 0cebeda6..155ada80 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java @@ -65,7 +65,7 @@ public class MessageConsumerService { } if (userMsg != null) { - boolean success = localSessionManager.sendToUser(userId, userMsg); + boolean success = localSessionManager.sendToUser(userId, userMsg,null); logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败"); } else { logger.info("消息格式错误,无法发送给用户: {}", userId); diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java index 82ed6eb2..e4b11f83 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java @@ -1,11 +1,11 @@ package com.suisung.mall.im.common.websocket.service.onchat; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONUtil; -import com.suisung.mall.common.feignService.AccountService; -import com.suisung.mall.common.utils.CheckUtil; +import com.suisung.mall.im.common.websocket.service.DistributedMessageService; +import com.suisung.mall.im.common.websocket.service.DistributedSessionService; +import com.suisung.mall.im.common.websocket.service.LocalSessionManager; import com.suisung.mall.im.common.websocket.utils.Constants; import com.suisung.mall.im.pojo.entity.ChatHistory; import com.suisung.mall.im.pojo.vo.MineDTO; @@ -13,164 +13,78 @@ import com.suisung.mall.im.pojo.vo.ReceiveDTO; import com.suisung.mall.im.pojo.vo.SendVO; import com.suisung.mall.im.pojo.vo.ToDTO; import com.suisung.mall.im.service.ChatHistoryService; -import com.suisung.mall.im.service.LayGroupService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.web.socket.*; import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class MallsuiteImSocketHandler implements WebSocketHandler { private static Logger logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class); - private static ArrayList users; - private static ArrayList usersStr; + // 移除原有的静态集合,使用注入的服务 + // private static ArrayList users; + // private static ArrayList usersStr; + // private static Map> userSession; + // private static Map> groupSession; - //存入用户的所有终端的连接信息 - private static Map> userSession; + @Autowired + private DistributedSessionService distributedSessionService; - //群组信息 - private static Map> groupSession; + @Autowired + private DistributedMessageService distributedMessageService; - static { - users = new ArrayList<>(); - usersStr = new ArrayList<>(); - userSession = Collections.synchronizedMap(new HashMap<>()); - groupSession = Collections.synchronizedMap(new HashMap<>()); - logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class); - } - - public ArrayList getOnlineLoginNames() { - ArrayList onlineLoginNames = new ArrayList(); - for (WebSocketSession user : users) { - String userName = (String) user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); - if (userName != null) { - onlineLoginNames.add(userName); - } - } - return onlineLoginNames; - - } - - public ArrayList getOnlineLoginUserId() { - ArrayList onlineLoginUserId = new ArrayList(); - for (WebSocketSession user : users) { - Integer user_id = Convert.toInt(user.getAttributes().get("user_id")); - if (CheckUtil.isNotEmpty(user_id)) { - onlineLoginUserId.add(user_id); - } - } - return onlineLoginUserId; - - } + @Autowired + private LocalSessionManager localSessionManager; @Autowired private ChatHistoryService chatHistoryService; - @Autowired - private LayGroupService layGroupService; - @Autowired - private AccountService accountService; + //用户上线后触发 @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.debug("connect to the websocket success......"); + + String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ; String sessionId = session.getId(); - users.add(session); - this.updateOnlineStatus(); - String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);//获取刚上线用户的loginName - List loginSessions = userSession.get(loginUserId); - if (CollUtil.isEmpty(loginSessions)) { - if (null == loginSessions) { - loginSessions = new ArrayList<>(); - } + // 存储到本地会话管理 + ConcurrentHashMap> userSessions=new ConcurrentHashMap<>(); + localSessionManager.setUserSessions(userSessions); + localSessionManager.addUserSession(loginUserId, session); - loginSessions.add(session); - userSession.put(loginUserId, loginSessions); - } else { - if (!loginSessions.contains(session)) { - loginSessions.add(session); - } - } + logger.info("添加会话到本地成功:{}", localSessionManager.getUserSessions().values()); - if (loginUserId != null) { - SendVO msg = new SendVO(); - //读取离线信息 - ChatHistory chat = new ChatHistory(); - chat.setReceiver(loginUserId);//发给刚上线的用户信息 - chat.setStatus("0"); - List list = chatHistoryService.findList(chat); - /* - for(ChatHistory c : list){ - String sender=""; - String receiver=""; + // 注册到分布式会话服务 + Map attributes = new HashMap<>(); + attributes.put("user_id", String.valueOf(session.getAttributes().get("user_id"))); + attributes.put("loginName", loginUserId); + distributedSessionService.registerUserSession(loginUserId, sessionId, attributes); - if("friend".equals(c.getType()) || "user".equals(c.getType())){//如果是个人信息 - sender = c.getSender(); //todo 确认数据库存的数据uid user_id 不一致 - msg.setId(sender); - msg.setToid(c.getReceiver()); - msg.setContent(c.getMsg()); - //todu 缺少数据 - //msg = sender+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_; - }else if(c.getType().contains("_group")){//如果是直播群组信息 - msg.setType("group"); + // 处理离线消息等原有逻辑... + // this.updateOnlineStatus(); - sender = c.getSender(); - String groupId = c.getReceiver().split("_")[0];//直播群组id - receiver=c.getReceiver().split("_")[1];//直播群组所属user_id - msg = sender+Constants._msg_+groupId+Constants._msg_+c.getMsg()+Constants._msg_; - - }else{//如果是群组信息 - msg.setType("group"); - - String groupId = c.getSender().split(Constants._msg_)[0];//群组id - sender=c.getSender().split(Constants._msg_)[1];//发送者loginName - msg = groupId+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_; - - } - - //todo 应该冗余 - //获取用户基本信息 - AccountUserBase accountUserBase = accountService.getUserBase(Integer.valueOf(sender)); - //AccountUserBase accountUserBase= result.getFenResult(AccountUserBase.class); - //AccountUserInfo userInfo=accountService.getUserInfo(accountUserBase.getUser_id()); - AccountUserInfo userInfo=accountService.getUserInfo(Integer.valueOf(sender)); - - - msg.setUser_id(Convert.toInt(session.getAttributes().get("user_id"))); - msg.setUsername(accountUserBase.getUser_nickname()); - msg.setAvatar(userInfo.getUser_avatar()); - //msg.setMessage_id(c.getId()); - msg.setFromid(msg.getId()); - msg.setTime(c.getCreate_date().getTime()); - msg.setStatus(0); //? - msg.setMsg(new HashMap()); - - boolean isSuccess = this.sendMessageToUser(loginUserId, msg); - if(isSuccess) { - c.setStatus("1");//标记为已读 - chatHistoryService.saveNew(c); - } - } - */ - } } //接收js侧发送来的用户信息 @Override public void handleMessage(WebSocketSession session, WebSocketMessage socketMessage) throws Exception { ReceiveDTO receiveDTO = JSONUtil.toBean(socketMessage.getPayload().toString(), ReceiveDTO.class); - if (receiveDTO != null) {//发送消息 + logger.info("接收到信息{}", receiveDTO); + if (receiveDTO != null) { SendVO sendVO = new SendVO(); MineDTO mine = receiveDTO.getMine(); ToDTO to = receiveDTO.getTo(); + // 设置sendVO的代码保持不变... sendVO.setToid(to.getId()); - sendVO.setZid(to.getZid()); //群组特有 + sendVO.setZid(to.getZid()); sendVO.setId(mine.getId()); Integer toUserId = Convert.toInt(session.getAttributes().get("user_id")); sendVO.setUser_id(toUserId); @@ -182,32 +96,33 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { to.setType("friend"); } - sendVO.setType(to.getType()); //是从to中读取Type + sendVO.setType(to.getType()); sendVO.setSendmethod(sendVO.getType()); - sendVO.setContent(mine.getContent()); sendVO.setMessage_id(mine.getMessage_id()); - sendVO.setFromid(mine.getId()); sendVO.setTime(new Date().getTime()); - sendVO.setStatus(0); //? + sendVO.setStatus(0); sendVO.setMsg_type(mine.getType()); sendVO.setItem_id(mine.getItem_id()); sendVO.setMsg(new HashMap()); sendVO.setMessage_length(mine.getLength()); - String sender = mine.getId();//信息发送者登录名(loginName)或user_id - String receiver = to.getId();//信息接收者,如果是私聊就是用户loginName,如果是群聊就是群组id + String sender = mine.getId(); + String receiver = to.getId(); String msg = mine.getContent(); String avatar = mine.getAvatar(); String type = to.getType(); - String senderName = mine.getUsername();//发送者姓名(name) + String senderName = mine.getUsername(); + + // 更新心跳 + distributedSessionService.updateHeartbeat(sender, session.getId()); if (!mine.getId().equals(to.getId())) { sendVO.setMine(false); - //保存聊天记录 + if ("friend".equals(type) || "user".equals(type)) { - //如果是私聊 + // 私聊消息 - 使用分布式发送 ChatHistory chat = new ChatHistory(); chat.setId(IdUtil.simpleUUID()); chat.setSender(sender); @@ -215,124 +130,48 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { chat.setMsg(msg); chat.setType("friend"); chat.setCreate_date(new Date()); - boolean isSuccess = this.sendMessageToUser(receiver, sendVO); - if (isSuccess) {//如果信息发送成功 - chat.setStatus("1");//设置为已读 + boolean isSuccess = distributedMessageService.sendToUser(receiver, sendVO,localSessionManager.getUserSessions()); + if (isSuccess) { + chat.setStatus("1"); } else { + // 发送离线消息提醒 sendVO.setToid(mine.getId()); sendVO.setContent(to.getName() + " 对方现在离线,他将在上线后收到你的消息!"); sendVO.setMsg_type("text"); sendVO.setMessage_length("0"); sendVO.setId(to.getId()); - this.sendMessageToUser(sender, sendVO);//同时向本人发送对方不在线消息 - chat.setStatus("0");//设置为未读 + distributedMessageService.sendToUser(sender, sendVO,localSessionManager.getUserSessions()); + chat.setStatus("0"); } chatHistoryService.saveNew(chat); - } else if ("group".equals(type)) {//如果是群聊 - // 临时,不经过数据库 - List groupLoginSession = groupSession.get(to.getZid()); - if (!CollUtil.isEmpty(groupLoginSession)) { - for (WebSocketSession gs : groupLoginSession) { + } else if ("group".equals(type)) { + // 群聊消息 - 使用分布式发送 + String groupId = to.getZid(); + distributedMessageService.sendToGroup(groupId, sendVO, session.getId()); - if (sender.equals(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString())) {//群消息不发给自己,但是存一条记录当做聊天记录查询 - - } else { - //设置接受用户 - sendVO.setToid(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString()); - boolean isSuccess = this.sendMessageToUser(to.getName(), sendVO); - if (isSuccess) { - - } else { - } - } - - //chatHistoryService.saveNew(chat); - } - } - - - /* - List layGroupUserlist = new ArrayList(); - - //群主 - LayGroupUser owner = new LayGroupUser(); - LayGroup layGroup = layGroupService.get(receiver); - owner.setUser_id(layGroup.getCreate_by()); - layGroupUserlist.add(owner); - - //群成员1 - List zlist = layGroupService.getUsersByGroup(layGroupService.get(receiver)); - layGroupUserlist.addAll(zlist); - - for (LayGroupUser lgUser : layGroupUserlist) { - AccountUserBase user_base_row = accountService.getUserBase(Integer.valueOf(lgUser.getUser_id())); - ChatHistory chat = new ChatHistory(); - chat.setId(IdUtil.simpleUUID()); - //群聊时信息先发送给群聊id(即receiver),在后台转发给所有非发送者(sender)的人的话,群聊id(即receiver)就变成发送者。 - String groupId = receiver; - //保存聊天记录 - chat.setSender(groupId + Constants._msg_ + sender);//群聊时保存群聊id和发送者id - chat.setReceiver(user_base_row.getUser_account());//群中所有信息获得者人员 - chat.setMsg(msg); - chat.setType("group"); - //message = groupId + Constants._msg_ + user_base_row.getUser_account() + Constants._msg_ + msg + Constants._msg_ + avatar + Constants._msg_ + type + Constants._msg_ + senderName + Constants._msg_ + datatime; - if (sender.equals(user_base_row.getUser_account())) {//群消息不发给自己,但是存一条记录当做聊天记录查询 - chat.setStatus("1");//发送成功,设置为已读 - } else { - boolean isSuccess = this.sendMessageToUser(user_base_row.getUser_account(), sendVO); - if (isSuccess) { - chat.setStatus("1");//发送成功,设置为已读 - } else { - chat.setStatus("0");//用户不在线,设置为未读 - } - } - - chatHistoryService.saveNew(chat); - } - */ - - } else if ("join_group".equals(type)) { //临时群组,聚焦当前窗口 + } else if ("join_group".equals(type)) { + // 加入群组 String zid = to.getZid(); - - //设置session,属性上标出所属群组: 目前无同时多个群组需求,如果存在,此处存入list session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid); - //设置群组中用户 - List groupLoginSession = groupSession.get(zid); + // 添加到本地群组 + localSessionManager.addGroupSession(zid, session); - if (CollUtil.isEmpty(groupLoginSession)) { - if (null == groupLoginSession) { - groupLoginSession = new ArrayList<>(); - } + // 添加到分布式群组 + String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); + distributedSessionService.addUserToGroup(zid, userId, session.getId()); - groupLoginSession.add(session); - groupSession.put(zid, groupLoginSession); - } else { - if (!groupLoginSession.contains(session)) { - groupLoginSession.add(session); - } - } - - //todo通知已存在群组用户消息 可启动独立task - } else if ("leave_group".equals(type)) { //临时群组,聚焦当前窗口 + } else if ("leave_group".equals(type)) { + // 离开群组 String zid = to.getZid(); - - //设置session,属性上标出所属群组 session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, null); - //设置群组中用户 - List groupLoginSession = groupSession.get(zid); + // 从本地群组移除 + localSessionManager.removeGroupSession(zid, session); - if (CollUtil.isEmpty(groupLoginSession)) { - } else { - if (groupLoginSession.contains(session)) { - groupLoginSession.remove(session); - } - } - - //todo通知已存在群组用户消息 可启动独立task - } else { + // 从分布式群组移除 + distributedSessionService.removeUserFromGroup(zid, session.getId()); } } else { sendVO.setMine(true); @@ -346,28 +185,39 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { session.close(); } logger.debug("websocket connection closed......"); - users.remove(session); - userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session); - - if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) { - groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session); - } - - this.updateOnlineStatus(); - + cleanupSession(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { logger.debug("websocket connection closed......"); - users.remove(session); - userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session); + cleanupSession(session); + } - if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) { - groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session); + /** + * 清理会话资源 + */ + private void cleanupSession(WebSocketSession session) { + String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ; + String sessionId = session.getId(); + String groupId = (String) session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY); + + // 从本地管理移除 + if(localSessionManager.getUserSessions()==null){ + localSessionManager.setUserSessions(new ConcurrentHashMap<>()); + } + localSessionManager.removeUserSession(loginUserId, session); + if (groupId != null) { + localSessionManager.removeGroupSession(groupId, session); } - this.updateOnlineStatus(); + // 从分布式存储注销 + distributedSessionService.unregisterUserSession(loginUserId, sessionId); + if (groupId != null) { + distributedSessionService.removeUserFromGroup(groupId, sessionId); + } + + // this.updateOnlineStatus(); } @Override @@ -376,64 +226,112 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { } /** - * 给所有在线用户发送消息 - * - * @param message + * 给某个用户发送消息 (兼容原有接口) */ - public void sendMessageToAllUsers(List message) { + public boolean sendMessageToUser(String loginName, SendVO message) { + return distributedMessageService.sendToUser(loginName, message,localSessionManager.getUserSessions()); + } - SendVO msg = new SendVO(); - msg.setContent(message.toString()); - msg.setType("online"); - msg.setSendmethod(msg.getType()); + /** + * 获取所有在线用户登录名 + */ + public List getOnlineLoginNames() { + // 使用分布式服务获取所有在线用户登录名 + return distributedSessionService.getOnlineLoginNames(); + } - //不能这么操作 - for (WebSocketSession user : users) { - try { - if (user.isOpen()) { - user.sendMessage(new TextMessage(msg.toString())); + /** + * 获取所有在线用户ID + */ + public List getOnlineLoginUserId() { + // 使用分布式服务获取所有在线用户ID + return distributedSessionService.getAllOnlineUserIds(); + } + + /** + * 获取当前服务器的在线用户ID + */ + public List getLocalOnlineLoginUserId() { + // 使用本地会话管理器获取当前服务器的在线用户ID + return localSessionManager.getLocalOnlineUserIds(); + } + + /** + * 获取当前服务器的在线用户登录名 + */ + public List getLocalOnlineLoginNames() { + // 使用本地会话管理器获取当前服务器的在线用户登录名 + return localSessionManager.getLocalOnlineLoginNames(); + } + + /** + * 更新在线状态 - 通知所有用户在线列表变化 + */ + public void updateOnlineStatus() { + try { + // 创建在线状态消息 + SendVO onlineStatusMsg = new SendVO(); + onlineStatusMsg.setType("online_status"); + onlineStatusMsg.setSendmethod("broadcast"); + + // 获取在线用户列表 + List onlineUsers = getOnlineLoginNames(); + List onlineUserIds = getOnlineLoginUserId(); + + Map data = new HashMap<>(); + data.put("onlineUsers", onlineUsers); + data.put("onlineUserIds", onlineUserIds); + data.put("timestamp", System.currentTimeMillis()); + + onlineStatusMsg.setMsg(data); + + // 广播在线状态更新(只广播到当前服务器,避免循环广播) + broadcastLocalOnlineStatus(onlineStatusMsg); + + logger.info("在线状态更新: {} 个用户在线", onlineUsers.size()); + } catch (Exception e) { + logger.error("更新在线状态失败: " + e.getMessage(), e); + } + } + + /** + * 广播在线状态到本地用户 + */ + private void broadcastLocalOnlineStatus(SendVO message) { + logger.info("localSessionManager: {}", localSessionManager); + logger.info("localSessionManager.userSessions: {}", localSessionManager.getUserSessions()); + logger.info("localSessionManager.userSessions.values: {}", localSessionManager.getUserSessions().values()); + if (localSessionManager == null || localSessionManager.getUserSessions().isEmpty()) { + logger.error("localSessionManager is null, cannot broadcast online status"); + return; + } + + // 获取本地所有用户会话 + for (List sessions : localSessionManager.getUserSessions().values()) { + for (WebSocketSession session : sessions) { + if (session!=null&&session.isOpen()) { + try { + session.sendMessage(new TextMessage(message.toString())); + } catch (IOException e) { + logger.error("发送在线状态消息失败: " + e.getMessage(), e); + } } - } catch (IOException e) { - logger.error("给所有在线用户发送信息失败!" + e.getMessage(), e); } } } /** - * 给某个用户发送消息 - * - * @param loginName - * @param message + * 给所有在线用户发送消息 */ - public boolean sendMessageToUser(String loginName, SendVO message) { - boolean result = false; + public void sendMessageToAllUsers(SendVO message) { + // 获取所有在线用户ID + List onlineUserIds = getOnlineLoginUserId(); - //不能这样操作。 - //for (WebSocketSession user : users) { - List lst = userSession.get(message.getToid()); - - if (CollUtil.isNotEmpty(lst)) { - for (WebSocketSession user : lst) { - //if (user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).equals(loginName)) {//允许用户多个浏览器登录,每个浏览器页面都会收到用户信息 - try { - if (user.isOpen()) { - user.sendMessage(new TextMessage(message.toString())); - result = true; - } - } catch (IOException e) { - logger.error("给用户发送信息失败!" + e.getMessage(), e); - } - - //break;//注释掉此处意味着遍历该用户打开的所有页面并发送信息,否则只会向用户登录的第一个网页发送信息。 - //} - } + for (Integer userId : onlineUserIds) { + String userIdStr = String.valueOf(userId); + distributedMessageService.sendToUser(userIdStr, message,localSessionManager.getUserSessions()); } - return result; + logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size()); } - - public void updateOnlineStatus() { - //this.sendMessageToAllUsers(this.getOnlineLoginNames());//通知所有用户更新在线信息 - } - } \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java b/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java index dd4bb67d..3138043c 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java +++ b/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java @@ -45,8 +45,8 @@ public class ChatSocketInfoController { @RequestMapping(value = "/getUserOnline", method = RequestMethod.POST) public List getUserOnline(@RequestBody List user_ids) { logger.info(I18nUtil._("接收的用户ids:") + CollUtil.join(user_ids, ",")); - //List onlineLoginUserIds = distributedSessionService.getOnlineUserIds(); - List onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId(); + List onlineLoginUserIds = distributedSessionService.getOnlineUserIds(); + // List onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId(); Iterator online_user_ids_iter = onlineLoginUserIds.iterator(); // 处理移除非本店铺客服 while (online_user_ids_iter.hasNext()) {