From d493ebc20e3fe1c78eec236b23c730b663a7ee68 Mon Sep 17 00:00:00 2001 From: liyj <1617420630@qq.com> Date: Wed, 19 Nov 2025 15:14:53 +0800 Subject: [PATCH] =?UTF-8?q?im=E8=BF=98=E5=8E=9F=E5=8D=95=E6=9C=BA=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../onchat/MallsuiteImSocketHandler.java | 476 +++++++++++------- 1 file changed, 289 insertions(+), 187 deletions(-) diff --git a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java index 5d35d14b..82ed6eb2 100644 --- a/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java +++ b/mall-im/src/main/java/com/suisung/mall/im/common/websocket/service/onchat/MallsuiteImSocketHandler.java @@ -1,11 +1,11 @@ package com.suisung.mall.im.common.websocket.service.onchat; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONUtil; -import com.suisung.mall.im.common.websocket.service.DistributedMessageService; -import com.suisung.mall.im.common.websocket.service.DistributedSessionService; -import com.suisung.mall.im.common.websocket.service.LocalSessionManager; +import com.suisung.mall.common.feignService.AccountService; +import com.suisung.mall.common.utils.CheckUtil; import com.suisung.mall.im.common.websocket.utils.Constants; import com.suisung.mall.im.pojo.entity.ChatHistory; import com.suisung.mall.im.pojo.vo.MineDTO; @@ -13,77 +13,164 @@ import com.suisung.mall.im.pojo.vo.ReceiveDTO; import com.suisung.mall.im.pojo.vo.SendVO; import com.suisung.mall.im.pojo.vo.ToDTO; import com.suisung.mall.im.service.ChatHistoryService; +import com.suisung.mall.im.service.LayGroupService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; - import org.springframework.web.socket.*; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; public class MallsuiteImSocketHandler implements WebSocketHandler { private static Logger logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class); - // 移除原有的静态集合,使用注入的服务 - // private static ArrayList users; - // private static ArrayList usersStr; - // private static Map> userSession; - // private static Map> groupSession; + private static ArrayList users; + private static ArrayList usersStr; - @Autowired - private DistributedSessionService distributedSessionService; + //存入用户的所有终端的连接信息 + private static Map> userSession; - @Autowired - private DistributedMessageService distributedMessageService; + //群组信息 + private static Map> groupSession; - @Autowired - private LocalSessionManager localSessionManager; + static { + users = new ArrayList<>(); + usersStr = new ArrayList<>(); + userSession = Collections.synchronizedMap(new HashMap<>()); + groupSession = Collections.synchronizedMap(new HashMap<>()); + logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class); + } + + public ArrayList getOnlineLoginNames() { + ArrayList onlineLoginNames = new ArrayList(); + for (WebSocketSession user : users) { + String userName = (String) user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); + if (userName != null) { + onlineLoginNames.add(userName); + } + } + return onlineLoginNames; + + } + + public ArrayList getOnlineLoginUserId() { + ArrayList onlineLoginUserId = new ArrayList(); + for (WebSocketSession user : users) { + Integer user_id = Convert.toInt(user.getAttributes().get("user_id")); + if (CheckUtil.isNotEmpty(user_id)) { + onlineLoginUserId.add(user_id); + } + } + return onlineLoginUserId; + + } @Autowired private ChatHistoryService chatHistoryService; - + @Autowired + private LayGroupService layGroupService; + @Autowired + private AccountService accountService; //用户上线后触发 @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.debug("connect to the websocket success......"); - - String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ; String sessionId = session.getId(); + users.add(session); + this.updateOnlineStatus(); + String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);//获取刚上线用户的loginName - // 存储到本地会话管理 - ConcurrentHashMap> userSessions=new ConcurrentHashMap<>(); - localSessionManager.setUserSessions(userSessions); - localSessionManager.addUserSession(loginUserId, session); + List loginSessions = userSession.get(loginUserId); + if (CollUtil.isEmpty(loginSessions)) { + if (null == loginSessions) { + loginSessions = new ArrayList<>(); + } - logger.info("添加会话到本地成功:{}", localSessionManager.getUserSessions().values()); + loginSessions.add(session); + userSession.put(loginUserId, loginSessions); + } else { + if (!loginSessions.contains(session)) { + loginSessions.add(session); + } + } - // 注册到分布式会话服务 - Map attributes = new HashMap<>(); - attributes.put("user_id", String.valueOf(session.getAttributes().get("user_id"))); - attributes.put("loginName", loginUserId); - distributedSessionService.registerUserSession(loginUserId, sessionId, attributes); + if (loginUserId != null) { + SendVO msg = new SendVO(); + //读取离线信息 + ChatHistory chat = new ChatHistory(); + chat.setReceiver(loginUserId);//发给刚上线的用户信息 + chat.setStatus("0"); + List list = chatHistoryService.findList(chat); + /* + for(ChatHistory c : list){ + String sender=""; + String receiver=""; - // 处理离线消息等原有逻辑... - // this.updateOnlineStatus(); + if("friend".equals(c.getType()) || "user".equals(c.getType())){//如果是个人信息 + sender = c.getSender(); //todo 确认数据库存的数据uid user_id 不一致 + msg.setId(sender); + msg.setToid(c.getReceiver()); + msg.setContent(c.getMsg()); + //todu 缺少数据 + //msg = sender+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_; + }else if(c.getType().contains("_group")){//如果是直播群组信息 + msg.setType("group"); + sender = c.getSender(); + String groupId = c.getReceiver().split("_")[0];//直播群组id + receiver=c.getReceiver().split("_")[1];//直播群组所属user_id + msg = sender+Constants._msg_+groupId+Constants._msg_+c.getMsg()+Constants._msg_; + + }else{//如果是群组信息 + msg.setType("group"); + + String groupId = c.getSender().split(Constants._msg_)[0];//群组id + sender=c.getSender().split(Constants._msg_)[1];//发送者loginName + msg = groupId+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_; + + } + + //todo 应该冗余 + //获取用户基本信息 + AccountUserBase accountUserBase = accountService.getUserBase(Integer.valueOf(sender)); + //AccountUserBase accountUserBase= result.getFenResult(AccountUserBase.class); + //AccountUserInfo userInfo=accountService.getUserInfo(accountUserBase.getUser_id()); + AccountUserInfo userInfo=accountService.getUserInfo(Integer.valueOf(sender)); + + + msg.setUser_id(Convert.toInt(session.getAttributes().get("user_id"))); + msg.setUsername(accountUserBase.getUser_nickname()); + msg.setAvatar(userInfo.getUser_avatar()); + //msg.setMessage_id(c.getId()); + msg.setFromid(msg.getId()); + msg.setTime(c.getCreate_date().getTime()); + msg.setStatus(0); //? + msg.setMsg(new HashMap()); + + boolean isSuccess = this.sendMessageToUser(loginUserId, msg); + if(isSuccess) { + c.setStatus("1");//标记为已读 + chatHistoryService.saveNew(c); + } + } + */ + } } //接收js侧发送来的用户信息 @Override public void handleMessage(WebSocketSession session, WebSocketMessage socketMessage) throws Exception { ReceiveDTO receiveDTO = JSONUtil.toBean(socketMessage.getPayload().toString(), ReceiveDTO.class); - if (receiveDTO != null) { + if (receiveDTO != null) {//发送消息 SendVO sendVO = new SendVO(); MineDTO mine = receiveDTO.getMine(); ToDTO to = receiveDTO.getTo(); - // 设置sendVO的代码保持不变... sendVO.setToid(to.getId()); - sendVO.setZid(to.getZid()); + sendVO.setZid(to.getZid()); //群组特有 sendVO.setId(mine.getId()); Integer toUserId = Convert.toInt(session.getAttributes().get("user_id")); sendVO.setUser_id(toUserId); @@ -95,33 +182,32 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { to.setType("friend"); } - sendVO.setType(to.getType()); + sendVO.setType(to.getType()); //是从to中读取Type sendVO.setSendmethod(sendVO.getType()); + sendVO.setContent(mine.getContent()); sendVO.setMessage_id(mine.getMessage_id()); + sendVO.setFromid(mine.getId()); sendVO.setTime(new Date().getTime()); - sendVO.setStatus(0); + sendVO.setStatus(0); //? sendVO.setMsg_type(mine.getType()); sendVO.setItem_id(mine.getItem_id()); sendVO.setMsg(new HashMap()); sendVO.setMessage_length(mine.getLength()); - String sender = mine.getId(); - String receiver = to.getId(); + String sender = mine.getId();//信息发送者登录名(loginName)或user_id + String receiver = to.getId();//信息接收者,如果是私聊就是用户loginName,如果是群聊就是群组id String msg = mine.getContent(); String avatar = mine.getAvatar(); String type = to.getType(); - String senderName = mine.getUsername(); - - // 更新心跳 - distributedSessionService.updateHeartbeat(sender, session.getId()); + String senderName = mine.getUsername();//发送者姓名(name) if (!mine.getId().equals(to.getId())) { sendVO.setMine(false); - + //保存聊天记录 if ("friend".equals(type) || "user".equals(type)) { - // 私聊消息 - 使用分布式发送 + //如果是私聊 ChatHistory chat = new ChatHistory(); chat.setId(IdUtil.simpleUUID()); chat.setSender(sender); @@ -129,49 +215,124 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { chat.setMsg(msg); chat.setType("friend"); chat.setCreate_date(new Date()); - - boolean isSuccess = distributedMessageService.sendToUser(receiver, sendVO); - if (isSuccess) { - chat.setStatus("1"); + boolean isSuccess = this.sendMessageToUser(receiver, sendVO); + if (isSuccess) {//如果信息发送成功 + chat.setStatus("1");//设置为已读 } else { - // 发送离线消息提醒 sendVO.setToid(mine.getId()); sendVO.setContent(to.getName() + " 对方现在离线,他将在上线后收到你的消息!"); sendVO.setMsg_type("text"); sendVO.setMessage_length("0"); sendVO.setId(to.getId()); - distributedMessageService.sendToUser(sender, sendVO); - chat.setStatus("0"); + this.sendMessageToUser(sender, sendVO);//同时向本人发送对方不在线消息 + chat.setStatus("0");//设置为未读 } chatHistoryService.saveNew(chat); + } else if ("group".equals(type)) {//如果是群聊 + // 临时,不经过数据库 + List groupLoginSession = groupSession.get(to.getZid()); - } else if ("group".equals(type)) { - // 群聊消息 - 使用分布式发送 - String groupId = to.getZid(); - distributedMessageService.sendToGroup(groupId, sendVO, session.getId()); + if (!CollUtil.isEmpty(groupLoginSession)) { + for (WebSocketSession gs : groupLoginSession) { - } else if ("join_group".equals(type)) { - // 加入群组 + if (sender.equals(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString())) {//群消息不发给自己,但是存一条记录当做聊天记录查询 + + } else { + //设置接受用户 + sendVO.setToid(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString()); + boolean isSuccess = this.sendMessageToUser(to.getName(), sendVO); + if (isSuccess) { + + } else { + } + } + + //chatHistoryService.saveNew(chat); + } + } + + + /* + List layGroupUserlist = new ArrayList(); + + //群主 + LayGroupUser owner = new LayGroupUser(); + LayGroup layGroup = layGroupService.get(receiver); + owner.setUser_id(layGroup.getCreate_by()); + layGroupUserlist.add(owner); + + //群成员1 + List zlist = layGroupService.getUsersByGroup(layGroupService.get(receiver)); + layGroupUserlist.addAll(zlist); + + for (LayGroupUser lgUser : layGroupUserlist) { + AccountUserBase user_base_row = accountService.getUserBase(Integer.valueOf(lgUser.getUser_id())); + ChatHistory chat = new ChatHistory(); + chat.setId(IdUtil.simpleUUID()); + //群聊时信息先发送给群聊id(即receiver),在后台转发给所有非发送者(sender)的人的话,群聊id(即receiver)就变成发送者。 + String groupId = receiver; + //保存聊天记录 + chat.setSender(groupId + Constants._msg_ + sender);//群聊时保存群聊id和发送者id + chat.setReceiver(user_base_row.getUser_account());//群中所有信息获得者人员 + chat.setMsg(msg); + chat.setType("group"); + //message = groupId + Constants._msg_ + user_base_row.getUser_account() + Constants._msg_ + msg + Constants._msg_ + avatar + Constants._msg_ + type + Constants._msg_ + senderName + Constants._msg_ + datatime; + if (sender.equals(user_base_row.getUser_account())) {//群消息不发给自己,但是存一条记录当做聊天记录查询 + chat.setStatus("1");//发送成功,设置为已读 + } else { + boolean isSuccess = this.sendMessageToUser(user_base_row.getUser_account(), sendVO); + if (isSuccess) { + chat.setStatus("1");//发送成功,设置为已读 + } else { + chat.setStatus("0");//用户不在线,设置为未读 + } + } + + chatHistoryService.saveNew(chat); + } + */ + + } else if ("join_group".equals(type)) { //临时群组,聚焦当前窗口 String zid = to.getZid(); + + //设置session,属性上标出所属群组: 目前无同时多个群组需求,如果存在,此处存入list session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid); - // 添加到本地群组 - localSessionManager.addGroupSession(zid, session); + //设置群组中用户 + List groupLoginSession = groupSession.get(zid); - // 添加到分布式群组 - String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME); - distributedSessionService.addUserToGroup(zid, userId, session.getId()); + if (CollUtil.isEmpty(groupLoginSession)) { + if (null == groupLoginSession) { + groupLoginSession = new ArrayList<>(); + } - } else if ("leave_group".equals(type)) { - // 离开群组 + groupLoginSession.add(session); + groupSession.put(zid, groupLoginSession); + } else { + if (!groupLoginSession.contains(session)) { + groupLoginSession.add(session); + } + } + + //todo通知已存在群组用户消息 可启动独立task + } else if ("leave_group".equals(type)) { //临时群组,聚焦当前窗口 String zid = to.getZid(); + + //设置session,属性上标出所属群组 session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, null); - // 从本地群组移除 - localSessionManager.removeGroupSession(zid, session); + //设置群组中用户 + List groupLoginSession = groupSession.get(zid); - // 从分布式群组移除 - distributedSessionService.removeUserFromGroup(zid, session.getId()); + if (CollUtil.isEmpty(groupLoginSession)) { + } else { + if (groupLoginSession.contains(session)) { + groupLoginSession.remove(session); + } + } + + //todo通知已存在群组用户消息 可启动独立task + } else { } } else { sendVO.setMine(true); @@ -185,39 +346,28 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { session.close(); } logger.debug("websocket connection closed......"); - cleanupSession(session); + users.remove(session); + userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session); + + if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) { + groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session); + } + + this.updateOnlineStatus(); + } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { logger.debug("websocket connection closed......"); - cleanupSession(session); - } + users.remove(session); + userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session); - /** - * 清理会话资源 - */ - private void cleanupSession(WebSocketSession session) { - String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ; - String sessionId = session.getId(); - String groupId = (String) session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY); - - // 从本地管理移除 - if(localSessionManager.getUserSessions()==null){ - localSessionManager.setUserSessions(new ConcurrentHashMap<>()); - } - localSessionManager.removeUserSession(loginUserId, session); - if (groupId != null) { - localSessionManager.removeGroupSession(groupId, session); + if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) { + groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session); } - // 从分布式存储注销 - distributedSessionService.unregisterUserSession(loginUserId, sessionId); - if (groupId != null) { - distributedSessionService.removeUserFromGroup(groupId, sessionId); - } - - // this.updateOnlineStatus(); + this.updateOnlineStatus(); } @Override @@ -226,112 +376,64 @@ public class MallsuiteImSocketHandler implements WebSocketHandler { } /** - * 给某个用户发送消息 (兼容原有接口) + * 给所有在线用户发送消息 + * + * @param message */ - public boolean sendMessageToUser(String loginName, SendVO message) { - return distributedMessageService.sendToUser(loginName, message); - } + public void sendMessageToAllUsers(List message) { - /** - * 获取所有在线用户登录名 - */ - public List getOnlineLoginNames() { - // 使用分布式服务获取所有在线用户登录名 - return distributedSessionService.getOnlineLoginNames(); - } + SendVO msg = new SendVO(); + msg.setContent(message.toString()); + msg.setType("online"); + msg.setSendmethod(msg.getType()); - /** - * 获取所有在线用户ID - */ - public List getOnlineLoginUserId() { - // 使用分布式服务获取所有在线用户ID - return distributedSessionService.getAllOnlineUserIds(); - } - - /** - * 获取当前服务器的在线用户ID - */ - public List getLocalOnlineLoginUserId() { - // 使用本地会话管理器获取当前服务器的在线用户ID - return localSessionManager.getLocalOnlineUserIds(); - } - - /** - * 获取当前服务器的在线用户登录名 - */ - public List getLocalOnlineLoginNames() { - // 使用本地会话管理器获取当前服务器的在线用户登录名 - return localSessionManager.getLocalOnlineLoginNames(); - } - - /** - * 更新在线状态 - 通知所有用户在线列表变化 - */ - public void updateOnlineStatus() { - try { - // 创建在线状态消息 - SendVO onlineStatusMsg = new SendVO(); - onlineStatusMsg.setType("online_status"); - onlineStatusMsg.setSendmethod("broadcast"); - - // 获取在线用户列表 - List onlineUsers = getOnlineLoginNames(); - List onlineUserIds = getOnlineLoginUserId(); - - Map data = new HashMap<>(); - data.put("onlineUsers", onlineUsers); - data.put("onlineUserIds", onlineUserIds); - data.put("timestamp", System.currentTimeMillis()); - - onlineStatusMsg.setMsg(data); - - // 广播在线状态更新(只广播到当前服务器,避免循环广播) - broadcastLocalOnlineStatus(onlineStatusMsg); - - logger.info("在线状态更新: {} 个用户在线", onlineUsers.size()); - } catch (Exception e) { - logger.error("更新在线状态失败: " + e.getMessage(), e); - } - } - - /** - * 广播在线状态到本地用户 - */ - private void broadcastLocalOnlineStatus(SendVO message) { - logger.info("localSessionManager: {}", localSessionManager); - logger.info("localSessionManager.userSessions: {}", localSessionManager.getUserSessions()); - logger.info("localSessionManager.userSessions.values: {}", localSessionManager.getUserSessions().values()); - if (localSessionManager == null || localSessionManager.getUserSessions().isEmpty()) { - logger.error("localSessionManager is null, cannot broadcast online status"); - return; - } - - // 获取本地所有用户会话 - for (List sessions : localSessionManager.getUserSessions().values()) { - for (WebSocketSession session : sessions) { - if (session!=null&&session.isOpen()) { - try { - session.sendMessage(new TextMessage(message.toString())); - } catch (IOException e) { - logger.error("发送在线状态消息失败: " + e.getMessage(), e); - } + //不能这么操作 + for (WebSocketSession user : users) { + try { + if (user.isOpen()) { + user.sendMessage(new TextMessage(msg.toString())); } + } catch (IOException e) { + logger.error("给所有在线用户发送信息失败!" + e.getMessage(), e); } } } /** - * 给所有在线用户发送消息 + * 给某个用户发送消息 + * + * @param loginName + * @param message */ - public void sendMessageToAllUsers(SendVO message) { - // 获取所有在线用户ID - List onlineUserIds = getOnlineLoginUserId(); + public boolean sendMessageToUser(String loginName, SendVO message) { + boolean result = false; - for (Integer userId : onlineUserIds) { - String userIdStr = String.valueOf(userId); - distributedMessageService.sendToUser(userIdStr, message); + //不能这样操作。 + //for (WebSocketSession user : users) { + List lst = userSession.get(message.getToid()); + + if (CollUtil.isNotEmpty(lst)) { + for (WebSocketSession user : lst) { + //if (user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).equals(loginName)) {//允许用户多个浏览器登录,每个浏览器页面都会收到用户信息 + try { + if (user.isOpen()) { + user.sendMessage(new TextMessage(message.toString())); + result = true; + } + } catch (IOException e) { + logger.error("给用户发送信息失败!" + e.getMessage(), e); + } + + //break;//注释掉此处意味着遍历该用户打开的所有页面并发送信息,否则只会向用户登录的第一个网页发送信息。 + //} + } } - logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size()); + return result; } + + public void updateOnlineStatus() { + //this.sendMessageToAllUsers(this.getOnlineLoginNames());//通知所有用户更新在线信息 + } + } \ No newline at end of file