im服务新增分布式rabbimq解决方案

This commit is contained in:
liyj 2025-11-13 17:18:53 +08:00
parent ff7acb1b73
commit c8857caadf
13 changed files with 1354 additions and 284 deletions

View File

@ -0,0 +1,84 @@
package com.suisung.mall.im.common.websocket.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
private static Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
// 直连交换机 - 用于精确路由
public static final String IM_DIRECT_EXCHANGE = "im.direct.exchange";
// 广播交换机 - 用于广播消息
public static final String IM_FANOUT_EXCHANGE = "im.fanout.exchange";
// 队列前缀
public static final String IM_QUEUE_PREFIX = "im.queue.";
// 路由键前缀
public static final String IM_ROUTING_KEY_PREFIX = "im.server.";
/**
* 创建直连交换机 - 用于点对点消息
*/
@Bean
public DirectExchange imDirectExchange() {
return new DirectExchange(IM_DIRECT_EXCHANGE, true, false);
}
/**
* 创建广播交换机 - 用于广播消息
*/
@Bean
public FanoutExchange imFanoutExchange() {
return new FanoutExchange(IM_FANOUT_EXCHANGE, true, false);
}
/**
* JSON消息转换器 - 支持SendVO等复杂对象
*/
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 配置RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
// 设置确认回调
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
logger.info("消息发送失败: {}", cause);
}
});
return template;
}
/**
* 配置监听器容器工厂
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(1); // 每次处理1条消息
return factory;
}
}

View File

@ -0,0 +1,125 @@
package com.suisung.mall.im.common.websocket.service;
import com.suisung.mall.im.pojo.vo.SendVO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.IM_FANOUT_EXCHANGE;
@Service
public class DistributedMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DistributedSessionService sessionService;
@Autowired
private LocalSessionManager localSessionManager;
@Autowired
private RabbitMQManager rabbitMQManager;
/**
* 发送消息给用户
*/
public boolean sendToUser(String targetUserId, SendVO message) {
String targetServer = sessionService.getUserServer(targetUserId);
if (targetServer == null) {
// 用户离线可以存储离线消息
return false;
}
if (isCurrentServer(targetServer)) {
// 用户在当前服务器直接发送
return localSessionManager.sendToUser(targetUserId, message);
} else {
// 用户在其他服务器通过RabbitMQ转发
forwardToUser(targetServer, targetUserId, message);
return true;
}
}
/**
* 发送消息给群组
*/
public void sendToGroup(String groupId, SendVO message, String excludeSessionId) {
// 获取群组所有成员
List<Map<String, Object>> members = sessionService.getGroupMembers(groupId);
for (Map<String, Object> member : members) {
String userId = (String) member.get("userId");
String sessionId = (String) member.get("sessionId");
String memberServer = (String) member.get("serverId");
// 排除发送者自己
if (sessionId.equals(excludeSessionId)) {
continue;
}
if (isCurrentServer(memberServer)) {
// 成员在当前服务器
localSessionManager.sendToSession(sessionId, message);
} else {
// 成员在其他服务器
forwardToSession(memberServer, sessionId, message);
}
}
}
/**
* 广播消息给所有在线用户
*/
public void broadcast(SendVO message, String excludeServerId) {
// 创建广播消息
Map<String, Object> broadcastMsg = new HashMap<>();
broadcastMsg.put("type", "BROADCAST");
broadcastMsg.put("message", message);
broadcastMsg.put("excludeServer", excludeServerId);
broadcastMsg.put("sourceServer", rabbitMQManager.getServerId());
// 广播到所有服务器这里简化处理实际应该获取所有服务器列表
// 可以通过Redis维护服务器列表然后遍历发送
rabbitTemplate.convertAndSend(IM_FANOUT_EXCHANGE, "im.server.all", broadcastMsg);
}
/**
* 转发消息到用户
*/
private void forwardToUser(String targetServer, String userId, SendVO message) {
Map<String, Object> forwardMsg = new HashMap<>();
forwardMsg.put("type", "SEND_TO_USER");
forwardMsg.put("targetUserId", userId);
forwardMsg.put("message", message);
forwardMsg.put("sourceServer", rabbitMQManager.getServerId());
String routingKey = rabbitMQManager.getTargetRoutingKey(targetServer);
rabbitTemplate.convertAndSend(IM_FANOUT_EXCHANGE, routingKey, forwardMsg);
}
/**
* 转发消息到会话
*/
private void forwardToSession(String targetServer, String sessionId, SendVO message) {
Map<String, Object> forwardMsg = new HashMap<>();
forwardMsg.put("type", "SEND_TO_SESSION");
forwardMsg.put("targetSessionId", sessionId);
forwardMsg.put("message", message);
forwardMsg.put("sourceServer", rabbitMQManager.getServerId());
String routingKey = rabbitMQManager.getTargetRoutingKey(targetServer);
rabbitTemplate.convertAndSend(IM_FANOUT_EXCHANGE, routingKey, forwardMsg);
}
private boolean isCurrentServer(String serverId) {
return serverId.equals(rabbitMQManager.getServerId());
}
}

View File

@ -0,0 +1,234 @@
package com.suisung.mall.im.common.websocket.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
@Service
public class DistributedSessionService {
private static final Logger log = LoggerFactory.getLogger(DistributedSessionService.class);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitMQManager rabbitMQManager; // 使用RabbitMQManager获取serverId
private static final String USER_SERVER_KEY = "im:user:server:";
private static final String USER_SESSIONS_KEY = "im:user:sessions:";
private static final String SERVER_USERS_KEY = "im:server:users:";
private static final String GROUP_SESSIONS_KEY = "im:group:sessions:";
/**
* 注册用户会话
*/
public void registerUserSession(String userId, String sessionId, Map<String, Object> attributes) {
String userKey = USER_SESSIONS_KEY + userId;
String serverKey = USER_SERVER_KEY + userId;
String serverUsersKey = SERVER_USERS_KEY + getServerId();
// 存储用户会话信息
Map<String, Object> sessionInfo = new HashMap<>();
sessionInfo.put("sessionId", sessionId);
sessionInfo.put("serverId", getServerId());
sessionInfo.put("connectTime", System.currentTimeMillis());
sessionInfo.put("lastHeartbeat", System.currentTimeMillis());
sessionInfo.put("attributes", attributes);
redisTemplate.opsForHash().put(userKey, sessionId, sessionInfo);
redisTemplate.expire(userKey, Duration.ofMinutes(30));
// 更新用户-服务器映射
redisTemplate.opsForValue().set(serverKey, getServerId(), Duration.ofMinutes(30));
// 添加到服务器用户集合
redisTemplate.opsForSet().add(serverUsersKey, userId);
redisTemplate.expire(serverUsersKey, Duration.ofMinutes(30));
log.info("用户会话注册: userId={}, sessionId={}, serverId={}", userId, sessionId, getServerId());
}
/**
* 注销用户会话
*/
public void unregisterUserSession(String userId, String sessionId) {
String userKey = USER_SESSIONS_KEY + userId;
String serverUsersKey = SERVER_USERS_KEY + getServerId();
// 从用户会话中移除
redisTemplate.opsForHash().delete(userKey, sessionId);
// 从服务器用户集合中移除
redisTemplate.opsForSet().remove(serverUsersKey, userId);
// 检查用户是否还有活跃会话
Long size = redisTemplate.opsForHash().size(userKey);
if (size == null || size == 0) {
String serverKey = USER_SERVER_KEY + userId;
redisTemplate.delete(serverKey);
}
}
/**
* 获取用户所在服务器
*/
public String getUserServer(String userId) {
String key = USER_SERVER_KEY + userId;
return (String) redisTemplate.opsForValue().get(key);
}
/**
* 获取用户所有会话ID
*/
public Set<String> getUserSessions(String userId) {
String key = USER_SESSIONS_KEY + userId;
return redisTemplate.opsForHash().keys(key).stream()
.map(Object::toString)
.collect(HashSet::new, HashSet::add, HashSet::addAll);
}
/**
* 添加用户到群组
*/
public void addUserToGroup(String groupId, String userId, String sessionId) {
String groupKey = GROUP_SESSIONS_KEY + groupId;
Map<String, Object> memberInfo = new HashMap<>();
memberInfo.put("userId", userId);
memberInfo.put("sessionId", sessionId);
memberInfo.put("serverId", getServerId());
memberInfo.put("joinTime", System.currentTimeMillis());
redisTemplate.opsForHash().put(groupKey, sessionId, memberInfo);
redisTemplate.expire(groupKey, Duration.ofHours(2));
}
/**
* 从群组移除用户
*/
public void removeUserFromGroup(String groupId, String sessionId) {
String groupKey = GROUP_SESSIONS_KEY + groupId;
redisTemplate.opsForHash().delete(groupKey, sessionId);
}
/**
* 获取群组所有成员
*/
public List<Map<String, Object>> getGroupMembers(String groupId) {
String groupKey = GROUP_SESSIONS_KEY + groupId;
return redisTemplate.opsForHash().values(groupKey).stream()
.map(obj -> (Map<String, Object>) obj)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
}
/**
* 更新心跳
*/
public void updateHeartbeat(String userId, String sessionId) {
String userKey = USER_SESSIONS_KEY + userId;
Map<String, Object> sessionInfo = (Map<String, Object>) redisTemplate.opsForHash().get(userKey, sessionId);
if (sessionInfo != null) {
sessionInfo.put("lastHeartbeat", System.currentTimeMillis());
redisTemplate.opsForHash().put(userKey, sessionId, sessionInfo);
redisTemplate.expire(userKey, Duration.ofMinutes(30));
}
}
/**
* 获取所有在线用户ID列表
*/
public List<Integer> getOnlineUserIds() {
// 方法1: 通过服务器用户集合获取推荐
String serverUsersKey = SERVER_USERS_KEY + new RabbitMQManager().getServerId();
Set<Object> userIds = redisTemplate.opsForSet().members(serverUsersKey);
if (userIds != null && !userIds.isEmpty()) {
return userIds.stream()
.map(userIdStr -> {
try {
return Integer.parseInt(userIdStr.toString());
} catch (NumberFormatException e) {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
return new ArrayList<>();
}
/**
* 获取所有服务器的在线用户ID列表全局
*/
public List<Integer> getAllOnlineUserIds() {
// 获取所有服务器的用户集合
Set<String> serverKeys = redisTemplate.keys(SERVER_USERS_KEY + "*");
List<Integer> allUserIds = new ArrayList<>();
if (serverKeys != null) {
for (String serverKey : serverKeys) {
Set<Object> userIds = redisTemplate.opsForSet().members(serverKey);
if (userIds != null) {
userIds.forEach(userIdStr -> {
try {
allUserIds.add(Integer.parseInt(userIdStr.toString()));
} catch (NumberFormatException e) {
// 忽略格式错误的用户ID
}
});
}
}
}
// 去重
return allUserIds.stream().distinct().collect(Collectors.toList());
}
/**
* 获取在线用户登录名列表
*/
public List<String> getOnlineLoginNames() {
List<String> loginNames = new ArrayList<>();
List<Integer> userIds = getOnlineUserIds();
// 这里可以根据用户ID获取对应的登录名
// 实际实现可能需要调用用户服务
for (Integer userId : userIds) {
// 模拟获取登录名实际应该从用户服务或缓存中获取
String loginName = "user_" + userId;
loginNames.add(loginName);
}
return loginNames;
}
/**
* 获取指定用户的会话信息
*/
public List<Map<String, Object>> getUserSessionsInfo(String userId) {
String userKey = USER_SESSIONS_KEY + userId;
Map<Object, Object> sessions = redisTemplate.opsForHash().entries(userKey);
return sessions.values().stream()
.map(obj -> (Map<String, Object>) obj)
.collect(Collectors.toList());
}
/**
* 检查用户是否在线
*/
public boolean isUserOnline(String userId) {
String userKey = USER_SESSIONS_KEY + userId;
Long size = redisTemplate.opsForHash().size(userKey);
return size != null && size > 0;
}
private String getServerId() {
return rabbitMQManager.getServerId();
}
}

View File

@ -0,0 +1,144 @@
package com.suisung.mall.im.common.websocket.service;
import com.suisung.mall.im.pojo.vo.SendVO;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@Component
public class LocalSessionManager {
// 本地存储的用户会话 (userId -> sessions)
public final ConcurrentHashMap<String, List<WebSocketSession>> userSessions = new ConcurrentHashMap<>();
// 本地存储的群组会话 (groupId -> sessions)
private final ConcurrentHashMap<String, List<WebSocketSession>> groupSessions = new ConcurrentHashMap<>();
/**
* 添加用户会话
*/
public void addUserSession(String userId, WebSocketSession session) {
userSessions.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>())
.add(session);
}
/**
* 移除用户会话
*/
public void removeUserSession(String userId, WebSocketSession session) {
List<WebSocketSession> sessions = userSessions.get(userId);
if (sessions != null) {
sessions.remove(session);
if (sessions.isEmpty()) {
userSessions.remove(userId);
}
}
}
/**
* 发送消息给用户
*/
public boolean sendToUser(String userId, SendVO message) {
List<WebSocketSession> sessions = userSessions.get(userId);
if (sessions != null && !sessions.isEmpty()) {
boolean success = false;
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(message.toString()));
success = true;
} catch (IOException e) {
// 发送失败移除会话
removeUserSession(userId, session);
}
}
}
return success;
}
return false;
}
/**
* 发送消息给会话
*/
public boolean sendToSession(String sessionId, SendVO message) {
// 遍历所有会话找到指定的sessionId
for (List<WebSocketSession> sessions : userSessions.values()) {
for (WebSocketSession session : sessions) {
if (session.getId().equals(sessionId) && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message.toString()));
return true;
} catch (IOException e) {
return false;
}
}
}
}
return false;
}
/**
* 添加群组会话
*/
public void addGroupSession(String groupId, WebSocketSession session) {
groupSessions.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>())
.add(session);
}
/**
* 移除群组会话
*/
public void removeGroupSession(String groupId, WebSocketSession session) {
List<WebSocketSession> sessions = groupSessions.get(groupId);
if (sessions != null) {
sessions.remove(session);
if (sessions.isEmpty()) {
groupSessions.remove(groupId);
}
}
}
/**
* 获取本地所有在线用户ID
*/
public List<Integer> getLocalOnlineUserIds() {
List<Integer> userIds = new ArrayList<>();
for (String userIdStr : userSessions.keySet()) {
try {
userIds.add(Integer.parseInt(userIdStr));
} catch (NumberFormatException e) {
// 忽略格式错误的用户ID
}
}
return userIds;
}
/**
* 获取本地所有在线用户登录名
*/
public List<String> getLocalOnlineLoginNames() {
return new ArrayList<>(userSessions.keySet());
}
/**
* 获取本地会话数量统计
*/
public Map<String, Object> getLocalSessionStats() {
Map<String, Object> stats = new HashMap<>();
stats.put("totalUsers", userSessions.size());
stats.put("totalSessions", userSessions.values().stream().mapToInt(List::size).sum());
stats.put("totalGroups", groupSessions.size());
return stats;
}
}

View File

@ -0,0 +1,233 @@
package com.suisung.mall.im.common.websocket.service;
import com.suisung.mall.im.pojo.vo.SendVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class MessageConsumerService {
private static Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);
@Autowired
private LocalSessionManager localSessionManager;
@Autowired
private RabbitMQManager rabbitMQManager;
/**
* 监听当前服务器的队列
* 使用 RabbitListener 监听动态队列
*/
@RabbitListener(queues = "#{rabbitMQManager.queueName}")
public void consumeForwardMessage(Map<String, Object> message) {
try {
String type = (String) message.get("type");
System.out.println("收到RabbitMQ消息, 类型: " + type + ", 服务器: " + rabbitMQManager.getServerId());
switch (type) {
case "SEND_TO_USER":
handleSendToUser(message);
break;
case "SEND_TO_SESSION":
handleSendToSession(message);
break;
case "BROADCAST":
handleBroadcast(message);
break;
default:
logger.info("未知消息类型: {}", type);
}
} catch (Exception e) {
logger.info("处理RabbitMQ消息失败: {}", e.getMessage());
e.printStackTrace();
}
}
/**
* 处理发送给用户的消息
*/
private void handleSendToUser(Map<String, Object> message) {
String userId = (String) message.get("targetUserId");
Object messageObj = message.get("message");
SendVO userMsg = null;
if (messageObj instanceof SendVO) {
userMsg = (SendVO) messageObj;
} else if (messageObj instanceof Map) {
userMsg = convertMapToSendVO((Map<String, Object>) messageObj);
}
if (userMsg != null) {
boolean success = localSessionManager.sendToUser(userId, userMsg);
logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败");
} else {
logger.info("消息格式错误,无法发送给用户: {}", userId);
}
}
/**
* 处理发送给会话的消息
*/
private void handleSendToSession(Map<String, Object> message) {
String sessionId = (String) message.get("targetSessionId");
Object messageObj = message.get("message");
SendVO sessionMsg = null;
if (messageObj instanceof SendVO) {
sessionMsg = (SendVO) messageObj;
} else if (messageObj instanceof Map) {
sessionMsg = convertMapToSendVO((Map<String, Object>) messageObj);
}
if (sessionMsg != null) {
boolean success = localSessionManager.sendToSession(sessionId, sessionMsg);
logger.info("发送消息给会话 {}: {}", sessionId, success ? "成功" : "失败");
} else {
logger.info("消息格式错误,无法发送给会话: {}", sessionId);
}
}
/**
* 处理广播消息
*/
private void handleBroadcast(Map<String, Object> message) {
String excludeServer = (String) message.get("excludeServer");
String currentServer = rabbitMQManager.getServerId();
// 如果当前服务器不在排除列表中则处理广播
if (!currentServer.equals(excludeServer)) {
Object messageObj = message.get("message");
SendVO broadcastMsg = null;
if (messageObj instanceof SendVO) {
broadcastMsg = (SendVO) messageObj;
} else if (messageObj instanceof Map) {
broadcastMsg = convertMapToSendVO((Map<String, Object>) messageObj);
}
if (broadcastMsg != null) {
// 实现广播给所有本地用户
broadcastToAllLocalUsers(broadcastMsg);
logger.info("广播消息给所有本地用户");
}
} else {
logger.info("当前服务器被排除在广播之外: {}", currentServer);
}
}
/**
* 广播给所有本地用户
*/
private void broadcastToAllLocalUsers(SendVO message) {
// 这里可以实现具体的广播逻辑
// 例如遍历所有本地会话并发送消息
logger.info("执行广播逻辑,消息内容: {}", message.getContent());
}
/**
* 将Map转换为SendVO
*/
private SendVO convertMapToSendVO(Map<String, Object> messageMap) {
try {
SendVO sendVO = new SendVO();
// 设置基本字段
if (messageMap.get("toid") != null) {
sendVO.setToid((String) messageMap.get("toid"));
}
if (messageMap.get("id") != null) {
sendVO.setId((String) messageMap.get("id"));
}
if (messageMap.get("content") != null) {
sendVO.setContent((String) messageMap.get("content"));
}
if (messageMap.get("type") != null) {
sendVO.setType((String) messageMap.get("type"));
}
if (messageMap.get("username") != null) {
sendVO.setUsername((String) messageMap.get("username"));
}
if (messageMap.get("avatar") != null) {
sendVO.setAvatar((String) messageMap.get("avatar"));
}
if (messageMap.get("fromid") != null) {
sendVO.setFromid((String) messageMap.get("fromid"));
}
if (messageMap.get("time") != null) {
sendVO.setTime((Long) messageMap.get("time"));
}
if (messageMap.get("status") != null) {
sendVO.setStatus((Integer) messageMap.get("status"));
}
if (messageMap.get("msg_type") != null) {
sendVO.setMsg_type((String) messageMap.get("msg_type"));
}
if (messageMap.get("item_id") != null) {
sendVO.setItem_id(Long.valueOf((String) messageMap.get("item_id")));
}
if (messageMap.get("message_length") != null) {
sendVO.setMessage_length((String) messageMap.get("message_length"));
}
if (messageMap.get("zid") != null) {
sendVO.setZid((String) messageMap.get("zid"));
}
if (messageMap.get("user_id") != null) {
sendVO.setUser_id((Integer) messageMap.get("user_id"));
}
if (messageMap.get("friend_id") != null) {
sendVO.setFriend_id((Integer) messageMap.get("friend_id"));
}
if (messageMap.get("sendmethod") != null) {
sendVO.setSendmethod((String) messageMap.get("sendmethod"));
}
if (messageMap.get("message_id") != null) {
sendVO.setMessage_id(Integer.valueOf((String) messageMap.get("message_id")));
}
if (messageMap.get("mine") != null) {
sendVO.setMine((Boolean) messageMap.get("mine"));
}
return sendVO;
} catch (Exception e) {
logger.info("Map转换为SendVO失败: {}", e.getMessage());
return null;
}
}
/**
* 处理连接建立事件可选用于服务器间同步
*/
@RabbitListener(queues = "#{rabbitMQManager.queueName}")
public void handleSessionEvent(Map<String, Object> event) {
String eventType = (String) event.get("eventType");
String userId = (String) event.get("userId");
String sessionId = (String) event.get("sessionId");
logger.info("收到会话事件: {}, 用户: {}, 会话: {}", eventType, userId, sessionId);
// 可以根据需要处理各种会话事件
switch (eventType) {
case "SESSION_CREATED":
// 处理会话创建事件
break;
case "SESSION_CLOSED":
// 处理会话关闭事件
break;
}
}
/**
* 监听广播队列
*/
@RabbitListener(queues = "im.broadcast.queue.#{rabbitMQManager.serverId}")
public void consumeBroadcastMessage(Map<String, Object> message) {
handleBroadcast(message);
}
}

