im调整组的sesion,代码优化

This commit is contained in:
liyj 2025-11-20 10:35:14 +08:00
parent 0b6c87aa31
commit 47331a5b2a
4 changed files with 25 additions and 26 deletions

View File

@ -36,10 +36,9 @@ public class DistributedMessageService {
* 发送消息给用户 * 发送消息给用户
* @param targetUserId 目标用户就是account的id * @param targetUserId 目标用户就是account的id
* @param message * @param message
* @param userSessions
* @return * @return
*/ */
public boolean sendToUser(String targetUserId, SendVO message, ConcurrentHashMap<String, List<WebSocketSession>> userSessions) { public boolean sendToUser(String targetUserId, SendVO message) {
log.info("targetUserId{},message:{}", targetUserId, message); log.info("targetUserId{},message:{}", targetUserId, message);
String targetServer = sessionService.getUserServer(targetUserId); String targetServer = sessionService.getUserServer(targetUserId);
@ -50,7 +49,7 @@ public class DistributedMessageService {
if (isCurrentServer(targetServer)) { if (isCurrentServer(targetServer)) {
// 用户在当前服务器直接发送 // 用户在当前服务器直接发送
return localSessionManager.sendToUser(targetUserId, message,userSessions); return localSessionManager.sendToUser(targetUserId, message);
} else { } else {
// 用户在其他服务器通过RabbitMQ转发 // 用户在其他服务器通过RabbitMQ转发
forwardToUser(targetServer, targetUserId, message); forwardToUser(targetServer, targetUserId, message);

View File

@ -27,10 +27,11 @@ public class LocalSessionManager {
private ConcurrentHashMap<String, List<WebSocketSession>> userSessions; private ConcurrentHashMap<String, List<WebSocketSession>> userSessions;
// 本地存储的群组会话 (groupId -> sessions) // 本地存储的群组会话 (groupId -> sessions)
private ConcurrentHashMap<String, List<WebSocketSession>> groupSessions = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, List<WebSocketSession>> groupSessions;
public LocalSessionManager() { public LocalSessionManager() {
this.userSessions = MallsuiteImSocketHandler.userSessions; this.userSessions = MallsuiteImSocketHandler.userSessions;
this.groupSessions = MallsuiteImSocketHandler.groupSessions;
} }
/** /**
@ -51,18 +52,14 @@ public class LocalSessionManager {
* 发送消息给用户 * 发送消息给用户
* @param userId account的id * @param userId account的id
* @param message * @param message
* @param userSession
* @return * @return
*/ */
public boolean sendToUser(String userId, SendVO message,ConcurrentHashMap<String, List<WebSocketSession>> userSession) { public boolean sendToUser(String userId, SendVO message) {
if (userId == null || message == null) { if (userId == null || message == null) {
log.info("发送消息失败: 参数为空"); log.info("发送消息失败: 参数为空");
return false; return false;
} }
if(null!=userSession){ log.info("userSessions={}", userSessions);
this.userSessions=userSession;
}
log.info("userSessions={}", userSession);
List<WebSocketSession> sessions = userSessions.get(userId); List<WebSocketSession> sessions = userSessions.get(userId);
if (sessions == null || sessions.isEmpty()) { if (sessions == null || sessions.isEmpty()) {
log.info("用户没有活跃会话: {}", userId); log.info("用户没有活跃会话: {}", userId);
@ -111,14 +108,6 @@ public class LocalSessionManager {
return false; return false;
} }
/**
* 添加群组会话
*/
public void addGroupSession(String groupId, WebSocketSession session) {
groupSessions.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>())
.add(session);
}
/** /**
* 移除群组会话 * 移除群组会话
*/ */

View File

@ -26,8 +26,7 @@ public class MessageConsumerService {
public void consumeForwardMessage(Map<String, Object> message) { public void consumeForwardMessage(Map<String, Object> message) {
try { try {
String type = (String) message.get("type"); String type = (String) message.get("type");
System.out.println("收到RabbitMQ消息, 类型: " + type + ", 服务器: " + rabbitMQManager.getServerId()); logger.info("收到RabbitMQ消息, 类型: {}, 服务器: {}", type, rabbitMQManager.getServerId());
switch (type) { switch (type) {
case "SEND_TO_USER": case "SEND_TO_USER":
handleSendToUser(message); handleSendToUser(message);
@ -65,7 +64,7 @@ public class MessageConsumerService {
} }
if (userMsg != null) { if (userMsg != null) {
boolean success = localSessionManager.sendToUser(userId, userMsg,null); boolean success = localSessionManager.sendToUser(userId, userMsg);
logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败"); logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败");
} else { } else {
logger.info("消息格式错误,无法发送给用户: {}", userId); logger.info("消息格式错误,无法发送给用户: {}", userId);

View File

@ -36,6 +36,8 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
public static ConcurrentHashMap<String, List<WebSocketSession>> userSessions; public static ConcurrentHashMap<String, List<WebSocketSession>> userSessions;
public static ConcurrentHashMap<String, List<WebSocketSession>> groupSessions;
@Autowired @Autowired
private DistributedSessionService distributedSessionService; private DistributedSessionService distributedSessionService;
@ -50,6 +52,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
static { static {
userSessions=new ConcurrentHashMap<>(); userSessions=new ConcurrentHashMap<>();
groupSessions=new ConcurrentHashMap<>();
} }
//用户上线后触发 //用户上线后触发
@ -134,7 +137,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
chat.setMsg(msg); chat.setMsg(msg);
chat.setType("friend"); chat.setType("friend");
chat.setCreate_date(new Date()); chat.setCreate_date(new Date());
boolean isSuccess = distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO,localSessionManager.getUserSessions()); boolean isSuccess = distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO);
if (isSuccess) { if (isSuccess) {
chat.setStatus("1"); chat.setStatus("1");
} else { } else {
@ -144,7 +147,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
sendVO.setMsg_type("text"); sendVO.setMsg_type("text");
sendVO.setMessage_length("0"); sendVO.setMessage_length("0");
sendVO.setId(to.getId()); sendVO.setId(to.getId());
distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO,localSessionManager.getUserSessions()); distributedMessageService.sendToUser(String.valueOf(receiveDTO.getTo().getFriend_id()), sendVO);
chat.setStatus("0"); chat.setStatus("0");
} }
chatHistoryService.saveNew(chat); chatHistoryService.saveNew(chat);
@ -160,7 +163,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid); session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid);
// 添加到本地群组 // 添加到本地群组
localSessionManager.addGroupSession(zid, session); this.addGroupSession(zid, session);
// 添加到分布式群组 // 添加到分布式群组
String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
@ -233,7 +236,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
* 给某个用户发送消息 (兼容原有接口) * 给某个用户发送消息 (兼容原有接口)
*/ */
public boolean sendMessageToUser(String loginName, SendVO message) { public boolean sendMessageToUser(String loginName, SendVO message) {
return distributedMessageService.sendToUser(loginName, message,localSessionManager.getUserSessions()); return distributedMessageService.sendToUser(loginName, message);
} }
/** /**
@ -333,7 +336,7 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
for (Integer userId : onlineUserIds) { for (Integer userId : onlineUserIds) {
String userIdStr = String.valueOf(userId); String userIdStr = String.valueOf(userId);
distributedMessageService.sendToUser(userIdStr, message,localSessionManager.getUserSessions()); distributedMessageService.sendToUser(userIdStr, message);
} }
logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size()); logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size());
@ -360,4 +363,13 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
logger.info("添加用户会话成功: userId={}, sessionId={}", userId, session.getId()); logger.info("添加用户会话成功: userId={}, sessionId={}", userId, session.getId());
} }
/**
* 添加群组会话
*/
public void addGroupSession(String groupId, WebSocketSession session) {
groupSessions.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>())
.add(session);
}
} }