im分布式测试方案
This commit is contained in:
parent
928b16da60
commit
8c9661c77c
@ -1,13 +1,17 @@
|
||||
package com.suisung.mall.im.common.websocket.service;
|
||||
|
||||
import com.suisung.mall.im.pojo.vo.SendVO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.IM_FANOUT_EXCHANGE;
|
||||
|
||||
@ -15,6 +19,7 @@ import static com.suisung.mall.im.common.websocket.config.RabbitMQConfig.IM_FANO
|
||||
@Service
|
||||
public class DistributedMessageService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DistributedMessageService.class);
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@ -30,7 +35,8 @@ public class DistributedMessageService {
|
||||
/**
|
||||
* 发送消息给用户
|
||||
*/
|
||||
public boolean sendToUser(String targetUserId, SendVO message) {
|
||||
public boolean sendToUser(String targetUserId, SendVO message, ConcurrentHashMap<String, List<WebSocketSession>> userSessions) {
|
||||
log.info("targetUserId{},message:{}", targetUserId, message);
|
||||
String targetServer = sessionService.getUserServer(targetUserId);
|
||||
|
||||
if (targetServer == null) {
|
||||
@ -40,7 +46,7 @@ public class DistributedMessageService {
|
||||
|
||||
if (isCurrentServer(targetServer)) {
|
||||
// 用户在当前服务器,直接发送
|
||||
return localSessionManager.sendToUser(targetUserId, message);
|
||||
return localSessionManager.sendToUser(targetUserId, message,userSessions);
|
||||
} else {
|
||||
// 用户在其他服务器,通过RabbitMQ转发
|
||||
forwardToUser(targetServer, targetUserId, message);
|
||||
|
||||
@ -42,9 +42,10 @@ public class LocalSessionManager {
|
||||
if (existingSessions == null) {
|
||||
existingSessions = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
if (!existingSessions.contains(session)) {
|
||||
// if (!existingSessions.contains(session)) {
|
||||
// existingSessions.add(session);
|
||||
// }
|
||||
existingSessions.add(session);
|
||||
}
|
||||
return existingSessions;
|
||||
});
|
||||
|
||||
@ -68,12 +69,15 @@ public class LocalSessionManager {
|
||||
/**
|
||||
* 发送消息给用户
|
||||
*/
|
||||
public boolean sendToUser(String userId, SendVO message) {
|
||||
public boolean sendToUser(String userId, SendVO message,ConcurrentHashMap<String, List<WebSocketSession>> userSession) {
|
||||
if (userId == null || message == null) {
|
||||
log.info("发送消息失败: 参数为空");
|
||||
return false;
|
||||
}
|
||||
|
||||
if(null!=userSession){
|
||||
this.userSessions=userSession;
|
||||
}
|
||||
log.info("userSessions={}", userSession);
|
||||
List<WebSocketSession> sessions = userSessions.get(userId);
|
||||
if (sessions == null || sessions.isEmpty()) {
|
||||
log.info("用户没有活跃会话: {}", userId);
|
||||
|
||||
@ -65,7 +65,7 @@ public class MessageConsumerService {
|
||||
}
|
||||
|
||||
if (userMsg != null) {
|
||||
boolean success = localSessionManager.sendToUser(userId, userMsg);
|
||||
boolean success = localSessionManager.sendToUser(userId, userMsg,null);
|
||||
logger.info("发送消息给用户 {}: {}", userId, success ? "成功" : "失败");
|
||||
} else {
|
||||
logger.info("消息格式错误,无法发送给用户: {}", userId);
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
package com.suisung.mall.im.common.websocket.service.onchat;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.suisung.mall.common.feignService.AccountService;
|
||||
import com.suisung.mall.common.utils.CheckUtil;
|
||||
import com.suisung.mall.im.common.websocket.service.DistributedMessageService;
|
||||
import com.suisung.mall.im.common.websocket.service.DistributedSessionService;
|
||||
import com.suisung.mall.im.common.websocket.service.LocalSessionManager;
|
||||
import com.suisung.mall.im.common.websocket.utils.Constants;
|
||||
import com.suisung.mall.im.pojo.entity.ChatHistory;
|
||||
import com.suisung.mall.im.pojo.vo.MineDTO;
|
||||
@ -13,164 +13,78 @@ import com.suisung.mall.im.pojo.vo.ReceiveDTO;
|
||||
import com.suisung.mall.im.pojo.vo.SendVO;
|
||||
import com.suisung.mall.im.pojo.vo.ToDTO;
|
||||
import com.suisung.mall.im.service.ChatHistoryService;
|
||||
import com.suisung.mall.im.service.LayGroupService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import org.springframework.web.socket.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class MallsuiteImSocketHandler implements WebSocketHandler {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class);
|
||||
|
||||
private static ArrayList<WebSocketSession> users;
|
||||
private static ArrayList<String> usersStr;
|
||||
// 移除原有的静态集合,使用注入的服务
|
||||
// private static ArrayList<WebSocketSession> users;
|
||||
// private static ArrayList<String> usersStr;
|
||||
// private static Map<String, List<WebSocketSession>> userSession;
|
||||
// private static Map<String, List<WebSocketSession>> groupSession;
|
||||
|
||||
//存入用户的所有终端的连接信息
|
||||
private static Map<String, List<WebSocketSession>> userSession;
|
||||
@Autowired
|
||||
private DistributedSessionService distributedSessionService;
|
||||
|
||||
//群组信息
|
||||
private static Map<String, List<WebSocketSession>> groupSession;
|
||||
@Autowired
|
||||
private DistributedMessageService distributedMessageService;
|
||||
|
||||
static {
|
||||
users = new ArrayList<>();
|
||||
usersStr = new ArrayList<>();
|
||||
userSession = Collections.synchronizedMap(new HashMap<>());
|
||||
groupSession = Collections.synchronizedMap(new HashMap<>());
|
||||
logger = LoggerFactory.getLogger(MallsuiteImSocketHandler.class);
|
||||
}
|
||||
|
||||
public ArrayList<String> getOnlineLoginNames() {
|
||||
ArrayList<String> onlineLoginNames = new ArrayList<String>();
|
||||
for (WebSocketSession user : users) {
|
||||
String userName = (String) user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
|
||||
if (userName != null) {
|
||||
onlineLoginNames.add(userName);
|
||||
}
|
||||
}
|
||||
return onlineLoginNames;
|
||||
|
||||
}
|
||||
|
||||
public ArrayList<Integer> getOnlineLoginUserId() {
|
||||
ArrayList<Integer> onlineLoginUserId = new ArrayList<Integer>();
|
||||
for (WebSocketSession user : users) {
|
||||
Integer user_id = Convert.toInt(user.getAttributes().get("user_id"));
|
||||
if (CheckUtil.isNotEmpty(user_id)) {
|
||||
onlineLoginUserId.add(user_id);
|
||||
}
|
||||
}
|
||||
return onlineLoginUserId;
|
||||
|
||||
}
|
||||
@Autowired
|
||||
private LocalSessionManager localSessionManager;
|
||||
|
||||
@Autowired
|
||||
private ChatHistoryService chatHistoryService;
|
||||
@Autowired
|
||||
private LayGroupService layGroupService;
|
||||
@Autowired
|
||||
private AccountService accountService;
|
||||
|
||||
|
||||
//用户上线后触发
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
logger.debug("connect to the websocket success......");
|
||||
|
||||
String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ;
|
||||
String sessionId = session.getId();
|
||||
users.add(session);
|
||||
this.updateOnlineStatus();
|
||||
String loginUserId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);//获取刚上线用户的loginName
|
||||
|
||||
List<WebSocketSession> loginSessions = userSession.get(loginUserId);
|
||||
if (CollUtil.isEmpty(loginSessions)) {
|
||||
if (null == loginSessions) {
|
||||
loginSessions = new ArrayList<>();
|
||||
}
|
||||
// 存储到本地会话管理
|
||||
ConcurrentHashMap<String, List<WebSocketSession>> userSessions=new ConcurrentHashMap<>();
|
||||
localSessionManager.setUserSessions(userSessions);
|
||||
localSessionManager.addUserSession(loginUserId, session);
|
||||
|
||||
loginSessions.add(session);
|
||||
userSession.put(loginUserId, loginSessions);
|
||||
} else {
|
||||
if (!loginSessions.contains(session)) {
|
||||
loginSessions.add(session);
|
||||
}
|
||||
}
|
||||
logger.info("添加会话到本地成功:{}", localSessionManager.getUserSessions().values());
|
||||
|
||||
if (loginUserId != null) {
|
||||
SendVO msg = new SendVO();
|
||||
//读取离线信息
|
||||
ChatHistory chat = new ChatHistory();
|
||||
chat.setReceiver(loginUserId);//发给刚上线的用户信息
|
||||
chat.setStatus("0");
|
||||
List<ChatHistory> list = chatHistoryService.findList(chat);
|
||||
/*
|
||||
for(ChatHistory c : list){
|
||||
String sender="";
|
||||
String receiver="";
|
||||
// 注册到分布式会话服务
|
||||
Map<String, Object> attributes = new HashMap<>();
|
||||
attributes.put("user_id", String.valueOf(session.getAttributes().get("user_id")));
|
||||
attributes.put("loginName", loginUserId);
|
||||
distributedSessionService.registerUserSession(loginUserId, sessionId, attributes);
|
||||
|
||||
if("friend".equals(c.getType()) || "user".equals(c.getType())){//如果是个人信息
|
||||
sender = c.getSender(); //todo 确认数据库存的数据uid user_id 不一致
|
||||
msg.setId(sender);
|
||||
msg.setToid(c.getReceiver());
|
||||
msg.setContent(c.getMsg());
|
||||
//todu 缺少数据
|
||||
//msg = sender+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_;
|
||||
}else if(c.getType().contains("_group")){//如果是直播群组信息
|
||||
msg.setType("group");
|
||||
// 处理离线消息等原有逻辑...
|
||||
// this.updateOnlineStatus();
|
||||
|
||||
sender = c.getSender();
|
||||
String groupId = c.getReceiver().split("_")[0];//直播群组id
|
||||
receiver=c.getReceiver().split("_")[1];//直播群组所属user_id
|
||||
msg = sender+Constants._msg_+groupId+Constants._msg_+c.getMsg()+Constants._msg_;
|
||||
|
||||
}else{//如果是群组信息
|
||||
msg.setType("group");
|
||||
|
||||
String groupId = c.getSender().split(Constants._msg_)[0];//群组id
|
||||
sender=c.getSender().split(Constants._msg_)[1];//发送者loginName
|
||||
msg = groupId+Constants._msg_+c.getReceiver()+Constants._msg_+c.getMsg()+Constants._msg_;
|
||||
|
||||
}
|
||||
|
||||
//todo 应该冗余
|
||||
//获取用户基本信息
|
||||
AccountUserBase accountUserBase = accountService.getUserBase(Integer.valueOf(sender));
|
||||
//AccountUserBase accountUserBase= result.getFenResult(AccountUserBase.class);
|
||||
//AccountUserInfo userInfo=accountService.getUserInfo(accountUserBase.getUser_id());
|
||||
AccountUserInfo userInfo=accountService.getUserInfo(Integer.valueOf(sender));
|
||||
|
||||
|
||||
msg.setUser_id(Convert.toInt(session.getAttributes().get("user_id")));
|
||||
msg.setUsername(accountUserBase.getUser_nickname());
|
||||
msg.setAvatar(userInfo.getUser_avatar());
|
||||
//msg.setMessage_id(c.getId());
|
||||
msg.setFromid(msg.getId());
|
||||
msg.setTime(c.getCreate_date().getTime());
|
||||
msg.setStatus(0); //?
|
||||
msg.setMsg(new HashMap());
|
||||
|
||||
boolean isSuccess = this.sendMessageToUser(loginUserId, msg);
|
||||
if(isSuccess) {
|
||||
c.setStatus("1");//标记为已读
|
||||
chatHistoryService.saveNew(c);
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
//接收js侧发送来的用户信息
|
||||
@Override
|
||||
public void handleMessage(WebSocketSession session, WebSocketMessage<?> socketMessage) throws Exception {
|
||||
ReceiveDTO receiveDTO = JSONUtil.toBean(socketMessage.getPayload().toString(), ReceiveDTO.class);
|
||||
if (receiveDTO != null) {//发送消息
|
||||
logger.info("接收到信息{}", receiveDTO);
|
||||
if (receiveDTO != null) {
|
||||
SendVO sendVO = new SendVO();
|
||||
MineDTO mine = receiveDTO.getMine();
|
||||
ToDTO to = receiveDTO.getTo();
|
||||
|
||||
// 设置sendVO的代码保持不变...
|
||||
sendVO.setToid(to.getId());
|
||||
sendVO.setZid(to.getZid()); //群组特有
|
||||
sendVO.setZid(to.getZid());
|
||||
sendVO.setId(mine.getId());
|
||||
Integer toUserId = Convert.toInt(session.getAttributes().get("user_id"));
|
||||
sendVO.setUser_id(toUserId);
|
||||
@ -182,32 +96,33 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
|
||||
to.setType("friend");
|
||||
}
|
||||
|
||||
sendVO.setType(to.getType()); //是从to中读取Type
|
||||
sendVO.setType(to.getType());
|
||||
sendVO.setSendmethod(sendVO.getType());
|
||||
|
||||
sendVO.setContent(mine.getContent());
|
||||
sendVO.setMessage_id(mine.getMessage_id());
|
||||
|
||||
sendVO.setFromid(mine.getId());
|
||||
sendVO.setTime(new Date().getTime());
|
||||
sendVO.setStatus(0); //?
|
||||
sendVO.setStatus(0);
|
||||
sendVO.setMsg_type(mine.getType());
|
||||
sendVO.setItem_id(mine.getItem_id());
|
||||
sendVO.setMsg(new HashMap());
|
||||
sendVO.setMessage_length(mine.getLength());
|
||||
|
||||
String sender = mine.getId();//信息发送者登录名(loginName)或user_id
|
||||
String receiver = to.getId();//信息接收者,如果是私聊就是用户loginName,如果是群聊就是群组id
|
||||
String sender = mine.getId();
|
||||
String receiver = to.getId();
|
||||
String msg = mine.getContent();
|
||||
String avatar = mine.getAvatar();
|
||||
String type = to.getType();
|
||||
String senderName = mine.getUsername();//发送者姓名(name)
|
||||
String senderName = mine.getUsername();
|
||||
|
||||
// 更新心跳
|
||||
distributedSessionService.updateHeartbeat(sender, session.getId());
|
||||
|
||||
if (!mine.getId().equals(to.getId())) {
|
||||
sendVO.setMine(false);
|
||||
//保存聊天记录
|
||||
|
||||
if ("friend".equals(type) || "user".equals(type)) {
|
||||
//如果是私聊
|
||||
// 私聊消息 - 使用分布式发送
|
||||
ChatHistory chat = new ChatHistory();
|
||||
chat.setId(IdUtil.simpleUUID());
|
||||
chat.setSender(sender);
|
||||
@ -215,124 +130,48 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
|
||||
chat.setMsg(msg);
|
||||
chat.setType("friend");
|
||||
chat.setCreate_date(new Date());
|
||||
boolean isSuccess = this.sendMessageToUser(receiver, sendVO);
|
||||
if (isSuccess) {//如果信息发送成功
|
||||
chat.setStatus("1");//设置为已读
|
||||
boolean isSuccess = distributedMessageService.sendToUser(receiver, sendVO,localSessionManager.getUserSessions());
|
||||
if (isSuccess) {
|
||||
chat.setStatus("1");
|
||||
} else {
|
||||
// 发送离线消息提醒
|
||||
sendVO.setToid(mine.getId());
|
||||
sendVO.setContent(to.getName() + " 对方现在离线,他将在上线后收到你的消息!");
|
||||
sendVO.setMsg_type("text");
|
||||
sendVO.setMessage_length("0");
|
||||
sendVO.setId(to.getId());
|
||||
this.sendMessageToUser(sender, sendVO);//同时向本人发送对方不在线消息
|
||||
chat.setStatus("0");//设置为未读
|
||||
distributedMessageService.sendToUser(sender, sendVO,localSessionManager.getUserSessions());
|
||||
chat.setStatus("0");
|
||||
}
|
||||
chatHistoryService.saveNew(chat);
|
||||
} else if ("group".equals(type)) {//如果是群聊
|
||||
// 临时,不经过数据库
|
||||
List<WebSocketSession> groupLoginSession = groupSession.get(to.getZid());
|
||||
|
||||
if (!CollUtil.isEmpty(groupLoginSession)) {
|
||||
for (WebSocketSession gs : groupLoginSession) {
|
||||
} else if ("group".equals(type)) {
|
||||
// 群聊消息 - 使用分布式发送
|
||||
String groupId = to.getZid();
|
||||
distributedMessageService.sendToGroup(groupId, sendVO, session.getId());
|
||||
|
||||
if (sender.equals(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString())) {//群消息不发给自己,但是存一条记录当做聊天记录查询
|
||||
|
||||
} else {
|
||||
//设置接受用户
|
||||
sendVO.setToid(gs.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).toString());
|
||||
boolean isSuccess = this.sendMessageToUser(to.getName(), sendVO);
|
||||
if (isSuccess) {
|
||||
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
//chatHistoryService.saveNew(chat);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
List<LayGroupUser> layGroupUserlist = new ArrayList();
|
||||
|
||||
//群主
|
||||
LayGroupUser owner = new LayGroupUser();
|
||||
LayGroup layGroup = layGroupService.get(receiver);
|
||||
owner.setUser_id(layGroup.getCreate_by());
|
||||
layGroupUserlist.add(owner);
|
||||
|
||||
//群成员1
|
||||
List<LayGroupUser> zlist = layGroupService.getUsersByGroup(layGroupService.get(receiver));
|
||||
layGroupUserlist.addAll(zlist);
|
||||
|
||||
for (LayGroupUser lgUser : layGroupUserlist) {
|
||||
AccountUserBase user_base_row = accountService.getUserBase(Integer.valueOf(lgUser.getUser_id()));
|
||||
ChatHistory chat = new ChatHistory();
|
||||
chat.setId(IdUtil.simpleUUID());
|
||||
//群聊时信息先发送给群聊id(即receiver),在后台转发给所有非发送者(sender)的人的话,群聊id(即receiver)就变成发送者。
|
||||
String groupId = receiver;
|
||||
//保存聊天记录
|
||||
chat.setSender(groupId + Constants._msg_ + sender);//群聊时保存群聊id和发送者id
|
||||
chat.setReceiver(user_base_row.getUser_account());//群中所有信息获得者人员
|
||||
chat.setMsg(msg);
|
||||
chat.setType("group");
|
||||
//message = groupId + Constants._msg_ + user_base_row.getUser_account() + Constants._msg_ + msg + Constants._msg_ + avatar + Constants._msg_ + type + Constants._msg_ + senderName + Constants._msg_ + datatime;
|
||||
if (sender.equals(user_base_row.getUser_account())) {//群消息不发给自己,但是存一条记录当做聊天记录查询
|
||||
chat.setStatus("1");//发送成功,设置为已读
|
||||
} else {
|
||||
boolean isSuccess = this.sendMessageToUser(user_base_row.getUser_account(), sendVO);
|
||||
if (isSuccess) {
|
||||
chat.setStatus("1");//发送成功,设置为已读
|
||||
} else {
|
||||
chat.setStatus("0");//用户不在线,设置为未读
|
||||
}
|
||||
}
|
||||
|
||||
chatHistoryService.saveNew(chat);
|
||||
}
|
||||
*/
|
||||
|
||||
} else if ("join_group".equals(type)) { //临时群组,聚焦当前窗口
|
||||
} else if ("join_group".equals(type)) {
|
||||
// 加入群组
|
||||
String zid = to.getZid();
|
||||
|
||||
//设置session,属性上标出所属群组: 目前无同时多个群组需求,如果存在,此处存入list
|
||||
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, zid);
|
||||
|
||||
//设置群组中用户
|
||||
List<WebSocketSession> groupLoginSession = groupSession.get(zid);
|
||||
// 添加到本地群组
|
||||
localSessionManager.addGroupSession(zid, session);
|
||||
|
||||
if (CollUtil.isEmpty(groupLoginSession)) {
|
||||
if (null == groupLoginSession) {
|
||||
groupLoginSession = new ArrayList<>();
|
||||
}
|
||||
// 添加到分布式群组
|
||||
String userId = (String) session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME);
|
||||
distributedSessionService.addUserToGroup(zid, userId, session.getId());
|
||||
|
||||
groupLoginSession.add(session);
|
||||
groupSession.put(zid, groupLoginSession);
|
||||
} else {
|
||||
if (!groupLoginSession.contains(session)) {
|
||||
groupLoginSession.add(session);
|
||||
}
|
||||
}
|
||||
|
||||
//todo通知已存在群组用户消息 可启动独立task
|
||||
} else if ("leave_group".equals(type)) { //临时群组,聚焦当前窗口
|
||||
} else if ("leave_group".equals(type)) {
|
||||
// 离开群组
|
||||
String zid = to.getZid();
|
||||
|
||||
//设置session,属性上标出所属群组
|
||||
session.getAttributes().put(Constants.WEBSOCKET_GROUP_KEY, null);
|
||||
|
||||
//设置群组中用户
|
||||
List<WebSocketSession> groupLoginSession = groupSession.get(zid);
|
||||
// 从本地群组移除
|
||||
localSessionManager.removeGroupSession(zid, session);
|
||||
|
||||
if (CollUtil.isEmpty(groupLoginSession)) {
|
||||
} else {
|
||||
if (groupLoginSession.contains(session)) {
|
||||
groupLoginSession.remove(session);
|
||||
}
|
||||
}
|
||||
|
||||
//todo通知已存在群组用户消息 可启动独立task
|
||||
} else {
|
||||
// 从分布式群组移除
|
||||
distributedSessionService.removeUserFromGroup(zid, session.getId());
|
||||
}
|
||||
} else {
|
||||
sendVO.setMine(true);
|
||||
@ -346,28 +185,39 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
|
||||
session.close();
|
||||
}
|
||||
logger.debug("websocket connection closed......");
|
||||
users.remove(session);
|
||||
userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session);
|
||||
|
||||
if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) {
|
||||
groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session);
|
||||
}
|
||||
|
||||
this.updateOnlineStatus();
|
||||
|
||||
cleanupSession(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
|
||||
logger.debug("websocket connection closed......");
|
||||
users.remove(session);
|
||||
userSession.get(session.getAttributes().get(Constants.WEBSOCKET_LOGINNAME)).remove(session);
|
||||
|
||||
if (CollUtil.isNotEmpty(groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)))) {
|
||||
groupSession.get(session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY)).remove(session);
|
||||
cleanupSession(session);
|
||||
}
|
||||
|
||||
this.updateOnlineStatus();
|
||||
/**
|
||||
* 清理会话资源
|
||||
*/
|
||||
private void cleanupSession(WebSocketSession session) {
|
||||
String loginUserId = String.valueOf(session.getAttributes().get("user_id")) ;
|
||||
String sessionId = session.getId();
|
||||
String groupId = (String) session.getAttributes().get(Constants.WEBSOCKET_GROUP_KEY);
|
||||
|
||||
// 从本地管理移除
|
||||
if(localSessionManager.getUserSessions()==null){
|
||||
localSessionManager.setUserSessions(new ConcurrentHashMap<>());
|
||||
}
|
||||
localSessionManager.removeUserSession(loginUserId, session);
|
||||
if (groupId != null) {
|
||||
localSessionManager.removeGroupSession(groupId, session);
|
||||
}
|
||||
|
||||
// 从分布式存储注销
|
||||
distributedSessionService.unregisterUserSession(loginUserId, sessionId);
|
||||
if (groupId != null) {
|
||||
distributedSessionService.removeUserFromGroup(groupId, sessionId);
|
||||
}
|
||||
|
||||
// this.updateOnlineStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -376,64 +226,112 @@ public class MallsuiteImSocketHandler implements WebSocketHandler {
|
||||
}
|
||||
|
||||
/**
|
||||
* 给所有在线用户发送消息
|
||||
*
|
||||
* @param message
|
||||
* 给某个用户发送消息 (兼容原有接口)
|
||||
*/
|
||||
public void sendMessageToAllUsers(List<String> message) {
|
||||
|
||||
SendVO msg = new SendVO();
|
||||
msg.setContent(message.toString());
|
||||
msg.setType("online");
|
||||
msg.setSendmethod(msg.getType());
|
||||
|
||||
//不能这么操作
|
||||
for (WebSocketSession user : users) {
|
||||
try {
|
||||
if (user.isOpen()) {
|
||||
user.sendMessage(new TextMessage(msg.toString()));
|
||||
public boolean sendMessageToUser(String loginName, SendVO message) {
|
||||
return distributedMessageService.sendToUser(loginName, message,localSessionManager.getUserSessions());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有在线用户登录名
|
||||
*/
|
||||
public List<String> getOnlineLoginNames() {
|
||||
// 使用分布式服务获取所有在线用户登录名
|
||||
return distributedSessionService.getOnlineLoginNames();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有在线用户ID
|
||||
*/
|
||||
public List<Integer> getOnlineLoginUserId() {
|
||||
// 使用分布式服务获取所有在线用户ID
|
||||
return distributedSessionService.getAllOnlineUserIds();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前服务器的在线用户ID
|
||||
*/
|
||||
public List<Integer> getLocalOnlineLoginUserId() {
|
||||
// 使用本地会话管理器获取当前服务器的在线用户ID
|
||||
return localSessionManager.getLocalOnlineUserIds();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前服务器的在线用户登录名
|
||||
*/
|
||||
public List<String> getLocalOnlineLoginNames() {
|
||||
// 使用本地会话管理器获取当前服务器的在线用户登录名
|
||||
return localSessionManager.getLocalOnlineLoginNames();
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新在线状态 - 通知所有用户在线列表变化
|
||||
*/
|
||||
public void updateOnlineStatus() {
|
||||
try {
|
||||
// 创建在线状态消息
|
||||
SendVO onlineStatusMsg = new SendVO();
|
||||
onlineStatusMsg.setType("online_status");
|
||||
onlineStatusMsg.setSendmethod("broadcast");
|
||||
|
||||
// 获取在线用户列表
|
||||
List<String> onlineUsers = getOnlineLoginNames();
|
||||
List<Integer> onlineUserIds = getOnlineLoginUserId();
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("onlineUsers", onlineUsers);
|
||||
data.put("onlineUserIds", onlineUserIds);
|
||||
data.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
onlineStatusMsg.setMsg(data);
|
||||
|
||||
// 广播在线状态更新(只广播到当前服务器,避免循环广播)
|
||||
broadcastLocalOnlineStatus(onlineStatusMsg);
|
||||
|
||||
logger.info("在线状态更新: {} 个用户在线", onlineUsers.size());
|
||||
} catch (Exception e) {
|
||||
logger.error("更新在线状态失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播在线状态到本地用户
|
||||
*/
|
||||
private void broadcastLocalOnlineStatus(SendVO message) {
|
||||
logger.info("localSessionManager: {}", localSessionManager);
|
||||
logger.info("localSessionManager.userSessions: {}", localSessionManager.getUserSessions());
|
||||
logger.info("localSessionManager.userSessions.values: {}", localSessionManager.getUserSessions().values());
|
||||
if (localSessionManager == null || localSessionManager.getUserSessions().isEmpty()) {
|
||||
logger.error("localSessionManager is null, cannot broadcast online status");
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取本地所有用户会话
|
||||
for (List<WebSocketSession> sessions : localSessionManager.getUserSessions().values()) {
|
||||
for (WebSocketSession session : sessions) {
|
||||
if (session!=null&&session.isOpen()) {
|
||||
try {
|
||||
session.sendMessage(new TextMessage(message.toString()));
|
||||
} catch (IOException e) {
|
||||
logger.error("给所有在线用户发送信息失败!" + e.getMessage(), e);
|
||||
logger.error("发送在线状态消息失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 给某个用户发送消息
|
||||
*
|
||||
* @param loginName
|
||||
* @param message
|
||||
* 给所有在线用户发送消息
|
||||
*/
|
||||
public boolean sendMessageToUser(String loginName, SendVO message) {
|
||||
boolean result = false;
|
||||
public void sendMessageToAllUsers(SendVO message) {
|
||||
// 获取所有在线用户ID
|
||||
List<Integer> onlineUserIds = getOnlineLoginUserId();
|
||||
|
||||
//不能这样操作。
|
||||
//for (WebSocketSession user : users) {
|
||||
List<WebSocketSession> lst = userSession.get(message.getToid());
|
||||
|
||||
if (CollUtil.isNotEmpty(lst)) {
|
||||
for (WebSocketSession user : lst) {
|
||||
//if (user.getAttributes().get(Constants.WEBSOCKET_LOGINNAME).equals(loginName)) {//允许用户多个浏览器登录,每个浏览器页面都会收到用户信息
|
||||
try {
|
||||
if (user.isOpen()) {
|
||||
user.sendMessage(new TextMessage(message.toString()));
|
||||
result = true;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("给用户发送信息失败!" + e.getMessage(), e);
|
||||
for (Integer userId : onlineUserIds) {
|
||||
String userIdStr = String.valueOf(userId);
|
||||
distributedMessageService.sendToUser(userIdStr, message,localSessionManager.getUserSessions());
|
||||
}
|
||||
|
||||
//break;//注释掉此处意味着遍历该用户打开的所有页面并发送信息,否则只会向用户登录的第一个网页发送信息。
|
||||
//}
|
||||
logger.debug("向 {} 个在线用户发送消息", onlineUserIds.size());
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public void updateOnlineStatus() {
|
||||
//this.sendMessageToAllUsers(this.getOnlineLoginNames());//通知所有用户更新在线信息
|
||||
}
|
||||
|
||||
}
|
||||
@ -45,8 +45,8 @@ public class ChatSocketInfoController {
|
||||
@RequestMapping(value = "/getUserOnline", method = RequestMethod.POST)
|
||||
public List<Integer> getUserOnline(@RequestBody List<Integer> user_ids) {
|
||||
logger.info(I18nUtil._("接收的用户ids:") + CollUtil.join(user_ids, ","));
|
||||
//List<Integer> onlineLoginUserIds = distributedSessionService.getOnlineUserIds();
|
||||
List<Integer> onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId();
|
||||
List<Integer> onlineLoginUserIds = distributedSessionService.getOnlineUserIds();
|
||||
// List<Integer> onlineLoginUserIds = new MallsuiteImSocketHandler().getOnlineLoginUserId();
|
||||
Iterator<Integer> online_user_ids_iter = onlineLoginUserIds.iterator();
|
||||
// 处理移除非本店铺客服
|
||||
while (online_user_ids_iter.hasNext()) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user