View File

@ -0,0 +1,170 @@
package com.suisung.mall.im.common.websocket.service;
import com.suisung.mall.im.common.websocket.service.onchat.MallsuiteImSocketHandler;
import com.suisung.mall.im.common.websocket.utils.ServerIdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.*;
@Service
public class RabbitMQManager {
private static Logger logger = LoggerFactory.getLogger(RabbitMQManager.class);
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private DirectExchange imDirectExchange;
@Autowired
private FanoutExchange imFanoutExchange;
@Value("${im.server.id:#{null}}") // 可选配置优先使用配置的ID
private String configuredServerId;
private String serverId;
private String queueName;
private String routingKey;
private String broadcastQueueName;
@PostConstruct
public void init() {
// 确定服务器ID优先使用配置否则自动生成
this.serverId = determineServerId();
this.queueName = IM_QUEUE_PREFIX + serverId;
this.routingKey = IM_ROUTING_KEY_PREFIX + serverId;
this.broadcastQueueName = "im.broadcast.queue." + serverId;
// 创建点对点队列
Queue queue = new Queue(queueName, true, false, false);
amqpAdmin.declareQueue(queue);
// 绑定到直连交换机
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,
IM_DIRECT_EXCHANGE, routingKey, null);
amqpAdmin.declareBinding(binding);
// 创建广播队列
Queue broadcastQueue = new Queue(broadcastQueueName, true, false, false);
amqpAdmin.declareQueue(broadcastQueue);
// 绑定广播队列到广播交换机
Binding broadcastBinding = new Binding(broadcastQueueName,
Binding.DestinationType.QUEUE,
IM_FANOUT_EXCHANGE,
"",
null);
amqpAdmin.declareBinding(broadcastBinding);
logger.info("=== RabbitMQ队列初始化完成 ===");
logger.info("服务器ID: {}", serverId);
logger.info("点对点队列: {}", queueName);
logger.info("路由键: {}", routingKey);
logger.info("广播队列: {}", broadcastQueueName);
logger.info("============================");
}
/**
* 确定服务器ID
*/
private String determineServerId() {
// 1. 优先使用配置的服务器ID
if (configuredServerId != null && !configuredServerId.trim().isEmpty()) {
return configuredServerId.trim();
}
// 2. 使用自动生成的稳定服务器ID
return ServerIdGenerator.generateServerId();
}
public String getServerId() {
return serverId;
}
public String getQueueName() {
return queueName;
}
public String getRoutingKey() {
return routingKey;
}
/**
* 获取目标服务器的路由键
*/
public String getTargetRoutingKey(String targetServerId) {
return IM_ROUTING_KEY_PREFIX + targetServerId;
}
/**
* 获取服务器信息用于监控
*/
public ServerInfo getServerInfo() {
ServerInfo info = new ServerInfo();
info.setServerId(serverId);
info.setQueueName(queueName);
info.setRoutingKey(routingKey);
info.setBroadcastQueueName(broadcastQueueName);
info.setStartTime(System.currentTimeMillis());
return info;
}
/**
* 检查队列是否存在
*/
public boolean isQueueExists(String queueName) {
try {
Queue queue = new Queue(queueName);
amqpAdmin.declareQueue(queue);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 初始化广播队列
*/
private void initBroadcastQueue() {
// 为当前服务器创建广播队列
String broadcastQueueName = "im.broadcast.queue." + serverId;
Queue broadcastQueue = new Queue(broadcastQueueName, true, false, false);
amqpAdmin.declareQueue(broadcastQueue);
// 绑定到广播交换机
Binding broadcastBinding = BindingBuilder.bind(broadcastQueue)
.to(imFanoutExchange);
amqpAdmin.declareBinding(broadcastBinding);
System.out.println("广播队列初始化完成: " + broadcastQueueName);
}
/**
* 服务器信息类
*/
public static class ServerInfo {
private String serverId;
private String queueName;
private String routingKey;
private String broadcastQueueName;
private long startTime;
// getter和setter
public String getServerId() { return serverId; }
public void setServerId(String serverId) { this.serverId = serverId; }
public String getQueueName() { return queueName; }
public void setQueueName(String queueName) { this.queueName = queueName; }
public String getRoutingKey() { return routingKey; }
public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
public String getBroadcastQueueName() { return broadcastQueueName; }
public void setBroadcastQueueName(String broadcastQueueName) { this.broadcastQueueName = broadcastQueueName; }
public long getStartTime() { return startTime; }
public void setStartTime(long startTime) { this.startTime = startTime; }
}
}

View File

@ -6,6 +6,9 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONUtil;
import com.suisung.mall.common.feignService.AccountService;
import com.suisung.mall.common.utils.CheckUtil;
import com.suisung.mall.im.common.websocket.service.DistributedMessageService;
import com.suisung.mall.im.common.websocket.service.DistributedSessionService;
import com.suisung.mall.im.common.websocket.service.LocalSessionManager;
import com.suisung.mall.im.common.websocket.utils.Constants;
import com.suisung.mall.im.pojo.entity.ChatHistory;
import com.suisung.mall.im.pojo.vo.MineDTO;
@ -19,6 +22,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.*;
@ -26,51 +30,27 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
private static Logger logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class);
private static ArrayList<WebSocketSession> users;
private static ArrayList<String> usersStr;
// 移除原有的静态集合使用注入的服务
// private static ArrayList<WebSocketSession> users;
// private static ArrayList<String> usersStr;
// private static Map<String, List<WebSocketSession>> userSession;
// private static Map<String, List<WebSocketSession>> groupSession;
//存入用户的所有终端的连接信息
private static Map<String, List<WebSocketSession>> userSession;
@Autowired
private DistributedSessionService distributedSessionService;
//群组信息
private static Map<String, List<WebSocketSession>> groupSession;
@Autowired
private DistributedMessageService distributedMessageService;
static {
users = new ArrayList<>();
usersStr = new ArrayList<>();
userSession = Collections.synchronizedMap(new HashMap<>());
groupSession = Collections.synchronizedMap(new HashMap<>());
logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class);
}
public ArrayList<String> getOnlineLoginNames() {
ArrayList<String> onlineLoginNames = new ArrayList<String>();
for (WebSocketSession user : users) {
String userName = (String) user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
if (userName != null) {
onlineLoginNames.add(userName);
}
}
return onlineLoginNames;
}
public ArrayList<Integer> getOnlineLoginUserId() {
ArrayList<Integer> onlineLoginUserId = new ArrayList<Integer>();
for (WebSocketSession user : users) {
Integer user_id = Convert.toInt(user.getAttributes().get("user_id"));
if (CheckUtil.isNotEmpty(user_id)) {
onlineLoginUserId.add(user_id);
}
}
return onlineLoginUserId;
}
@Autowired
private LocalSessionManager localSessionManager;
@Autowired
private ChatHistoryService chatHistoryService;
@Autowired
private LayGroupService layGroupService;
@Autowired
private AccountService accountService;
@ -78,99 +58,36 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.debug("connect to the websocket success......");
String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
String sessionId = session.getId();
users.add(session);
// 存储到本地会话管理
localSessionManager.addUserSession(loginUserId, session);
// 注册到分布式会话服务
Map<String, Object> attributes = new HashMap<>();
attributes.put("user_id", session.getAttributes().get("user_id"));
attributes.put("loginName", loginUserId);
distributedSessionService.registerUserSession(loginUserId, sessionId, attributes);
// 处理离线消息等原有逻辑...
this.updateOnlineStatus();
String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);//获取刚上线用户的loginName
List<WebSocketSession> loginSessions = userSession.get(loginUserId);
if (CollUtil.isEmpty(loginSessions)) {
if (null == loginSessions) {
loginSessions = new ArrayList<>();
}
loginSessions.add(session);
userSession.put(loginUserId, loginSessions);
} else {
if (!loginSessions.contains(session)) {
loginSessions.add(session);
}
}
if (loginUserId != null) {
SendVO msg = new SendVO();
//读取离线信息
ChatHistory chat = new ChatHistory();
chat.setReceiver(loginUserId);//发给刚上线的用户信息
chat.setStatus("0");
List<ChatHistory> list = chatHistoryService.findList(chat);
/*
for(ChatHistory c : list){
String sender="";
String receiver="";
if("friend".equals(c.getType()) || "user".equals(c.getType())){//如果是个人信息
sender = c.getSender(); //todo 确认数据库存的数据uid user_id 不一致
msg.setId(sender);
msg.setToid(c.getReceiver());
msg.setContent(c.getMsg());
//todu 缺少数据
//msg = sender+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_;
}else if(c.getType().contains("_group")){//如果是直播群组信息
msg.setType("group");
sender = c.getSender();
String groupId = c.getReceiver().split("_")[0];//直播群组id
receiver=c.getReceiver().split("_")[1];//直播群组所属user_id
msg = sender+Constants._msg_+groupId+Constants._msg_+c.getMsg()+Constants._msg_;
}else{//如果是群组信息
msg.setType("group");
String groupId = c.getSender().split(Constants._msg_)[0];//群组id
sender=c.getSender().split(Constants._msg_)[1];//发送者loginName
msg = groupId+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_;
}
//todo 应该冗余
//获取用户基本信息
AccountUserBase accountUserBase = accountService.getUserBase(Integer.valueOf(sender));
//AccountUserBase accountUserBase= result.getFenResult(AccountUserBase.class);
//AccountUserInfo userInfo=accountService.getUserInfo(accountUserBase.getUser_id());
AccountUserInfo userInfo=accountService.getUserInfo(Integer.valueOf(sender));
msg.setUser_id(Convert.toInt(session.getAttributes().get("user_id")));
msg.setUsername(accountUserBase.getUser_nickname());
msg.setAvatar(userInfo.getUser_avatar());
//msg.setMessage_id(c.getId());
msg.setFromid(msg.getId());
msg.setTime(c.getCreate_date().getTime());
msg.setStatus(0); //?
msg.setMsg(new HashMap());
boolean isSuccess = this.sendMessageToUser(loginUserId, msg);
if(isSuccess) {
c.setStatus("1");//标记为已读
chatHistoryService.saveNew(c);
}
}
*/
}
}
//接收js侧发送来的用户信息
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> socketMessage) throws Exception {
ReceiveDTO receiveDTO = JSONUtil.toBean(socketMessage.getPayload().toString(), ReceiveDTO.class);
if (receiveDTO != null) {//发送消息
if (receiveDTO != null) {
SendVO sendVO = new SendVO();
MineDTO mine = receiveDTO.getMine();
ToDTO to = receiveDTO.getTo();
// 设置sendVO的代码保持不变...
sendVO.setToid(to.getId());
sendVO.setZid(to.getZid()); //群组特有
sendVO.setZid(to.getZid());
sendVO.setId(mine.getId());
Integer toUserId = Convert.toInt(session.getAttributes().get("user_id"));
sendVO.setUser_id(toUserId);
@ -182,32 +99,33 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
to.setType("friend");
}
sendVO.setType(to.getType()); //是从to中读取Type
sendVO.setType(to.getType());
sendVO.setSendmethod(sendVO.getType());
sendVO.setContent(mine.getContent());
sendVO.setMessage_id(mine.getMessage_id());
sendVO.setFromid(mine.getId());
sendVO.setTime(new Date().getTime());
sendVO.setStatus(0); //?
sendVO.setStatus(0);
sendVO.setMsg_type(mine.getType());
sendVO.setItem_id(mine.getItem_id());
sendVO.setMsg(new HashMap());
sendVO.setMessage_length(mine.getLength());
String sender = mine.getId();//信息发送者登录名(loginName)或user_id
String receiver = to.getId();//信息接收者如果是私聊就是用户loginName如果是群聊就是群组id
String sender = mine.getId();
String receiver = to.getId();
String msg = mine.getContent();
String avatar = mine.getAvatar();
String type = to.getType();
String senderName = mine.getUsername();//发送者姓名(name)
String senderName = mine.getUsername();
// 更新心跳
distributedSessionService.updateHeartbeat(sender, session.getId());
if (!mine.getId().equals(to.getId())) {
sendVO.setMine(false);
//保存聊天记录
if ("friend".equals(type) || "user".equals(type)) {
//如果是私聊
// 私聊消息 - 使用分布式发送
ChatHistory chat = new ChatHistory();
chat.setId(IdUtil.simpleUUID());
chat.setSender(sender);
@ -215,124 +133,49 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
chat.setMsg(msg);
chat.setType("friend");
chat.setCreate_date(new Date());
boolean isSuccess = this.sendMessageToUser(receiver, sendVO);
if (isSuccess) {//如果信息发送成功
chat.setStatus("1");//设置为已读
boolean isSuccess = distributedMessageService.sendToUser(receiver, sendVO);
if (isSuccess) {
chat.setStatus("1");
} else {
// 发送离线消息提醒
sendVO.setToid(mine.getId());
sendVO.setContent(to.getName() + " 对方现在离线,他将在上线后收到你的消息!");
sendVO.setMsg_type("text");
sendVO.setMessage_length("0");
sendVO.setId(to.getId());
this.sendMessageToUser(sender, sendVO);//同时向本人发送对方不在线消息
chat.setStatus("0");//设置为未读
distributedMessageService.sendToUser(sender, sendVO);
chat.setStatus("0");
}
chatHistoryService.saveNew(chat);
} else if ("group".equals(type)) {//如果是群聊
// 临时不经过数据库
List<WebSocketSession> groupLoginSession = groupSession.get(to.getZid());
if (!CollUtil.isEmpty(groupLoginSession)) {
for (WebSocketSession gs : groupLoginSession) {
} else if ("group".equals(type)) {
// 群聊消息 - 使用分布式发送
String groupId = to.getZid();
distributedMessageService.sendToGroup(groupId, sendVO, session.getId());
if (sender.equals(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString())) {//群消息不发给自己但是存一条记录当做聊天记录查询
} else {
//设置接受用户
sendVO.setToid(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString());
boolean isSuccess = this.sendMessageToUser(to.getName(), sendVO);
if (isSuccess) {
} else {
}
}
//chatHistoryService.saveNew(chat);
}
}
/*
List<LayGroupUser> layGroupUserlist = new ArrayList();
//群主
LayGroupUser owner = new LayGroupUser();
LayGroup layGroup = layGroupService.get(receiver);
owner.setUser_id(layGroup.getCreate_by());
layGroupUserlist.add(owner);
//群成员1
List<LayGroupUser> zlist = layGroupService.getUsersByGroup(layGroupService.get(receiver));
layGroupUserlist.addAll(zlist);
for (LayGroupUser lgUser : layGroupUserlist) {
AccountUserBase user_base_row = accountService.getUserBase(Integer.valueOf(lgUser.getUser_id()));
ChatHistory chat = new ChatHistory();
chat.setId(IdUtil.simpleUUID());
//群聊时信息先发送给群聊id即receiver)在后台转发给所有非发送者(sender)的人的话群聊id即receiver)就变成发送者
String groupId = receiver;
//保存聊天记录
chat.setSender(groupId + Constants._msg_ + sender);//群聊时保存群聊id和发送者id
chat.setReceiver(user_base_row.getUser_account());//群中所有信息获得者人员
chat.setMsg(msg);
chat.setType("group");
//message = groupId + Constants._msg_ + user_base_row.getUser_account() + Constants._msg_ + msg + Constants._msg_ + avatar + Constants._msg_ + type + Constants._msg_ + senderName + Constants._msg_ + datatime;
if (sender.equals(user_base_row.getUser_account())) {//群消息不发给自己但是存一条记录当做聊天记录查询
chat.setStatus("1");//发送成功设置为已读
} else {
boolean isSuccess = this.sendMessageToUser(user_base_row.getUser_account(), sendVO);
if (isSuccess) {
chat.setStatus("1");//发送成功设置为已读
} else {
chat.setStatus("0");//用户不在线设置为未读
}
}
chatHistoryService.saveNew(chat);
}
*/
} else if ("join_group".equals(type)) { //临时群组聚焦当前窗口
} else if ("join_group".equals(type)) {
// 加入群组
String zid = to.getZid();
//设置session属性上标出所属群组 目前无同时多个群组需求如果存在此处存入list
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid);
//设置群组中用户
List<WebSocketSession> groupLoginSession = groupSession.get(zid);
// 添加到本地群组
localSessionManager.addGroupSession(zid, session);
if (CollUtil.isEmpty(groupLoginSession)) {
if (null == groupLoginSession) {
groupLoginSession = new ArrayList<>();
}
// 添加到分布式群组
String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
distributedSessionService.addUserToGroup(zid, userId, session.getId());
groupLoginSession.add(session);
groupSession.put(zid, groupLoginSession);
} else {
if (!groupLoginSession.contains(session)) {
groupLoginSession.add(session);
}
}
//todo通知已存在群组用户消息 可启动独立task
} else if ("leave_group".equals(type)) { //临时群组聚焦当前窗口
} else if ("leave_group".equals(type)) {
// 离开群组
String zid = to.getZid();
//设置session属性上标出所属群组
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, null);
//设置群组中用户
List<WebSocketSession> groupLoginSession = groupSession.get(zid);
// 从本地群组移除
localSessionManager.removeGroupSession(zid, session);
if (CollUtil.isEmpty(groupLoginSession)) {
} else {
if (groupLoginSession.contains(session)) {
groupLoginSession.remove(session);
}
}
//todo通知已存在群组用户消息 可启动独立task
} else {
// 从分布式群组移除
distributedSessionService.removeUserFromGroup(zid, session.getId());
}
} else {
sendVO.setMine(true);
@ -346,25 +189,33 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
session.close();
}
logger.debug("websocket connection closed......");
users.remove(session);
userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session);
if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) {
groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session);
}
this.updateOnlineStatus();
cleanupSession(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.debug("websocket connection closed......");
users.remove(session);
userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session);
cleanupSession(session);
}
if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) {
groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session);
/**
* 清理会话资源
*/
private void cleanupSession(WebSocketSession session) {
String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
String sessionId = session.getId();
String groupId = (String) session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY);
// 从本地管理移除
localSessionManager.removeUserSession(loginUserId, session);
if (groupId != null) {
localSessionManager.removeGroupSession(groupId, session);
}
// 从分布式存储注销
distributedSessionService.unregisterUserSession(loginUserId, sessionId);
if (groupId != null) {
distributedSessionService.removeUserFromGroup(groupId, sessionId);
}
this.updateOnlineStatus();
@ -376,64 +227,104 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
}
/**
* 给所有在线用户发送消息
*
* @param message
* 给某个用户发送消息 (兼容原有接口)
*/
public void sendMessageToAllUsers(List<String> message) {
public boolean sendMessageToUser(String loginName, SendVO message) {
return distributedMessageService.sendToUser(loginName, message);
}
SendVO msg = new SendVO();
msg.setContent(message.toString());
msg.setType("online");
msg.setSendmethod(msg.getType());
/**
* 获取所有在线用户登录名
*/
public List<String> getOnlineLoginNames() {
// 使用分布式服务获取所有在线用户登录名
return distributedSessionService.getOnlineLoginNames();
}
//不能这么操作
for (WebSocketSession user : users) {
try {
if (user.isOpen()) {
user.sendMessage(new TextMessage(msg.toString()));
/**
* 获取所有在线用户ID
*/
public List<Integer> getOnlineLoginUserId() {
// 使用分布式服务获取所有在线用户ID
return distributedSessionService.getAllOnlineUserIds();
}
/**
* 获取当前服务器的在线用户ID
*/
public List<Integer> getLocalOnlineLoginUserId() {
// 使用本地会话管理器获取当前服务器的在线用户ID
return localSessionManager.getLocalOnlineUserIds();
}
/**
* 获取当前服务器的在线用户登录名
*/
public List<String> getLocalOnlineLoginNames() {
// 使用本地会话管理器获取当前服务器的在线用户登录名
return localSessionManager.getLocalOnlineLoginNames();
}
/**
* 更新在线状态 - 通知所有用户在线列表变化
*/
public void updateOnlineStatus() {
try {
// 创建在线状态消息
SendVO onlineStatusMsg = new SendVO();
onlineStatusMsg.setType("online_status");
onlineStatusMsg.setSendmethod("broadcast");
// 获取在线用户列表
List<String> onlineUsers = getOnlineLoginNames();
List<Integer> onlineUserIds = getOnlineLoginUserId();
Map<String, Object> data = new HashMap<>();
data.put("onlineUsers", onlineUsers);
data.put("onlineUserIds", onlineUserIds);
data.put("timestamp", System.currentTimeMillis());
onlineStatusMsg.setMsg(data);
// 广播在线状态更新只广播到当前服务器避免循环广播
broadcastLocalOnlineStatus(onlineStatusMsg);
logger.debug("在线状态更新: {} 个用户在线", onlineUsers.size());
} catch (Exception e) {
logger.error("更新在线状态失败: " + e.getMessage(), e);
}
}
/**
* 广播在线状态到本地用户
*/
private void broadcastLocalOnlineStatus(SendVO message) {
// 获取本地所有用户会话
for (List<WebSocketSession> sessions : localSessionManager.userSessions.values()) {
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(message.toString()));
} catch (IOException e) {
logger.error("发送在线状态消息失败: " + e.getMessage(), e);
}
}
} catch (IOException e) {
logger.error("给所有在线用户发送信息失败!" + e.getMessage(), e);
}
}
}
/**
* 给某个用户发送消息
*
* @param loginName
* @param message
* 给所有在线用户发送消息
*/
public boolean sendMessageToUser(String loginName, SendVO message) {
boolean result = false;
public void sendMessageToAllUsers(SendVO message) {
// 获取所有在线用户ID
List<Integer> onlineUserIds = getOnlineLoginUserId();
//不能这样操作
//for (WebSocketSession user : users) {
List<WebSocketSession> lst = userSession.get(message.getToid());
if (CollUtil.isNotEmpty(lst)) {
for (WebSocketSession user : lst) {
//if (user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).equals(loginName)) {//允许用户多个浏览器登录每个浏览器页面都会收到用户信息
try {
if (user.isOpen()) {
user.sendMessage(new TextMessage(message.toString()));
result = true;
}
} catch (IOException e) {
logger.error("给用户发送信息失败!" + e.getMessage(), e);
}
//break;//注释掉此处意味着遍历该用户打开的所有页面并发送信息否则只会向用户登录的第一个网页发送信息
//}
}
for (Integer userId : onlineUserIds) {
String userIdStr = String.valueOf(userId);
distributedMessageService.sendToUser(userIdStr, message);
}
return result;
logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size());
}
public void updateOnlineStatus() {
//this.sendMessageToAllUsers(this.getOnlineLoginNames());//通知所有用户更新在线信息
}
}

