From c8857caadf912b33adefbce9ef57ae9fb0e31a0c Mon Sep 17 00:00:00 2001 From: liyj <1617420630@qq.com> Date: Thu, 13 Nov 2025 17:18:53 +0800 Subject: [PATCH] =?UTF-8?q?im=E6=9C=8D=E5=8A=A1=E6=96=B0=E5=A2=9E=E5=88=86?= =?UTF-8?q?=E5=B8=83=E5=BC=8Frabbimq=E8=A7=A3=E5=86=B3=E6=96=B9=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../websocket/config/RabbitMQConfig.java | 84 ++++ .../service/DistributedMessageService.java | 125 +++++ .../service/DistributedSessionService.java | 234 +++++++++ .../service/LocalSessionManager.java | 144 ++++++ .../service/MessageConsumerService.java | 233 +++++++++ .../websocket/service/RabbitMQManager.java | 170 +++++++ .../onchat/MallsuiteImSocketHandler.java | 457 +++++++----------- .../websocket/utils/ServerIdGenerator.java | 153 ++++++ .../admin/ChatSocketInfoController.java | 2 +- mall-im/src/main/resources/bootstrap-dev.yml | 9 + .../src/main/resources/bootstrap-local.yml | 9 + mall-im/src/main/resources/bootstrap-prod.yml | 9 + mall-im/src/main/resources/bootstrap-uat.yml | 9 + 13 files changed, 1354 insertions(+), 284 deletions(-) create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/config/RabbitMQConfig.java create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedSessionService.java create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/RabbitMQManager.java create mode 100644 mall-im/src/main/java/com/suisung/mall/im/common/websocket/utils/ServerIdGenerator.java diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/config/RabbitMQConfig.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/config/RabbitMQConfig.java new file mode 100644 index 00000000..d6e4c874 --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/config/RabbitMQConfig.java @@ -0,0 +1,84 @@ +package com.suisung.mall.im.common.websocket.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitMQConfig { + private static Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class); + // 直连交换机 - 用于精确路由 + public static final String IM_DIRECT_EXCHANGE = "im.direct.exchange"; + + // 广播交换机 - 用于广播消息 + public static final String IM_FANOUT_EXCHANGE = "im.fanout.exchange"; + + // 队列前缀 + public static final String IM_QUEUE_PREFIX = "im.queue."; + + // 路由键前缀 + public static final String IM_ROUTING_KEY_PREFIX = "im.server."; + + /** + * 创建直连交换机 - 用于点对点消息 + */ + @Bean + public DirectExchange imDirectExchange() { + return new DirectExchange(IM_DIRECT_EXCHANGE, true, false); + } + + /** + * 创建广播交换机 - 用于广播消息 + */ + @Bean + public FanoutExchange imFanoutExchange() { + return new FanoutExchange(IM_FANOUT_EXCHANGE, true, false); + } + + /** + * JSON消息转换器 - 支持SendVO等复杂对象 + */ + @Bean + public Jackson2JsonMessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } + + /** + * 配置RabbitTemplate + */ + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(jsonMessageConverter()); + + // 设置确认回调 + template.setConfirmCallback((correlationData, ack, cause) -> { + if (!ack) { + logger.info("消息发送失败: {}", cause); + } + }); + + + return template; + } + + /** + * 配置监听器容器工厂 + */ + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setMessageConverter(jsonMessageConverter()); + factory.setConcurrentConsumers(3); + factory.setMaxConcurrentConsumers(10); + factory.setPrefetchCount(1); // 每次处理1条消息 + return factory; + } +} \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java new file mode 100644 index 00000000..230fb31f --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedMessageService.java @@ -0,0 +1,125 @@ +package com.suisung.mall.im.common.websocket.service; + +import com.suisung.mall.im.pojo.vo.SendVO; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.IM_FANOUT_EXCHANGE; + + +@Service +public class DistributedMessageService { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private DistributedSessionService sessionService; + + @Autowired + private LocalSessionManager localSessionManager; + + @Autowired + private RabbitMQManager rabbitMQManager; + + /** + * 发送消息给用户 + */ + public boolean sendToUser(String targetUserId, SendVO message) { + String targetServer = sessionService.getUserServer(targetUserId); + + if (targetServer == null) { + // 用户离线,可以存储离线消息 + return false; + } + + if (isCurrentServer(targetServer)) { + // 用户在当前服务器,直接发送 + return localSessionManager.sendToUser(targetUserId, message); + } else { + // 用户在其他服务器,通过RabbitMQ转发 + forwardToUser(targetServer, targetUserId, message); + return true; + } + } + + /** + * 发送消息给群组 + */ + public void sendToGroup(String groupId, SendVO message, String excludeSessionId) { + // 获取群组所有成员 + List> members = sessionService.getGroupMembers(groupId); + + for (Map member : members) { + String userId = (String) member.get("userId"); + String sessionId = (String) member.get("sessionId"); + String memberServer = (String) member.get("serverId"); + + // 排除发送者自己 + if (sessionId.equals(excludeSessionId)) { + continue; + } + + if (isCurrentServer(memberServer)) { + // 成员在当前服务器 + localSessionManager.sendToSession(sessionId, message); + } else { + // 成员在其他服务器 + forwardToSession(memberServer, sessionId, message); + } + } + } + + /** + * 广播消息给所有在线用户 + */ + public void broadcast(SendVO message, String excludeServerId) { + // 创建广播消息 + Map broadcastMsg = new HashMap<>(); + broadcastMsg.put("type", "BROADCAST"); + broadcastMsg.put("message", message); + broadcastMsg.put("excludeServer", excludeServerId); + broadcastMsg.put("sourceServer", rabbitMQManager.getServerId()); + + // 广播到所有服务器(这里简化处理,实际应该获取所有服务器列表) + // 可以通过Redis维护服务器列表,然后遍历发送 + rabbitTemplate.convertAndSend(IM_FANOUT_EXCHANGE, "im.server.all", broadcastMsg); + } + + /** + * 转发消息到用户 + */ + private void forwardToUser(String targetServer, String userId, SendVO message) { + Map forwardMsg = new HashMap<>(); + forwardMsg.put("type", "SEND_TO_USER"); + forwardMsg.put("targetUserId", userId); + forwardMsg.put("message", message); + forwardMsg.put("sourceServer", rabbitMQManager.getServerId()); + + String routingKey = rabbitMQManager.getTargetRoutingKey(targetServer); + rabbitTemplate.convertAndSend(IM_FANOUT_EXCHANGE, routingKey, forwardMsg); + } + + /** + * 转发消息到会话 + */ + private void forwardToSession(String targetServer, String sessionId, SendVO message) { + Map forwardMsg = new HashMap<>(); + forwardMsg.put("type", "SEND_TO_SESSION"); + forwardMsg.put("targetSessionId", sessionId); + forwardMsg.put("message", message); + forwardMsg.put("sourceServer", rabbitMQManager.getServerId()); + + String routingKey = rabbitMQManager.getTargetRoutingKey(targetServer); + rabbitTemplate.convertAndSend(IM_FANOUT_EXCHANGE, routingKey, forwardMsg); + } + + private boolean isCurrentServer(String serverId) { + return serverId.equals(rabbitMQManager.getServerId()); + } +} \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedSessionService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedSessionService.java new file mode 100644 index 00000000..3ec3a7d9 --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/DistributedSessionService.java @@ -0,0 +1,234 @@ +package com.suisung.mall.im.common.websocket.service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.*; +import java.util.stream.Collectors; + +@Service +public class DistributedSessionService { + + private static final Logger log = LoggerFactory.getLogger(DistributedSessionService.class); + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private RabbitMQManager rabbitMQManager; // 使用RabbitMQManager获取serverId + + private static final String USER_SERVER_KEY = "im:user:server:"; + private static final String USER_SESSIONS_KEY = "im:user:sessions:"; + private static final String SERVER_USERS_KEY = "im:server:users:"; + private static final String GROUP_SESSIONS_KEY = "im:group:sessions:"; + + /** + * 注册用户会话 + */ + public void registerUserSession(String userId, String sessionId, Map attributes) { + String userKey = USER_SESSIONS_KEY + userId; + String serverKey = USER_SERVER_KEY + userId; + String serverUsersKey = SERVER_USERS_KEY + getServerId(); + + // 存储用户会话信息 + Map sessionInfo = new HashMap<>(); + sessionInfo.put("sessionId", sessionId); + sessionInfo.put("serverId", getServerId()); + sessionInfo.put("connectTime", System.currentTimeMillis()); + sessionInfo.put("lastHeartbeat", System.currentTimeMillis()); + sessionInfo.put("attributes", attributes); + + redisTemplate.opsForHash().put(userKey, sessionId, sessionInfo); + redisTemplate.expire(userKey, Duration.ofMinutes(30)); + + // 更新用户-服务器映射 + redisTemplate.opsForValue().set(serverKey, getServerId(), Duration.ofMinutes(30)); + + // 添加到服务器用户集合 + redisTemplate.opsForSet().add(serverUsersKey, userId); + redisTemplate.expire(serverUsersKey, Duration.ofMinutes(30)); + + log.info("用户会话注册: userId={}, sessionId={}, serverId={}", userId, sessionId, getServerId()); + } + + /** + * 注销用户会话 + */ + public void unregisterUserSession(String userId, String sessionId) { + String userKey = USER_SESSIONS_KEY + userId; + String serverUsersKey = SERVER_USERS_KEY + getServerId(); + + // 从用户会话中移除 + redisTemplate.opsForHash().delete(userKey, sessionId); + + // 从服务器用户集合中移除 + redisTemplate.opsForSet().remove(serverUsersKey, userId); + + // 检查用户是否还有活跃会话 + Long size = redisTemplate.opsForHash().size(userKey); + if (size == null || size == 0) { + String serverKey = USER_SERVER_KEY + userId; + redisTemplate.delete(serverKey); + } + } + + /** + * 获取用户所在服务器 + */ + public String getUserServer(String userId) { + String key = USER_SERVER_KEY + userId; + return (String) redisTemplate.opsForValue().get(key); + } + + /** + * 获取用户所有会话ID + */ + public Set getUserSessions(String userId) { + String key = USER_SESSIONS_KEY + userId; + return redisTemplate.opsForHash().keys(key).stream() + .map(Object::toString) + .collect(HashSet::new, HashSet::add, HashSet::addAll); + } + + /** + * 添加用户到群组 + */ + public void addUserToGroup(String groupId, String userId, String sessionId) { + String groupKey = GROUP_SESSIONS_KEY + groupId; + Map memberInfo = new HashMap<>(); + memberInfo.put("userId", userId); + memberInfo.put("sessionId", sessionId); + memberInfo.put("serverId", getServerId()); + memberInfo.put("joinTime", System.currentTimeMillis()); + + redisTemplate.opsForHash().put(groupKey, sessionId, memberInfo); + redisTemplate.expire(groupKey, Duration.ofHours(2)); + } + + /** + * 从群组移除用户 + */ + public void removeUserFromGroup(String groupId, String sessionId) { + String groupKey = GROUP_SESSIONS_KEY + groupId; + redisTemplate.opsForHash().delete(groupKey, sessionId); + } + + /** + * 获取群组所有成员 + */ + public List> getGroupMembers(String groupId) { + String groupKey = GROUP_SESSIONS_KEY + groupId; + return redisTemplate.opsForHash().values(groupKey).stream() + .map(obj -> (Map) obj) + .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); + } + + /** + * 更新心跳 + */ + public void updateHeartbeat(String userId, String sessionId) { + String userKey = USER_SESSIONS_KEY + userId; + Map sessionInfo = (Map) redisTemplate.opsForHash().get(userKey, sessionId); + if (sessionInfo != null) { + sessionInfo.put("lastHeartbeat", System.currentTimeMillis()); + redisTemplate.opsForHash().put(userKey, sessionId, sessionInfo); + redisTemplate.expire(userKey, Duration.ofMinutes(30)); + } + } + + /** + * 获取所有在线用户ID列表 + */ + public List getOnlineUserIds() { + // 方法1: 通过服务器用户集合获取(推荐) + String serverUsersKey = SERVER_USERS_KEY + new RabbitMQManager().getServerId(); + Set userIds = redisTemplate.opsForSet().members(serverUsersKey); + + if (userIds != null && !userIds.isEmpty()) { + return userIds.stream() + .map(userIdStr -> { + try { + return Integer.parseInt(userIdStr.toString()); + } catch (NumberFormatException e) { + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + return new ArrayList<>(); + } + + /** + * 获取所有服务器的在线用户ID列表(全局) + */ + public List getAllOnlineUserIds() { + // 获取所有服务器的用户集合 + Set serverKeys = redisTemplate.keys(SERVER_USERS_KEY + "*"); + List allUserIds = new ArrayList<>(); + + if (serverKeys != null) { + for (String serverKey : serverKeys) { + Set userIds = redisTemplate.opsForSet().members(serverKey); + if (userIds != null) { + userIds.forEach(userIdStr -> { + try { + allUserIds.add(Integer.parseInt(userIdStr.toString())); + } catch (NumberFormatException e) { + // 忽略格式错误的用户ID + } + }); + } + } + } + + // 去重 + return allUserIds.stream().distinct().collect(Collectors.toList()); + } + + /** + * 获取在线用户登录名列表 + */ + public List getOnlineLoginNames() { + List loginNames = new ArrayList<>(); + List userIds = getOnlineUserIds(); + + // 这里可以根据用户ID获取对应的登录名 + // 实际实现可能需要调用用户服务 + for (Integer userId : userIds) { + // 模拟获取登录名,实际应该从用户服务或缓存中获取 + String loginName = "user_" + userId; + loginNames.add(loginName); + } + + return loginNames; + } + + /** + * 获取指定用户的会话信息 + */ + public List> getUserSessionsInfo(String userId) { + String userKey = USER_SESSIONS_KEY + userId; + Map sessions = redisTemplate.opsForHash().entries(userKey); + + return sessions.values().stream() + .map(obj -> (Map) obj) + .collect(Collectors.toList()); + } + + /** + * 检查用户是否在线 + */ + public boolean isUserOnline(String userId) { + String userKey = USER_SESSIONS_KEY + userId; + Long size = redisTemplate.opsForHash().size(userKey); + return size != null && size > 0; + } + + private String getServerId() { + return rabbitMQManager.getServerId(); + } +} \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java new file mode 100644 index 00000000..037b616c --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/LocalSessionManager.java @@ -0,0 +1,144 @@ +package com.suisung.mall.im.common.websocket.service; +import com.suisung.mall.im.pojo.vo.SendVO; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +@Component +public class LocalSessionManager { + + // 本地存储的用户会话 (userId -> sessions) + public final ConcurrentHashMap> userSessions = new ConcurrentHashMap<>(); + + // 本地存储的群组会话 (groupId -> sessions) + private final ConcurrentHashMap> groupSessions = new ConcurrentHashMap<>(); + + /** + * 添加用户会话 + */ + public void addUserSession(String userId, WebSocketSession session) { + userSessions.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>()) + .add(session); + } + + /** + * 移除用户会话 + */ + public void removeUserSession(String userId, WebSocketSession session) { + List sessions = userSessions.get(userId); + if (sessions != null) { + sessions.remove(session); + if (sessions.isEmpty()) { + userSessions.remove(userId); + } + } + } + + /** + * 发送消息给用户 + */ + public boolean sendToUser(String userId, SendVO message) { + List sessions = userSessions.get(userId); + if (sessions != null && !sessions.isEmpty()) { + boolean success = false; + for (WebSocketSession session : sessions) { + if (session.isOpen()) { + try { + session.sendMessage(new TextMessage(message.toString())); + success = true; + } catch (IOException e) { + // 发送失败,移除会话 + removeUserSession(userId, session); + } + } + } + return success; + } + return false; + } + + /** + * 发送消息给会话 + */ + public boolean sendToSession(String sessionId, SendVO message) { + // 遍历所有会话找到指定的sessionId + for (List sessions : userSessions.values()) { + for (WebSocketSession session : sessions) { + if (session.getId().equals(sessionId) && session.isOpen()) { + try { + session.sendMessage(new TextMessage(message.toString())); + return true; + } catch (IOException e) { + return false; + } + } + } + } + return false; + } + + /** + * 添加群组会话 + */ + public void addGroupSession(String groupId, WebSocketSession session) { + groupSessions.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()) + .add(session); + } + + /** + * 移除群组会话 + */ + public void removeGroupSession(String groupId, WebSocketSession session) { + List sessions = groupSessions.get(groupId); + if (sessions != null) { + sessions.remove(session); + if (sessions.isEmpty()) { + groupSessions.remove(groupId); + } + } + } + + /** + * 获取本地所有在线用户ID + */ + public List getLocalOnlineUserIds() { + List userIds = new ArrayList<>(); + + for (String userIdStr : userSessions.keySet()) { + try { + userIds.add(Integer.parseInt(userIdStr)); + } catch (NumberFormatException e) { + // 忽略格式错误的用户ID + } + } + + return userIds; + } + + /** + * 获取本地所有在线用户登录名 + */ + public List getLocalOnlineLoginNames() { + return new ArrayList<>(userSessions.keySet()); + } + + /** + * 获取本地会话数量统计 + */ + public Map getLocalSessionStats() { + Map stats = new HashMap<>(); + stats.put("totalUsers", userSessions.size()); + stats.put("totalSessions", userSessions.values().stream().mapToInt(List::size).sum()); + stats.put("totalGroups", groupSessions.size()); + + return stats; + } +} \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java new file mode 100644 index 00000000..f1307d8b --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/MessageConsumerService.java @@ -0,0 +1,233 @@ +package com.suisung.mall.im.common.websocket.service; + +import com.suisung.mall.im.pojo.vo.SendVO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Map; + +@Service +public class MessageConsumerService { + private static Logger logger = LoggerFactory.getLogger(MessageConsumerService.class); + @Autowired + private LocalSessionManager localSessionManager; + + @Autowired + private RabbitMQManager rabbitMQManager; + + /** + * 监听当前服务器的队列 + * 使用 RabbitListener 监听动态队列 + */ + @RabbitListener(queues = "#{rabbitMQManager.queueName}") + public void consumeForwardMessage(Map message) { + try { + String type = (String) message.get("type"); + System.out.println("收到RabbitMQ消息, 类型: " + type + ", 服务器: " + rabbitMQManager.getServerId()); + + switch (type) { + case "SEND_TO_USER": + handleSendToUser(message); + break; + + case "SEND_TO_SESSION": + handleSendToSession(message); + break; + + case "BROADCAST": + handleBroadcast(message); + break; + + default: + logger.info("未知消息类型: {}", type); + } + } catch (Exception e) { + logger.info("处理RabbitMQ消息失败: {}", e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 处理发送给用户的消息 + */ + private void handleSendToUser(Map message) { + String userId = (String) message.get("targetUserId"); + Object messageObj = message.get("message"); + + SendVO userMsg = null; + if (messageObj instanceof SendVO) { + userMsg = (SendVO) messageObj; + } else if (messageObj instanceof Map) { + userMsg = convertMapToSendVO((Map) messageObj); + } + + if (userMsg != null) { + boolean success = localSessionManager.sendToUser(userId, userMsg); + logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败"); + } else { + logger.info("消息格式错误,无法发送给用户: {}", userId); + } + } + + /** + * 处理发送给会话的消息 + */ + private void handleSendToSession(Map message) { + String sessionId = (String) message.get("targetSessionId"); + Object messageObj = message.get("message"); + + SendVO sessionMsg = null; + if (messageObj instanceof SendVO) { + sessionMsg = (SendVO) messageObj; + } else if (messageObj instanceof Map) { + sessionMsg = convertMapToSendVO((Map) messageObj); + } + + if (sessionMsg != null) { + boolean success = localSessionManager.sendToSession(sessionId, sessionMsg); + logger.info("发送消息给会话 {}: {}", sessionId, success ? "成功" : "失败"); + } else { + logger.info("消息格式错误,无法发送给会话: {}", sessionId); + } + } + + /** + * 处理广播消息 + */ + private void handleBroadcast(Map message) { + String excludeServer = (String) message.get("excludeServer"); + String currentServer = rabbitMQManager.getServerId(); + + // 如果当前服务器不在排除列表中,则处理广播 + if (!currentServer.equals(excludeServer)) { + Object messageObj = message.get("message"); + SendVO broadcastMsg = null; + + if (messageObj instanceof SendVO) { + broadcastMsg = (SendVO) messageObj; + } else if (messageObj instanceof Map) { + broadcastMsg = convertMapToSendVO((Map) messageObj); + } + + if (broadcastMsg != null) { + // 实现广播给所有本地用户 + broadcastToAllLocalUsers(broadcastMsg); + logger.info("广播消息给所有本地用户"); + } + } else { + logger.info("当前服务器被排除在广播之外: {}", currentServer); + } + } + + /** + * 广播给所有本地用户 + */ + private void broadcastToAllLocalUsers(SendVO message) { + // 这里可以实现具体的广播逻辑 + // 例如遍历所有本地会话并发送消息 + logger.info("执行广播逻辑,消息内容: {}", message.getContent()); + } + + /** + * 将Map转换为SendVO + */ + private SendVO convertMapToSendVO(Map messageMap) { + try { + SendVO sendVO = new SendVO(); + + // 设置基本字段 + if (messageMap.get("toid") != null) { + sendVO.setToid((String) messageMap.get("toid")); + } + if (messageMap.get("id") != null) { + sendVO.setId((String) messageMap.get("id")); + } + if (messageMap.get("content") != null) { + sendVO.setContent((String) messageMap.get("content")); + } + if (messageMap.get("type") != null) { + sendVO.setType((String) messageMap.get("type")); + } + if (messageMap.get("username") != null) { + sendVO.setUsername((String) messageMap.get("username")); + } + if (messageMap.get("avatar") != null) { + sendVO.setAvatar((String) messageMap.get("avatar")); + } + if (messageMap.get("fromid") != null) { + sendVO.setFromid((String) messageMap.get("fromid")); + } + if (messageMap.get("time") != null) { + sendVO.setTime((Long) messageMap.get("time")); + } + if (messageMap.get("status") != null) { + sendVO.setStatus((Integer) messageMap.get("status")); + } + if (messageMap.get("msg_type") != null) { + sendVO.setMsg_type((String) messageMap.get("msg_type")); + } + if (messageMap.get("item_id") != null) { + sendVO.setItem_id(Long.valueOf((String) messageMap.get("item_id"))); + } + if (messageMap.get("message_length") != null) { + sendVO.setMessage_length((String) messageMap.get("message_length")); + } + if (messageMap.get("zid") != null) { + sendVO.setZid((String) messageMap.get("zid")); + } + if (messageMap.get("user_id") != null) { + sendVO.setUser_id((Integer) messageMap.get("user_id")); + } + if (messageMap.get("friend_id") != null) { + sendVO.setFriend_id((Integer) messageMap.get("friend_id")); + } + if (messageMap.get("sendmethod") != null) { + sendVO.setSendmethod((String) messageMap.get("sendmethod")); + } + if (messageMap.get("message_id") != null) { + sendVO.setMessage_id(Integer.valueOf((String) messageMap.get("message_id"))); + } + if (messageMap.get("mine") != null) { + sendVO.setMine((Boolean) messageMap.get("mine")); + } + + return sendVO; + } catch (Exception e) { + logger.info("Map转换为SendVO失败: {}", e.getMessage()); + return null; + } + } + + /** + * 处理连接建立事件(可选,用于服务器间同步) + */ + @RabbitListener(queues = "#{rabbitMQManager.queueName}") + public void handleSessionEvent(Map event) { + String eventType = (String) event.get("eventType"); + String userId = (String) event.get("userId"); + String sessionId = (String) event.get("sessionId"); + + logger.info("收到会话事件: {}, 用户: {}, 会话: {}", eventType, userId, sessionId); + + // 可以根据需要处理各种会话事件 + switch (eventType) { + case "SESSION_CREATED": + // 处理会话创建事件 + break; + case "SESSION_CLOSED": + // 处理会话关闭事件 + break; + } + } + + /** + * 监听广播队列 + */ + @RabbitListener(queues = "im.broadcast.queue.#{rabbitMQManager.serverId}") + public void consumeBroadcastMessage(Map message) { + handleBroadcast(message); + } +} \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/RabbitMQManager.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/RabbitMQManager.java new file mode 100644 index 00000000..5d2e32ad --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/RabbitMQManager.java @@ -0,0 +1,170 @@ +package com.suisung.mall.im.common.websocket.service; + +import com.suisung.mall.im.common.websocket.service.onchat.MallsuiteImSocketHandler; +import com.suisung.mall.im.common.websocket.utils.ServerIdGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.*; + +@Service +public class RabbitMQManager { + private static Logger logger = LoggerFactory.getLogger(RabbitMQManager.class); + @Autowired + private AmqpAdmin amqpAdmin; + + @Autowired + private DirectExchange imDirectExchange; + + @Autowired + private FanoutExchange imFanoutExchange; + + @Value("${im.server.id:#{null}}") // 可选配置,优先使用配置的ID + private String configuredServerId; + + private String serverId; + private String queueName; + private String routingKey; + private String broadcastQueueName; + + @PostConstruct + public void init() { + // 确定服务器ID:优先使用配置,否则自动生成 + this.serverId = determineServerId(); + this.queueName = IM_QUEUE_PREFIX + serverId; + this.routingKey = IM_ROUTING_KEY_PREFIX + serverId; + this.broadcastQueueName = "im.broadcast.queue." + serverId; + + // 创建点对点队列 + Queue queue = new Queue(queueName, true, false, false); + amqpAdmin.declareQueue(queue); + + // 绑定到直连交换机 + Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, + IM_DIRECT_EXCHANGE, routingKey, null); + amqpAdmin.declareBinding(binding); + + // 创建广播队列 + Queue broadcastQueue = new Queue(broadcastQueueName, true, false, false); + amqpAdmin.declareQueue(broadcastQueue); + + // 绑定广播队列到广播交换机 + Binding broadcastBinding = new Binding(broadcastQueueName, + Binding.DestinationType.QUEUE, + IM_FANOUT_EXCHANGE, + "", + null); + amqpAdmin.declareBinding(broadcastBinding); + + logger.info("=== RabbitMQ队列初始化完成 ==="); + logger.info("服务器ID: {}", serverId); + logger.info("点对点队列: {}", queueName); + logger.info("路由键: {}", routingKey); + logger.info("广播队列: {}", broadcastQueueName); + logger.info("============================"); + } + + /** + * 确定服务器ID + */ + private String determineServerId() { + // 1. 优先使用配置的服务器ID + if (configuredServerId != null && !configuredServerId.trim().isEmpty()) { + return configuredServerId.trim(); + } + + // 2. 使用自动生成的稳定服务器ID + return ServerIdGenerator.generateServerId(); + } + + public String getServerId() { + return serverId; + } + + public String getQueueName() { + return queueName; + } + + public String getRoutingKey() { + return routingKey; + } + + /** + * 获取目标服务器的路由键 + */ + public String getTargetRoutingKey(String targetServerId) { + return IM_ROUTING_KEY_PREFIX + targetServerId; + } + + /** + * 获取服务器信息(用于监控) + */ + public ServerInfo getServerInfo() { + ServerInfo info = new ServerInfo(); + info.setServerId(serverId); + info.setQueueName(queueName); + info.setRoutingKey(routingKey); + info.setBroadcastQueueName(broadcastQueueName); + info.setStartTime(System.currentTimeMillis()); + return info; + } + + /** + * 检查队列是否存在 + */ + public boolean isQueueExists(String queueName) { + try { + Queue queue = new Queue(queueName); + amqpAdmin.declareQueue(queue); + return true; + } catch (Exception e) { + return false; + } + } + + /** + * 初始化广播队列 + */ + private void initBroadcastQueue() { + // 为当前服务器创建广播队列 + String broadcastQueueName = "im.broadcast.queue." + serverId; + Queue broadcastQueue = new Queue(broadcastQueueName, true, false, false); + amqpAdmin.declareQueue(broadcastQueue); + + // 绑定到广播交换机 + Binding broadcastBinding = BindingBuilder.bind(broadcastQueue) + .to(imFanoutExchange); + amqpAdmin.declareBinding(broadcastBinding); + + System.out.println("广播队列初始化完成: " + broadcastQueueName); + } + + /** + * 服务器信息类 + */ + public static class ServerInfo { + private String serverId; + private String queueName; + private String routingKey; + private String broadcastQueueName; + private long startTime; + + // getter和setter + public String getServerId() { return serverId; } + public void setServerId(String serverId) { this.serverId = serverId; } + public String getQueueName() { return queueName; } + public void setQueueName(String queueName) { this.queueName = queueName; } + public String getRoutingKey() { return routingKey; } + public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } + public String getBroadcastQueueName() { return broadcastQueueName; } + public void setBroadcastQueueName(String broadcastQueueName) { this.broadcastQueueName = broadcastQueueName; } + public long getStartTime() { return startTime; } + public void setStartTime(long startTime) { this.startTime = startTime; } + } +} diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java index 82ed6eb2..41fc1a88 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java @@ -6,6 +6,9 @@ import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONUtil; import com.suisung.mall.common.feignService.AccountService; import com.suisung.mall.common.utils.CheckUtil; +import com.suisung.mall.im.common.websocket.service.DistributedMessageService; +import com.suisung.mall.im.common.websocket.service.DistributedSessionService; +import com.suisung.mall.im.common.websocket.service.LocalSessionManager; import com.suisung.mall.im.common.websocket.utils.Constants; import com.suisung.mall.im.pojo.entity.ChatHistory; import com.suisung.mall.im.pojo.vo.MineDTO; @@ -19,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.socket.*; + import java.io.IOException; import java.util.*; @@ -26,51 +30,27 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { private static Logger logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class); - private static ArrayList users; - private static ArrayList usersStr; + // 移除原有的静态集合,使用注入的服务 + // private static ArrayList users; + // private static ArrayList usersStr; + // private static Map> userSession; + // private static Map> groupSession; - //存入用户的所有终端的连接信息 - private static Map> userSession; + @Autowired + private DistributedSessionService distributedSessionService; - //群组信息 - private static Map> groupSession; + @Autowired + private DistributedMessageService distributedMessageService; - static { - users = new ArrayList<>(); - usersStr = new ArrayList<>(); - userSession = Collections.synchronizedMap(new HashMap<>()); - groupSession = Collections.synchronizedMap(new HashMap<>()); - logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class); - } - - public ArrayList getOnlineLoginNames() { - ArrayList onlineLoginNames = new ArrayList(); - for (WebSocketSession user : users) { - String userName = (String) user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); - if (userName != null) { - onlineLoginNames.add(userName); - } - } - return onlineLoginNames; - - } - - public ArrayList getOnlineLoginUserId() { - ArrayList onlineLoginUserId = new ArrayList(); - for (WebSocketSession user : users) { - Integer user_id = Convert.toInt(user.getAttributes().get("user_id")); - if (CheckUtil.isNotEmpty(user_id)) { - onlineLoginUserId.add(user_id); - } - } - return onlineLoginUserId; - - } + @Autowired + private LocalSessionManager localSessionManager; @Autowired private ChatHistoryService chatHistoryService; + @Autowired private LayGroupService layGroupService; + @Autowired private AccountService accountService; @@ -78,99 +58,36 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.debug("connect to the websocket success......"); + + String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); String sessionId = session.getId(); - users.add(session); + + // 存储到本地会话管理 + localSessionManager.addUserSession(loginUserId, session); + + // 注册到分布式会话服务 + Map attributes = new HashMap<>(); + attributes.put("user_id", session.getAttributes().get("user_id")); + attributes.put("loginName", loginUserId); + distributedSessionService.registerUserSession(loginUserId, sessionId, attributes); + + // 处理离线消息等原有逻辑... this.updateOnlineStatus(); - String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);//获取刚上线用户的loginName - List loginSessions = userSession.get(loginUserId); - if (CollUtil.isEmpty(loginSessions)) { - if (null == loginSessions) { - loginSessions = new ArrayList<>(); - } - - loginSessions.add(session); - userSession.put(loginUserId, loginSessions); - } else { - if (!loginSessions.contains(session)) { - loginSessions.add(session); - } - } - - if (loginUserId != null) { - SendVO msg = new SendVO(); - //读取离线信息 - ChatHistory chat = new ChatHistory(); - chat.setReceiver(loginUserId);//发给刚上线的用户信息 - chat.setStatus("0"); - List list = chatHistoryService.findList(chat); - /* - for(ChatHistory c : list){ - String sender=""; - String receiver=""; - - if("friend".equals(c.getType()) || "user".equals(c.getType())){//如果是个人信息 - sender = c.getSender(); //todo 确认数据库存的数据uid user_id 不一致 - msg.setId(sender); - msg.setToid(c.getReceiver()); - msg.setContent(c.getMsg()); - //todu 缺少数据 - //msg = sender+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_; - }else if(c.getType().contains("_group")){//如果是直播群组信息 - msg.setType("group"); - - sender = c.getSender(); - String groupId = c.getReceiver().split("_")[0];//直播群组id - receiver=c.getReceiver().split("_")[1];//直播群组所属user_id - msg = sender+Constants._msg_+groupId+Constants._msg_+c.getMsg()+Constants._msg_; - - }else{//如果是群组信息 - msg.setType("group"); - - String groupId = c.getSender().split(Constants._msg_)[0];//群组id - sender=c.getSender().split(Constants._msg_)[1];//发送者loginName - msg = groupId+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_; - - } - - //todo 应该冗余 - //获取用户基本信息 - AccountUserBase accountUserBase = accountService.getUserBase(Integer.valueOf(sender)); - //AccountUserBase accountUserBase= result.getFenResult(AccountUserBase.class); - //AccountUserInfo userInfo=accountService.getUserInfo(accountUserBase.getUser_id()); - AccountUserInfo userInfo=accountService.getUserInfo(Integer.valueOf(sender)); - - - msg.setUser_id(Convert.toInt(session.getAttributes().get("user_id"))); - msg.setUsername(accountUserBase.getUser_nickname()); - msg.setAvatar(userInfo.getUser_avatar()); - //msg.setMessage_id(c.getId()); - msg.setFromid(msg.getId()); - msg.setTime(c.getCreate_date().getTime()); - msg.setStatus(0); //? - msg.setMsg(new HashMap()); - - boolean isSuccess = this.sendMessageToUser(loginUserId, msg); - if(isSuccess) { - c.setStatus("1");//标记为已读 - chatHistoryService.saveNew(c); - } - } - */ - } } //接收js侧发送来的用户信息 @Override public void handleMessage(WebSocketSession session, WebSocketMessage socketMessage) throws Exception { ReceiveDTO receiveDTO = JSONUtil.toBean(socketMessage.getPayload().toString(), ReceiveDTO.class); - if (receiveDTO != null) {//发送消息 + if (receiveDTO != null) { SendVO sendVO = new SendVO(); MineDTO mine = receiveDTO.getMine(); ToDTO to = receiveDTO.getTo(); + // 设置sendVO的代码保持不变... sendVO.setToid(to.getId()); - sendVO.setZid(to.getZid()); //群组特有 + sendVO.setZid(to.getZid()); sendVO.setId(mine.getId()); Integer toUserId = Convert.toInt(session.getAttributes().get("user_id")); sendVO.setUser_id(toUserId); @@ -182,32 +99,33 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { to.setType("friend"); } - sendVO.setType(to.getType()); //是从to中读取Type + sendVO.setType(to.getType()); sendVO.setSendmethod(sendVO.getType()); - sendVO.setContent(mine.getContent()); sendVO.setMessage_id(mine.getMessage_id()); - sendVO.setFromid(mine.getId()); sendVO.setTime(new Date().getTime()); - sendVO.setStatus(0); //? + sendVO.setStatus(0); sendVO.setMsg_type(mine.getType()); sendVO.setItem_id(mine.getItem_id()); sendVO.setMsg(new HashMap()); sendVO.setMessage_length(mine.getLength()); - String sender = mine.getId();//信息发送者登录名(loginName)或user_id - String receiver = to.getId();//信息接收者,如果是私聊就是用户loginName,如果是群聊就是群组id + String sender = mine.getId(); + String receiver = to.getId(); String msg = mine.getContent(); String avatar = mine.getAvatar(); String type = to.getType(); - String senderName = mine.getUsername();//发送者姓名(name) + String senderName = mine.getUsername(); + + // 更新心跳 + distributedSessionService.updateHeartbeat(sender, session.getId()); if (!mine.getId().equals(to.getId())) { sendVO.setMine(false); - //保存聊天记录 + if ("friend".equals(type) || "user".equals(type)) { - //如果是私聊 + // 私聊消息 - 使用分布式发送 ChatHistory chat = new ChatHistory(); chat.setId(IdUtil.simpleUUID()); chat.setSender(sender); @@ -215,124 +133,49 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { chat.setMsg(msg); chat.setType("friend"); chat.setCreate_date(new Date()); - boolean isSuccess = this.sendMessageToUser(receiver, sendVO); - if (isSuccess) {//如果信息发送成功 - chat.setStatus("1");//设置为已读 + + boolean isSuccess = distributedMessageService.sendToUser(receiver, sendVO); + if (isSuccess) { + chat.setStatus("1"); } else { + // 发送离线消息提醒 sendVO.setToid(mine.getId()); sendVO.setContent(to.getName() + " 对方现在离线,他将在上线后收到你的消息!"); sendVO.setMsg_type("text"); sendVO.setMessage_length("0"); sendVO.setId(to.getId()); - this.sendMessageToUser(sender, sendVO);//同时向本人发送对方不在线消息 - chat.setStatus("0");//设置为未读 + distributedMessageService.sendToUser(sender, sendVO); + chat.setStatus("0"); } chatHistoryService.saveNew(chat); - } else if ("group".equals(type)) {//如果是群聊 - // 临时,不经过数据库 - List groupLoginSession = groupSession.get(to.getZid()); - if (!CollUtil.isEmpty(groupLoginSession)) { - for (WebSocketSession gs : groupLoginSession) { + } else if ("group".equals(type)) { + // 群聊消息 - 使用分布式发送 + String groupId = to.getZid(); + distributedMessageService.sendToGroup(groupId, sendVO, session.getId()); - if (sender.equals(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString())) {//群消息不发给自己,但是存一条记录当做聊天记录查询 - - } else { - //设置接受用户 - sendVO.setToid(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString()); - boolean isSuccess = this.sendMessageToUser(to.getName(), sendVO); - if (isSuccess) { - - } else { - } - } - - //chatHistoryService.saveNew(chat); - } - } - - - /* - List layGroupUserlist = new ArrayList(); - - //群主 - LayGroupUser owner = new LayGroupUser(); - LayGroup layGroup = layGroupService.get(receiver); - owner.setUser_id(layGroup.getCreate_by()); - layGroupUserlist.add(owner); - - //群成员1 - List zlist = layGroupService.getUsersByGroup(layGroupService.get(receiver)); - layGroupUserlist.addAll(zlist); - - for (LayGroupUser lgUser : layGroupUserlist) { - AccountUserBase user_base_row = accountService.getUserBase(Integer.valueOf(lgUser.getUser_id())); - ChatHistory chat = new ChatHistory(); - chat.setId(IdUtil.simpleUUID()); - //群聊时信息先发送给群聊id(即receiver),在后台转发给所有非发送者(sender)的人的话,群聊id(即receiver)就变成发送者。 - String groupId = receiver; - //保存聊天记录 - chat.setSender(groupId + Constants._msg_ + sender);//群聊时保存群聊id和发送者id - chat.setReceiver(user_base_row.getUser_account());//群中所有信息获得者人员 - chat.setMsg(msg); - chat.setType("group"); - //message = groupId + Constants._msg_ + user_base_row.getUser_account() + Constants._msg_ + msg + Constants._msg_ + avatar + Constants._msg_ + type + Constants._msg_ + senderName + Constants._msg_ + datatime; - if (sender.equals(user_base_row.getUser_account())) {//群消息不发给自己,但是存一条记录当做聊天记录查询 - chat.setStatus("1");//发送成功,设置为已读 - } else { - boolean isSuccess = this.sendMessageToUser(user_base_row.getUser_account(), sendVO); - if (isSuccess) { - chat.setStatus("1");//发送成功,设置为已读 - } else { - chat.setStatus("0");//用户不在线,设置为未读 - } - } - - chatHistoryService.saveNew(chat); - } - */ - - } else if ("join_group".equals(type)) { //临时群组,聚焦当前窗口 + } else if ("join_group".equals(type)) { + // 加入群组 String zid = to.getZid(); - - //设置session,属性上标出所属群组: 目前无同时多个群组需求,如果存在,此处存入list session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid); - //设置群组中用户 - List groupLoginSession = groupSession.get(zid); + // 添加到本地群组 + localSessionManager.addGroupSession(zid, session); - if (CollUtil.isEmpty(groupLoginSession)) { - if (null == groupLoginSession) { - groupLoginSession = new ArrayList<>(); - } + // 添加到分布式群组 + String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); + distributedSessionService.addUserToGroup(zid, userId, session.getId()); - groupLoginSession.add(session); - groupSession.put(zid, groupLoginSession); - } else { - if (!groupLoginSession.contains(session)) { - groupLoginSession.add(session); - } - } - - //todo通知已存在群组用户消息 可启动独立task - } else if ("leave_group".equals(type)) { //临时群组,聚焦当前窗口 + } else if ("leave_group".equals(type)) { + // 离开群组 String zid = to.getZid(); - - //设置session,属性上标出所属群组 session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, null); - //设置群组中用户 - List groupLoginSession = groupSession.get(zid); + // 从本地群组移除 + localSessionManager.removeGroupSession(zid, session); - if (CollUtil.isEmpty(groupLoginSession)) { - } else { - if (groupLoginSession.contains(session)) { - groupLoginSession.remove(session); - } - } - - //todo通知已存在群组用户消息 可启动独立task - } else { + // 从分布式群组移除 + distributedSessionService.removeUserFromGroup(zid, session.getId()); } } else { sendVO.setMine(true); @@ -346,25 +189,33 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { session.close(); } logger.debug("websocket connection closed......"); - users.remove(session); - userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session); - - if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) { - groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session); - } - - this.updateOnlineStatus(); - + cleanupSession(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { logger.debug("websocket connection closed......"); - users.remove(session); - userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session); + cleanupSession(session); + } - if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) { - groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session); + /** + * 清理会话资源 + */ + private void cleanupSession(WebSocketSession session) { + String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); + String sessionId = session.getId(); + String groupId = (String) session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY); + + // 从本地管理移除 + localSessionManager.removeUserSession(loginUserId, session); + if (groupId != null) { + localSessionManager.removeGroupSession(groupId, session); + } + + // 从分布式存储注销 + distributedSessionService.unregisterUserSession(loginUserId, sessionId); + if (groupId != null) { + distributedSessionService.removeUserFromGroup(groupId, sessionId); } this.updateOnlineStatus(); @@ -376,64 +227,104 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { } /** - * 给所有在线用户发送消息 - * - * @param message + * 给某个用户发送消息 (兼容原有接口) */ - public void sendMessageToAllUsers(List message) { + public boolean sendMessageToUser(String loginName, SendVO message) { + return distributedMessageService.sendToUser(loginName, message); + } - SendVO msg = new SendVO(); - msg.setContent(message.toString()); - msg.setType("online"); - msg.setSendmethod(msg.getType()); + /** + * 获取所有在线用户登录名 + */ + public List getOnlineLoginNames() { + // 使用分布式服务获取所有在线用户登录名 + return distributedSessionService.getOnlineLoginNames(); + } - //不能这么操作 - for (WebSocketSession user : users) { - try { - if (user.isOpen()) { - user.sendMessage(new TextMessage(msg.toString())); + /** + * 获取所有在线用户ID + */ + public List getOnlineLoginUserId() { + // 使用分布式服务获取所有在线用户ID + return distributedSessionService.getAllOnlineUserIds(); + } + + /** + * 获取当前服务器的在线用户ID + */ + public List getLocalOnlineLoginUserId() { + // 使用本地会话管理器获取当前服务器的在线用户ID + return localSessionManager.getLocalOnlineUserIds(); + } + + /** + * 获取当前服务器的在线用户登录名 + */ + public List getLocalOnlineLoginNames() { + // 使用本地会话管理器获取当前服务器的在线用户登录名 + return localSessionManager.getLocalOnlineLoginNames(); + } + + /** + * 更新在线状态 - 通知所有用户在线列表变化 + */ + public void updateOnlineStatus() { + try { + // 创建在线状态消息 + SendVO onlineStatusMsg = new SendVO(); + onlineStatusMsg.setType("online_status"); + onlineStatusMsg.setSendmethod("broadcast"); + + // 获取在线用户列表 + List onlineUsers = getOnlineLoginNames(); + List onlineUserIds = getOnlineLoginUserId(); + + Map data = new HashMap<>(); + data.put("onlineUsers", onlineUsers); + data.put("onlineUserIds", onlineUserIds); + data.put("timestamp", System.currentTimeMillis()); + + onlineStatusMsg.setMsg(data); + + // 广播在线状态更新(只广播到当前服务器,避免循环广播) + broadcastLocalOnlineStatus(onlineStatusMsg); + + logger.debug("在线状态更新: {} 个用户在线", onlineUsers.size()); + } catch (Exception e) { + logger.error("更新在线状态失败: " + e.getMessage(), e); + } + } + + /** + * 广播在线状态到本地用户 + */ + private void broadcastLocalOnlineStatus(SendVO message) { + // 获取本地所有用户会话 + for (List sessions : localSessionManager.userSessions.values()) { + for (WebSocketSession session : sessions) { + if (session.isOpen()) { + try { + session.sendMessage(new TextMessage(message.toString())); + } catch (IOException e) { + logger.error("发送在线状态消息失败: " + e.getMessage(), e); + } } - } catch (IOException e) { - logger.error("给所有在线用户发送信息失败!" + e.getMessage(), e); } } } /** - * 给某个用户发送消息 - * - * @param loginName - * @param message + * 给所有在线用户发送消息 */ - public boolean sendMessageToUser(String loginName, SendVO message) { - boolean result = false; + public void sendMessageToAllUsers(SendVO message) { + // 获取所有在线用户ID + List onlineUserIds = getOnlineLoginUserId(); - //不能这样操作。 - //for (WebSocketSession user : users) { - List lst = userSession.get(message.getToid()); - - if (CollUtil.isNotEmpty(lst)) { - for (WebSocketSession user : lst) { - //if (user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).equals(loginName)) {//允许用户多个浏览器登录,每个浏览器页面都会收到用户信息 - try { - if (user.isOpen()) { - user.sendMessage(new TextMessage(message.toString())); - result = true; - } - } catch (IOException e) { - logger.error("给用户发送信息失败!" + e.getMessage(), e); - } - - //break;//注释掉此处意味着遍历该用户打开的所有页面并发送信息,否则只会向用户登录的第一个网页发送信息。 - //} - } + for (Integer userId : onlineUserIds) { + String userIdStr = String.valueOf(userId); + distributedMessageService.sendToUser(userIdStr, message); } - return result; + logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size()); } - - public void updateOnlineStatus() { - //this.sendMessageToAllUsers(this.getOnlineLoginNames());//通知所有用户更新在线信息 - } - } \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/utils/ServerIdGenerator.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/utils/ServerIdGenerator.java new file mode 100644 index 00000000..60e3a38e --- /dev/null +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/utils/ServerIdGenerator.java @@ -0,0 +1,153 @@ +package com.suisung.mall.im.common.websocket.utils; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.UUID; + +/** + * 服务器ID生成工具类 + */ +public class ServerIdGenerator { + + private static volatile String cachedServerId; + + /** + * 生成服务器唯一标识 + */ + public static String generateServerId() { + if (cachedServerId != null) { + return cachedServerId; + } + + synchronized (ServerIdGenerator.class) { + if (cachedServerId != null) { + return cachedServerId; + } + + // 尝试多种方式生成稳定的服务器ID + String serverId = generateStableServerId(); + cachedServerId = serverId; + return serverId; + } + } + + /** + * 生成稳定的服务器ID(基于机器标识) + */ + private static String generateStableServerId() { + StringBuilder serverId = new StringBuilder("im-server-"); + + try { + // 1. 使用MAC地址(最稳定) + String macAddress = getPhysicalMacAddress(); + if (!"UNKNOWN-MAC".equals(macAddress)) { + // 取MAC地址的后6位作为标识 + String macSuffix = macAddress.replace(":", "").replace("-", ""); + if (macSuffix.length() >= 6) { + serverId.append(macSuffix.substring(macSuffix.length() - 6).toLowerCase()); + return serverId.toString(); + } + } + + // 2. 使用IP地址 + String ipSuffix = getLocalIpSuffix(); + if (!"UNKNOWN-IP".equals(ipSuffix)) { + serverId.append(ipSuffix); + return serverId.toString(); + } + + // 3. 使用主机名哈希 + String hostname = InetAddress.getLocalHost().getHostName(); + int hostnameHash = Math.abs(hostname.hashCode()); + serverId.append(String.format("%06x", hostnameHash & 0xFFFFFF)); + return serverId.toString(); + + } catch (Exception e) { + // 4. 最终备选:UUID + String uuidSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 6); + serverId.append(uuidSuffix); + return serverId.toString(); + } + } + + /** + * 获取物理MAC地址 + */ + private static String getPhysicalMacAddress() { + try { + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface networkInterface = interfaces.nextElement(); + + // 排除回环、虚拟、未启用的接口 + if (networkInterface.isLoopback() || + networkInterface.isVirtual() || + !networkInterface.isUp()) { + continue; + } + + // 排除常见的虚拟接口名称 + String name = networkInterface.getName().toLowerCase(); + if (name.contains("virtual") || name.contains("vmware") || + name.contains("vbox") || name.contains("docker") || + name.contains("wsl")) { + continue; + } + + byte[] mac = networkInterface.getHardwareAddress(); + if (mac != null && mac.length > 0) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < mac.length; i++) { + sb.append(String.format("%02X", mac[i])); + } + return sb.toString(); + } + } + } catch (SocketException e) { + // 忽略异常 + } + return "UNKNOWN-MAC"; + } + + /** + * 获取本地IP地址后缀 + */ + private static String getLocalIpSuffix() { + try { + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface networkInterface = interfaces.nextElement(); + if (networkInterface.isLoopback() || !networkInterface.isUp()) { + continue; + } + + Enumeration addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress address = addresses.nextElement(); + if (address.isSiteLocalAddress()) { + String ip = address.getHostAddress(); + // 取IP地址的最后一段 + String[] segments = ip.split("\\."); + if (segments.length == 4) { + return segments[3]; + } + } + } + } + } catch (SocketException e) { + // 忽略异常 + } + return "UNKNOWN-IP"; + } + + /** + * 生成带时间戳的服务器ID(适合临时使用) + */ + public static String generateTimestampServerId() { + String baseId = generateServerId(); + long timestamp = System.currentTimeMillis() % 1000000; // 取后6位 + return baseId + "-" + String.format("%06d", timestamp); + } +} \ No newline at end of file diff --git a/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java b/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java index e11857eb..5fd72d69 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java +++ b/mall-im/src/main/java/com/suisung/mall/im/controller/admin/ChatSocketInfoController.java @@ -40,7 +40,7 @@ public class ChatSocketInfoController { @RequestMapping(value = "/getUserOnline", method = RequestMethod.POST) public List getUserOnline(@RequestBody List user_ids) { logger.info(I18nUtil._("接收的用户ids:") + CollUtil.join(user_ids, ",")); - ArrayList onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId(); + List onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId(); Iterator online_user_ids_iter = onlineLoginUserIds.iterator(); // 处理移除非本店铺客服 while (online_user_ids_iter.hasNext()) { diff --git a/mall-im/src/main/resources/bootstrap-dev.yml b/mall-im/src/main/resources/bootstrap-dev.yml index d42ee698..5fa7731e 100644 --- a/mall-im/src/main/resources/bootstrap-dev.yml +++ b/mall-im/src/main/resources/bootstrap-dev.yml @@ -37,6 +37,15 @@ spring: max-wait: -1ms max-idle: 64 min-idle: 0 + rabbitmq: + host: @rabbitmq.host@ + port: @rabbitmq.port@ + virtual-host: @rabbitmq.virtual-host@ + listener: + simple: + acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认 + username: @rabbitmq.username@ + password: @rabbitmq.password@ cloud: nacos: discovery: diff --git a/mall-im/src/main/resources/bootstrap-local.yml b/mall-im/src/main/resources/bootstrap-local.yml index d42ee698..5fa7731e 100644 --- a/mall-im/src/main/resources/bootstrap-local.yml +++ b/mall-im/src/main/resources/bootstrap-local.yml @@ -37,6 +37,15 @@ spring: max-wait: -1ms max-idle: 64 min-idle: 0 + rabbitmq: + host: @rabbitmq.host@ + port: @rabbitmq.port@ + virtual-host: @rabbitmq.virtual-host@ + listener: + simple: + acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认 + username: @rabbitmq.username@ + password: @rabbitmq.password@ cloud: nacos: discovery: diff --git a/mall-im/src/main/resources/bootstrap-prod.yml b/mall-im/src/main/resources/bootstrap-prod.yml index 429d947b..005bc298 100644 --- a/mall-im/src/main/resources/bootstrap-prod.yml +++ b/mall-im/src/main/resources/bootstrap-prod.yml @@ -39,6 +39,15 @@ spring: max-wait: -1ms max-idle: 64 min-idle: 0 + rabbitmq: + host: @rabbitmq.host@ + port: @rabbitmq.port@ + virtual-host: @rabbitmq.virtual-host@ + listener: + simple: + acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认 + username: @rabbitmq.username@ + password: @rabbitmq.password@ cloud: nacos: discovery: diff --git a/mall-im/src/main/resources/bootstrap-uat.yml b/mall-im/src/main/resources/bootstrap-uat.yml index d42ee698..5fa7731e 100644 --- a/mall-im/src/main/resources/bootstrap-uat.yml +++ b/mall-im/src/main/resources/bootstrap-uat.yml @@ -37,6 +37,15 @@ spring: max-wait: -1ms max-idle: 64 min-idle: 0 + rabbitmq: + host: @rabbitmq.host@ + port: @rabbitmq.port@ + virtual-host: @rabbitmq.virtual-host@ + listener: + simple: + acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认 + username: @rabbitmq.username@ + password: @rabbitmq.password@ cloud: nacos: discovery: