im还原单机测试

This commit is contained in:
liyj 2025-11-19 15:14:53 +08:00
parent 4259a7e454
commit 7dd3822e44

View File

@ -1,11 +1,11 @@
package com.suisung.mall.im.common.websocket.service.onchat;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONUtil;
import com.suisung.mall.im.common.websocket.service.DistributedMessageService;
import com.suisung.mall.im.common.websocket.service.DistributedSessionService;
import com.suisung.mall.im.common.websocket.service.LocalSessionManager;
import com.suisung.mall.common.feignService.AccountService;
import com.suisung.mall.common.utils.CheckUtil;
import com.suisung.mall.im.common.websocket.utils.Constants;
import com.suisung.mall.im.pojo.entity.ChatHistory;
import com.suisung.mall.im.pojo.vo.MineDTO;
@ -13,77 +13,164 @@ import com.suisung.mall.im.pojo.vo.ReceiveDTO;
import com.suisung.mall.im.pojo.vo.SendVO;
import com.suisung.mall.im.pojo.vo.ToDTO;
import com.suisung.mall.im.service.ChatHistoryService;
import com.suisung.mall.im.service.LayGroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class MallsuiteImSocketHandler implements WebSocketHandler {
private static Logger logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class);
// 移除原有的静态集合使用注入的服务
// private static ArrayList<WebSocketSession> users;
// private static ArrayList<String> usersStr;
// private static Map<String, List<WebSocketSession>> userSession;
// private static Map<String, List<WebSocketSession>> groupSession;
private static ArrayList<WebSocketSession> users;
private static ArrayList<String> usersStr;
@Autowired
private DistributedSessionService distributedSessionService;
//存入用户的所有终端的连接信息
private static Map<String, List<WebSocketSession>> userSession;
@Autowired
private DistributedMessageService distributedMessageService;
//群组信息
private static Map<String, List<WebSocketSession>> groupSession;
@Autowired
private LocalSessionManager localSessionManager;
static {
users = new ArrayList<>();
usersStr = new ArrayList<>();
userSession = Collections.synchronizedMap(new HashMap<>());
groupSession = Collections.synchronizedMap(new HashMap<>());
logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class);
}
public ArrayList<String> getOnlineLoginNames() {
ArrayList<String> onlineLoginNames = new ArrayList<String>();
for (WebSocketSession user : users) {
String userName = (String) user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
if (userName != null) {
onlineLoginNames.add(userName);
}
}
return onlineLoginNames;
}
public ArrayList<Integer> getOnlineLoginUserId() {
ArrayList<Integer> onlineLoginUserId = new ArrayList<Integer>();
for (WebSocketSession user : users) {
Integer user_id = Convert.toInt(user.getAttributes().get("user_id"));
if (CheckUtil.isNotEmpty(user_id)) {
onlineLoginUserId.add(user_id);
}
}
return onlineLoginUserId;
}
@Autowired
private ChatHistoryService chatHistoryService;
@Autowired
private LayGroupService layGroupService;
@Autowired
private AccountService accountService;
//用户上线后触发
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.debug("connect to the websocket success......");
String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ;
String sessionId = session.getId();
users.add(session);
this.updateOnlineStatus();
String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);//获取刚上线用户的loginName
// 存储到本地会话管理
ConcurrentHashMap<String, List<WebSocketSession>> userSessions=new ConcurrentHashMap<>();
localSessionManager.setUserSessions(userSessions);
localSessionManager.addUserSession(loginUserId, session);
List<WebSocketSession> loginSessions = userSession.get(loginUserId);
if (CollUtil.isEmpty(loginSessions)) {
if (null == loginSessions) {
loginSessions = new ArrayList<>();
}
logger.info("添加会话到本地成功:{}", localSessionManager.getUserSessions().values());
loginSessions.add(session);
userSession.put(loginUserId, loginSessions);
} else {
if (!loginSessions.contains(session)) {
loginSessions.add(session);
}
}
// 注册到分布式会话服务
Map<String, Object> attributes = new HashMap<>();
attributes.put("user_id", String.valueOf(session.getAttributes().get("user_id")));
attributes.put("loginName", loginUserId);
distributedSessionService.registerUserSession(loginUserId, sessionId, attributes);
if (loginUserId != null) {
SendVO msg = new SendVO();
//读取离线信息
ChatHistory chat = new ChatHistory();
chat.setReceiver(loginUserId);//发给刚上线的用户信息
chat.setStatus("0");
List<ChatHistory> list = chatHistoryService.findList(chat);
/*
for(ChatHistory c : list){
String sender="";
String receiver="";
// 处理离线消息等原有逻辑...
// this.updateOnlineStatus();
if("friend".equals(c.getType()) || "user".equals(c.getType())){//如果是个人信息
sender = c.getSender(); //todo 确认数据库存的数据uid user_id 不一致
msg.setId(sender);
msg.setToid(c.getReceiver());
msg.setContent(c.getMsg());
//todu 缺少数据
//msg = sender+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_;
}else if(c.getType().contains("_group")){//如果是直播群组信息
msg.setType("group");
sender = c.getSender();
String groupId = c.getReceiver().split("_")[0];//直播群组id
receiver=c.getReceiver().split("_")[1];//直播群组所属user_id
msg = sender+Constants._msg_+groupId+Constants._msg_+c.getMsg()+Constants._msg_;
}else{//如果是群组信息
msg.setType("group");
String groupId = c.getSender().split(Constants._msg_)[0];//群组id
sender=c.getSender().split(Constants._msg_)[1];//发送者loginName
msg = groupId+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_;
}
//todo 应该冗余
//获取用户基本信息
AccountUserBase accountUserBase = accountService.getUserBase(Integer.valueOf(sender));
//AccountUserBase accountUserBase= result.getFenResult(AccountUserBase.class);
//AccountUserInfo userInfo=accountService.getUserInfo(accountUserBase.getUser_id());
AccountUserInfo userInfo=accountService.getUserInfo(Integer.valueOf(sender));
msg.setUser_id(Convert.toInt(session.getAttributes().get("user_id")));
msg.setUsername(accountUserBase.getUser_nickname());
msg.setAvatar(userInfo.getUser_avatar());
//msg.setMessage_id(c.getId());
msg.setFromid(msg.getId());
msg.setTime(c.getCreate_date().getTime());
msg.setStatus(0); //?
msg.setMsg(new HashMap());
boolean isSuccess = this.sendMessageToUser(loginUserId, msg);
if(isSuccess) {
c.setStatus("1");//标记为已读
chatHistoryService.saveNew(c);
}
}
*/
}
}
//接收js侧发送来的用户信息
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> socketMessage) throws Exception {
ReceiveDTO receiveDTO = JSONUtil.toBean(socketMessage.getPayload().toString(), ReceiveDTO.class);
if (receiveDTO != null) {
if (receiveDTO != null) {//发送消息
SendVO sendVO = new SendVO();
MineDTO mine = receiveDTO.getMine();
ToDTO to = receiveDTO.getTo();
// 设置sendVO的代码保持不变...
sendVO.setToid(to.getId());
sendVO.setZid(to.getZid());
sendVO.setZid(to.getZid()); //群组特有
sendVO.setId(mine.getId());
Integer toUserId = Convert.toInt(session.getAttributes().get("user_id"));
sendVO.setUser_id(toUserId);
@ -95,33 +182,32 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
to.setType("friend");
}
sendVO.setType(to.getType());
sendVO.setType(to.getType()); //是从to中读取Type
sendVO.setSendmethod(sendVO.getType());
sendVO.setContent(mine.getContent());
sendVO.setMessage_id(mine.getMessage_id());
sendVO.setFromid(mine.getId());
sendVO.setTime(new Date().getTime());
sendVO.setStatus(0);
sendVO.setStatus(0); //?
sendVO.setMsg_type(mine.getType());
sendVO.setItem_id(mine.getItem_id());
sendVO.setMsg(new HashMap());
sendVO.setMessage_length(mine.getLength());
String sender = mine.getId();
String receiver = to.getId();
String sender = mine.getId();//信息发送者登录名(loginName)或user_id
String receiver = to.getId();//信息接收者如果是私聊就是用户loginName如果是群聊就是群组id
String msg = mine.getContent();
String avatar = mine.getAvatar();
String type = to.getType();
String senderName = mine.getUsername();
// 更新心跳
distributedSessionService.updateHeartbeat(sender, session.getId());
String senderName = mine.getUsername();//发送者姓名(name)
if (!mine.getId().equals(to.getId())) {
sendVO.setMine(false);
//保存聊天记录
if ("friend".equals(type) || "user".equals(type)) {
// 私聊消息 - 使用分布式发送
//如果是私聊
ChatHistory chat = new ChatHistory();
chat.setId(IdUtil.simpleUUID());
chat.setSender(sender);
@ -129,49 +215,124 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
chat.setMsg(msg);
chat.setType("friend");
chat.setCreate_date(new Date());
boolean isSuccess = distributedMessageService.sendToUser(receiver, sendVO);
if (isSuccess) {
chat.setStatus("1");
boolean isSuccess = this.sendMessageToUser(receiver, sendVO);
if (isSuccess) {//如果信息发送成功
chat.setStatus("1");//设置为已读
} else {
// 发送离线消息提醒
sendVO.setToid(mine.getId());
sendVO.setContent(to.getName() + " 对方现在离线,他将在上线后收到你的消息!");
sendVO.setMsg_type("text");
sendVO.setMessage_length("0");
sendVO.setId(to.getId());
distributedMessageService.sendToUser(sender, sendVO);
chat.setStatus("0");
this.sendMessageToUser(sender, sendVO);//同时向本人发送对方不在线消息
chat.setStatus("0");//设置为未读
}
chatHistoryService.saveNew(chat);
} else if ("group".equals(type)) {//如果是群聊
// 临时不经过数据库
List<WebSocketSession> groupLoginSession = groupSession.get(to.getZid());
} else if ("group".equals(type)) {
// 群聊消息 - 使用分布式发送
String groupId = to.getZid();
distributedMessageService.sendToGroup(groupId, sendVO, session.getId());
if (!CollUtil.isEmpty(groupLoginSession)) {
for (WebSocketSession gs : groupLoginSession) {
} else if ("join_group".equals(type)) {
// 加入群组
if (sender.equals(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString())) {//群消息不发给自己但是存一条记录当做聊天记录查询
} else {
//设置接受用户
sendVO.setToid(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString());
boolean isSuccess = this.sendMessageToUser(to.getName(), sendVO);
if (isSuccess) {
} else {
}
}
//chatHistoryService.saveNew(chat);
}
}
/*
List<LayGroupUser> layGroupUserlist = new ArrayList();
//群主
LayGroupUser owner = new LayGroupUser();
LayGroup layGroup = layGroupService.get(receiver);
owner.setUser_id(layGroup.getCreate_by());
layGroupUserlist.add(owner);
//群成员1
List<LayGroupUser> zlist = layGroupService.getUsersByGroup(layGroupService.get(receiver));
layGroupUserlist.addAll(zlist);
for (LayGroupUser lgUser : layGroupUserlist) {
AccountUserBase user_base_row = accountService.getUserBase(Integer.valueOf(lgUser.getUser_id()));
ChatHistory chat = new ChatHistory();
chat.setId(IdUtil.simpleUUID());
//群聊时信息先发送给群聊id即receiver)在后台转发给所有非发送者(sender)的人的话群聊id即receiver)就变成发送者
String groupId = receiver;
//保存聊天记录
chat.setSender(groupId + Constants._msg_ + sender);//群聊时保存群聊id和发送者id
chat.setReceiver(user_base_row.getUser_account());//群中所有信息获得者人员
chat.setMsg(msg);
chat.setType("group");
//message = groupId + Constants._msg_ + user_base_row.getUser_account() + Constants._msg_ + msg + Constants._msg_ + avatar + Constants._msg_ + type + Constants._msg_ + senderName + Constants._msg_ + datatime;
if (sender.equals(user_base_row.getUser_account())) {//群消息不发给自己但是存一条记录当做聊天记录查询
chat.setStatus("1");//发送成功设置为已读
} else {
boolean isSuccess = this.sendMessageToUser(user_base_row.getUser_account(), sendVO);
if (isSuccess) {
chat.setStatus("1");//发送成功设置为已读
} else {
chat.setStatus("0");//用户不在线设置为未读
}
}
chatHistoryService.saveNew(chat);
}
*/
} else if ("join_group".equals(type)) { //临时群组聚焦当前窗口
String zid = to.getZid();
//设置session属性上标出所属群组 目前无同时多个群组需求如果存在此处存入list
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid);
// 添加到本地群组
localSessionManager.addGroupSession(zid, session);
//设置群组中用户
List<WebSocketSession> groupLoginSession = groupSession.get(zid);
// 添加到分布式群组
String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
distributedSessionService.addUserToGroup(zid, userId, session.getId());
if (CollUtil.isEmpty(groupLoginSession)) {
if (null == groupLoginSession) {
groupLoginSession = new ArrayList<>();
}
} else if ("leave_group".equals(type)) {
// 离开群组
groupLoginSession.add(session);
groupSession.put(zid, groupLoginSession);
} else {
if (!groupLoginSession.contains(session)) {
groupLoginSession.add(session);
}
}
//todo通知已存在群组用户消息 可启动独立task
} else if ("leave_group".equals(type)) { //临时群组聚焦当前窗口
String zid = to.getZid();
//设置session属性上标出所属群组
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, null);
// 从本地群组移除
localSessionManager.removeGroupSession(zid, session);
//设置群组中用户
List<WebSocketSession> groupLoginSession = groupSession.get(zid);
// 从分布式群组移除
distributedSessionService.removeUserFromGroup(zid, session.getId());
if (CollUtil.isEmpty(groupLoginSession)) {
} else {
if (groupLoginSession.contains(session)) {
groupLoginSession.remove(session);
}
}
//todo通知已存在群组用户消息 可启动独立task
} else {
}
} else {
sendVO.setMine(true);
@ -185,39 +346,28 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
session.close();
}
logger.debug("websocket connection closed......");
cleanupSession(session);
users.remove(session);
userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session);
if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) {
groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session);
}
this.updateOnlineStatus();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.debug("websocket connection closed......");
cleanupSession(session);
}
users.remove(session);
userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session);
/**
* 清理会话资源
*/
private void cleanupSession(WebSocketSession session) {
String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ;
String sessionId = session.getId();
String groupId = (String) session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY);
// 从本地管理移除
if(localSessionManager.getUserSessions()==null){
localSessionManager.setUserSessions(new ConcurrentHashMap<>());
}
localSessionManager.removeUserSession(loginUserId, session);
if (groupId != null) {
localSessionManager.removeGroupSession(groupId, session);
if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) {
groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session);
}
// 从分布式存储注销
distributedSessionService.unregisterUserSession(loginUserId, sessionId);
if (groupId != null) {
distributedSessionService.removeUserFromGroup(groupId, sessionId);
}
// this.updateOnlineStatus();
this.updateOnlineStatus();
}
@Override
@ -226,112 +376,64 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
}
/**
* 给某个用户发送消息 (兼容原有接口)
* 给所有在线用户发送消息
*
* @param message
*/
public boolean sendMessageToUser(String loginName, SendVO message) {
return distributedMessageService.sendToUser(loginName, message);
}
public void sendMessageToAllUsers(List<String> message) {
/**
* 获取所有在线用户登录名
*/
public List<String> getOnlineLoginNames() {
// 使用分布式服务获取所有在线用户登录名
return distributedSessionService.getOnlineLoginNames();
}
SendVO msg = new SendVO();
msg.setContent(message.toString());
msg.setType("online");
msg.setSendmethod(msg.getType());
/**
* 获取所有在线用户ID
*/
public List<Integer> getOnlineLoginUserId() {
// 使用分布式服务获取所有在线用户ID
return distributedSessionService.getAllOnlineUserIds();
}
/**
* 获取当前服务器的在线用户ID
*/
public List<Integer> getLocalOnlineLoginUserId() {
// 使用本地会话管理器获取当前服务器的在线用户ID
return localSessionManager.getLocalOnlineUserIds();
}
/**
* 获取当前服务器的在线用户登录名
*/
public List<String> getLocalOnlineLoginNames() {
// 使用本地会话管理器获取当前服务器的在线用户登录名
return localSessionManager.getLocalOnlineLoginNames();
}
/**
* 更新在线状态 - 通知所有用户在线列表变化
*/
public void updateOnlineStatus() {
try {
// 创建在线状态消息
SendVO onlineStatusMsg = new SendVO();
onlineStatusMsg.setType("online_status");
onlineStatusMsg.setSendmethod("broadcast");
// 获取在线用户列表
List<String> onlineUsers = getOnlineLoginNames();
List<Integer> onlineUserIds = getOnlineLoginUserId();
Map<String, Object> data = new HashMap<>();
data.put("onlineUsers", onlineUsers);
data.put("onlineUserIds", onlineUserIds);
data.put("timestamp", System.currentTimeMillis());
onlineStatusMsg.setMsg(data);
// 广播在线状态更新只广播到当前服务器避免循环广播
broadcastLocalOnlineStatus(onlineStatusMsg);
logger.info("在线状态更新: {} 个用户在线", onlineUsers.size());
} catch (Exception e) {
logger.error("更新在线状态失败: " + e.getMessage(), e);
}
}
/**
* 广播在线状态到本地用户
*/
private void broadcastLocalOnlineStatus(SendVO message) {
logger.info("localSessionManager: {}", localSessionManager);
logger.info("localSessionManager.userSessions: {}", localSessionManager.getUserSessions());
logger.info("localSessionManager.userSessions.values: {}", localSessionManager.getUserSessions().values());
if (localSessionManager == null || localSessionManager.getUserSessions().isEmpty()) {
logger.error("localSessionManager is null, cannot broadcast online status");
return;
}
// 获取本地所有用户会话
for (List<WebSocketSession> sessions : localSessionManager.getUserSessions().values()) {
for (WebSocketSession session : sessions) {
if (session!=null&&session.isOpen()) {
try {
session.sendMessage(new TextMessage(message.toString()));
} catch (IOException e) {
logger.error("发送在线状态消息失败: " + e.getMessage(), e);
}
//不能这么操作
for (WebSocketSession user : users) {
try {
if (user.isOpen()) {
user.sendMessage(new TextMessage(msg.toString()));
}
} catch (IOException e) {
logger.error("给所有在线用户发送信息失败!" + e.getMessage(), e);
}
}
}
/**
* 给所有在线用户发送消息
* 给某个用户发送消息
*
* @param loginName
* @param message
*/
public void sendMessageToAllUsers(SendVO message) {
// 获取所有在线用户ID
List<Integer> onlineUserIds = getOnlineLoginUserId();
public boolean sendMessageToUser(String loginName, SendVO message) {
boolean result = false;
for (Integer userId : onlineUserIds) {
String userIdStr = String.valueOf(userId);
distributedMessageService.sendToUser(userIdStr, message);
//不能这样操作
//for (WebSocketSession user : users) {
List<WebSocketSession> lst = userSession.get(message.getToid());
if (CollUtil.isNotEmpty(lst)) {
for (WebSocketSession user : lst) {
//if (user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).equals(loginName)) {//允许用户多个浏览器登录每个浏览器页面都会收到用户信息
try {
if (user.isOpen()) {
user.sendMessage(new TextMessage(message.toString()));
result = true;
}
} catch (IOException e) {
logger.error("给用户发送信息失败!" + e.getMessage(), e);
}
//break;//注释掉此处意味着遍历该用户打开的所有页面并发送信息否则只会向用户登录的第一个网页发送信息
//}
}
}
logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size());
return result;
}
public void updateOnlineStatus() {
//this.sendMessageToAllUsers(this.getOnlineLoginNames());//通知所有用户更新在线信息
}
}