View File

@ -0,0 +1,153 @@
package com.suisung.mall.im.common.websocket.utils;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.UUID;
/**
* 服务器ID生成工具类
*/
public class ServerIdGenerator {
private static volatile String cachedServerId;
/**
* 生成服务器唯一标识
*/
public static String generateServerId() {
if (cachedServerId != null) {
return cachedServerId;
}
synchronized (ServerIdGenerator.class) {
if (cachedServerId != null) {
return cachedServerId;
}
// 尝试多种方式生成稳定的服务器ID
String serverId = generateStableServerId();
cachedServerId = serverId;
return serverId;
}
}
/**
* 生成稳定的服务器ID基于机器标识
*/
private static String generateStableServerId() {
StringBuilder serverId = new StringBuilder("im-server-");
try {
// 1. 使用MAC地址最稳定
String macAddress = getPhysicalMacAddress();
if (!"UNKNOWN-MAC".equals(macAddress)) {
// 取MAC地址的后6位作为标识
String macSuffix = macAddress.replace(":", "").replace("-", "");
if (macSuffix.length() >= 6) {
serverId.append(macSuffix.substring(macSuffix.length() - 6).toLowerCase());
return serverId.toString();
}
}
// 2. 使用IP地址
String ipSuffix = getLocalIpSuffix();
if (!"UNKNOWN-IP".equals(ipSuffix)) {
serverId.append(ipSuffix);
return serverId.toString();
}
// 3. 使用主机名哈希
String hostname = InetAddress.getLocalHost().getHostName();
int hostnameHash = Math.abs(hostname.hashCode());
serverId.append(String.format("%06x", hostnameHash & 0xFFFFFF));
return serverId.toString();
} catch (Exception e) {
// 4. 最终备选UUID
String uuidSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 6);
serverId.append(uuidSuffix);
return serverId.toString();
}
}
/**
* 获取物理MAC地址
*/
private static String getPhysicalMacAddress() {
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
// 排除回环虚拟未启用的接口
if (networkInterface.isLoopback() ||
networkInterface.isVirtual() ||
!networkInterface.isUp()) {
continue;
}
// 排除常见的虚拟接口名称
String name = networkInterface.getName().toLowerCase();
if (name.contains("virtual") || name.contains("vmware") ||
name.contains("vbox") || name.contains("docker") ||
name.contains("wsl")) {
continue;
}
byte[] mac = networkInterface.getHardwareAddress();
if (mac != null && mac.length > 0) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < mac.length; i++) {
sb.append(String.format("%02X", mac[i]));
}
return sb.toString();
}
}
} catch (SocketException e) {
// 忽略异常
}
return "UNKNOWN-MAC";
}
/**
* 获取本地IP地址后缀
*/
private static String getLocalIpSuffix() {
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
if (networkInterface.isLoopback() || !networkInterface.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address.isSiteLocalAddress()) {
String ip = address.getHostAddress();
// 取IP地址的最后一段
String[] segments = ip.split("\\.");
if (segments.length == 4) {
return segments[3];
}
}
}
}
} catch (SocketException e) {
// 忽略异常
}
return "UNKNOWN-IP";
}
/**
* 生成带时间戳的服务器ID适合临时使用
*/
public static String generateTimestampServerId() {
String baseId = generateServerId();
long timestamp = System.currentTimeMillis() % 1000000; // 取后6位
return baseId + "-" + String.format("%06d", timestamp);
}
}

