From 92e5ccdff9e9c5ff5abce673764aea7c06254d3e Mon Sep 17 00:00:00 2001 From: liyj <1617420630@qq.com> Date: Thu, 20 Nov 2025 10:35:14 +0800 Subject: [PATCH] =?UTF-8?q?im=E8=B0=83=E6=95=B4=E7=BB=84=E7=9A=84sesion,?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/DistributedMessageService.java | 5 ++--- .../service/LocalSessionManager.java | 19 ++++------------ .../service/MessageConsumerService.java | 5 ++--- .../onchat/MallsuiteImSocketHandler.java | 22 ++++++++++++++----- 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java index d6ba45ae..0a317f5b 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java @@ -36,10 +36,9 @@ public class DistributedMessageService { * 发送消息给用户 * @param targetUserId 目标用户就是account的id * @param message - * @param userSessions * @return */ - public boolean sendToUser(String targetUserId, SendVO message, ConcurrentHashMap> userSessions) { + public boolean sendToUser(String targetUserId, SendVO message) { log.info("targetUserId{},message:{}", targetUserId, message); String targetServer = sessionService.getUserServer(targetUserId); @@ -50,7 +49,7 @@ public class DistributedMessageService { if (isCurrentServer(targetServer)) { // 用户在当前服务器,直接发送 - return localSessionManager.sendToUser(targetUserId, message,userSessions); + return localSessionManager.sendToUser(targetUserId, message); } else { // 用户在其他服务器,通过RabbitMQ转发 forwardToUser(targetServer, targetUserId, message); diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java index 98a83d13..72c3feeb 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java @@ -27,10 +27,11 @@ public class LocalSessionManager { private ConcurrentHashMap> userSessions; // 本地存储的群组会话 (groupId -> sessions) - private ConcurrentHashMap> groupSessions = new ConcurrentHashMap<>(); + private ConcurrentHashMap> groupSessions; public LocalSessionManager() { this.userSessions = MallsuiteImSocketHandler.userSessions; + this.groupSessions = MallsuiteImSocketHandler.groupSessions; } /** @@ -51,18 +52,14 @@ public class LocalSessionManager { * 发送消息给用户 * @param userId account的id * @param message - * @param userSession * @return */ - public boolean sendToUser(String userId, SendVO message,ConcurrentHashMap> userSession) { + public boolean sendToUser(String userId, SendVO message) { if (userId == null || message == null) { log.info("发送消息失败: 参数为空"); return false; } - if(null!=userSession){ - this.userSessions=userSession; - } - log.info("userSessions={}", userSession); + log.info("userSessions={}", userSessions); List sessions = userSessions.get(userId); if (sessions == null || sessions.isEmpty()) { log.info("用户没有活跃会话: {}", userId); @@ -111,14 +108,6 @@ public class LocalSessionManager { return false; } - /** - * 添加群组会话 - */ - public void addGroupSession(String groupId, WebSocketSession session) { - groupSessions.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()) - .add(session); - } - /** * 移除群组会话 */ diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java index 155ada80..ffb0d413 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java @@ -26,8 +26,7 @@ public class MessageConsumerService { public void consumeForwardMessage(Map message) { try { String type = (String) message.get("type"); - System.out.println("收到RabbitMQ消息, 类型: " + type + ", 服务器: " + rabbitMQManager.getServerId()); - + logger.info("收到RabbitMQ消息, 类型: {}, 服务器: {}", type, rabbitMQManager.getServerId()); switch (type) { case "SEND_TO_USER": handleSendToUser(message); @@ -65,7 +64,7 @@ public class MessageConsumerService { } if (userMsg != null) { - boolean success = localSessionManager.sendToUser(userId, userMsg,null); + boolean success = localSessionManager.sendToUser(userId, userMsg); logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败"); } else { logger.info("消息格式错误,无法发送给用户: {}", userId); diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java index 2a3d5223..06ca3b13 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java @@ -36,6 +36,8 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { public static ConcurrentHashMap> userSessions; + public static ConcurrentHashMap> groupSessions; + @Autowired private DistributedSessionService distributedSessionService; @@ -50,6 +52,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { static { userSessions=new ConcurrentHashMap<>(); + groupSessions=new ConcurrentHashMap<>(); } //用户上线后触发 @@ -134,7 +137,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { chat.setMsg(msg); chat.setType("friend"); chat.setCreate_date(new Date()); - boolean isSuccess = distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO,localSessionManager.getUserSessions()); + boolean isSuccess = distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO); if (isSuccess) { chat.setStatus("1"); } else { @@ -144,7 +147,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { sendVO.setMsg_type("text"); sendVO.setMessage_length("0"); sendVO.setId(to.getId()); - distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO,localSessionManager.getUserSessions()); + distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO); chat.setStatus("0"); } chatHistoryService.saveNew(chat); @@ -160,7 +163,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid); // 添加到本地群组 - localSessionManager.addGroupSession(zid, session); + this.addGroupSession(zid, session); // 添加到分布式群组 String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); @@ -233,7 +236,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { * 给某个用户发送消息 (兼容原有接口) */ public boolean sendMessageToUser(String loginName, SendVO message) { - return distributedMessageService.sendToUser(loginName, message,localSessionManager.getUserSessions()); + return distributedMessageService.sendToUser(loginName, message); } /** @@ -333,7 +336,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { for (Integer userId : onlineUserIds) { String userIdStr = String.valueOf(userId); - distributedMessageService.sendToUser(userIdStr, message,localSessionManager.getUserSessions()); + distributedMessageService.sendToUser(userIdStr, message); } logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size()); @@ -360,4 +363,13 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { logger.info("添加用户会话成功: userId={}, sessionId={}", userId, session.getId()); } + + + /** + * 添加群组会话 + */ + public void addGroupSession(String groupId, WebSocketSession session) { + groupSessions.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()) + .add(session); + } } \ No newline at end of file