diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java index 037b616c..0eb94b99 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java @@ -1,5 +1,7 @@ package com.suisung.mall.im.common.websocket.service; import com.suisung.mall.im.pojo.vo.SendVO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; @@ -15,6 +17,7 @@ import java.util.concurrent.CopyOnWriteArrayList; @Component public class LocalSessionManager { + private static final Logger log = LoggerFactory.getLogger(LocalSessionManager.class); // 本地存储的用户会话 (userId -> sessions) public final ConcurrentHashMap> userSessions = new ConcurrentHashMap<>(); @@ -25,54 +28,82 @@ public class LocalSessionManager { * 添加用户会话 */ public void addUserSession(String userId, WebSocketSession session) { - userSessions.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>()) - .add(session); + if (userId == null || session == null) { + log.info("添加用户会话失败: 参数为空"); + return; + } + userSessions.compute(userId, (key, existingSessions) -> { + if (existingSessions == null) { + existingSessions = new CopyOnWriteArrayList<>(); + } + if (!existingSessions.contains(session)) { + existingSessions.add(session); + } + return existingSessions; + }); + + log.info("添加用户会话成功: userId={}, sessionId={}", userId, session.getId()); } /** * 移除用户会话 */ public void removeUserSession(String userId, WebSocketSession session) { - List sessions = userSessions.get(userId); - if (sessions != null) { - sessions.remove(session); - if (sessions.isEmpty()) { - userSessions.remove(userId); - } + if (userId == null || session == null) { + return; } + + userSessions.computeIfPresent(userId, (key, sessions) -> { + sessions.remove(session); + return sessions.isEmpty() ? null : sessions; + }); } /** * 发送消息给用户 */ public boolean sendToUser(String userId, SendVO message) { + if (userId == null || message == null) { + log.info("发送消息失败: 参数为空"); + return false; + } + List sessions = userSessions.get(userId); - if (sessions != null && !sessions.isEmpty()) { - boolean success = false; - for (WebSocketSession session : sessions) { - if (session.isOpen()) { - try { - session.sendMessage(new TextMessage(message.toString())); - success = true; - } catch (IOException e) { - // 发送失败,移除会话 - removeUserSession(userId, session); - } + if (sessions == null || sessions.isEmpty()) { + log.info("用户没有活跃会话: {}", userId); + return false; + } + + boolean success = false; + for (WebSocketSession session : sessions) { + if (session != null && session.isOpen()) { + try { + session.sendMessage(new TextMessage(message.toString())); + success = true; + } catch (IOException e) { + log.info("发送消息失败: {}", e.getMessage()); + // 发送失败,移除会话 + removeUserSession(userId, session); } } - return success; } - return false; + return success; } /** * 发送消息给会话 */ public boolean sendToSession(String sessionId, SendVO message) { + if (sessionId == null || message == null) { + return false; + } + // 遍历所有会话找到指定的sessionId for (List sessions : userSessions.values()) { + if (sessions == null) continue; + for (WebSocketSession session : sessions) { - if (session.getId().equals(sessionId) && session.isOpen()) { + if (session != null && session.getId().equals(sessionId) && session.isOpen()) { try { session.sendMessage(new TextMessage(message.toString())); return true; diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java index 41fc1a88..38fb372e 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java @@ -289,7 +289,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { // 广播在线状态更新(只广播到当前服务器,避免循环广播) broadcastLocalOnlineStatus(onlineStatusMsg); - logger.debug("在线状态更新: {} 个用户在线", onlineUsers.size()); + logger.info("在线状态更新: {} 个用户在线", onlineUsers.size()); } catch (Exception e) { logger.error("更新在线状态失败: " + e.getMessage(), e); } @@ -299,6 +299,10 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { * 广播在线状态到本地用户 */ private void broadcastLocalOnlineStatus(SendVO message) { + if (localSessionManager == null) { + logger.error("localSessionManager is null, cannot broadcast online status"); + return; + } // 获取本地所有用户会话 for (List sessions : localSessionManager.userSessions.values()) { for (WebSocketSession session : sessions) {