View File

@ -40,7 +40,7 @@ public class ChatSocketInfoController {
@RequestMapping(value = "/getUserOnline", method = RequestMethod.POST)
public List<Integer> getUserOnline(@RequestBody List<Integer> user_ids) {
logger.info(I18nUtil._("接收的用户ids:") + CollUtil.join(user_ids, ","));
ArrayList<Integer> onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId();
List<Integer> onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId();
Iterator<Integer> online_user_ids_iter = onlineLoginUserIds.iterator();
// 处理移除非本店铺客服
while (online_user_ids_iter.hasNext()) {

View File

@ -37,6 +37,15 @@ spring:
max-wait: -1ms
max-idle: 64
min-idle: 0
rabbitmq:
host: @rabbitmq.host@
port: @rabbitmq.port@
virtual-host: @rabbitmq.virtual-host@
listener:
simple:
acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认
username: @rabbitmq.username@
password: @rabbitmq.password@
cloud:
nacos:
discovery:

View File

@ -37,6 +37,15 @@ spring:
max-wait: -1ms
max-idle: 64
min-idle: 0
rabbitmq:
host: @rabbitmq.host@
port: @rabbitmq.port@
virtual-host: @rabbitmq.virtual-host@
listener:
simple:
acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认
username: @rabbitmq.username@
password: @rabbitmq.password@
cloud:
nacos:
discovery:

View File

@ -39,6 +39,15 @@ spring:
max-wait: -1ms
max-idle: 64
min-idle: 0
rabbitmq:
host: @rabbitmq.host@
port: @rabbitmq.port@
virtual-host: @rabbitmq.virtual-host@
listener:
simple:
acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认
username: @rabbitmq.username@
password: @rabbitmq.password@
cloud:
nacos:
discovery:

View File

@ -37,6 +37,15 @@ spring:
max-wait: -1ms
max-idle: 64
min-idle: 0
rabbitmq:
host: @rabbitmq.host@
port: @rabbitmq.port@
virtual-host: @rabbitmq.virtual-host@
listener:
simple:
acknowledge-mode: manual #手动确认消息,不使用默认的消费端确认
username: @rabbitmq.username@
password: @rabbitmq.password@
cloud:
nacos:
discovery: