fix rabbitmq bug

This commit is contained in:
Jack 2025-07-26 15:15:10 +08:00
parent 5445d200c4
commit 3a300fa979
3 changed files with 197 additions and 31 deletions

View File

@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.core.io.ClassPathResource;
import java.io.IOException;
import java.io.InputStream;
@ -16,14 +17,26 @@ import java.util.List;
import java.util.Objects;
/**
* IP地址工具类
* 用于解析IP地址对应的地理位置信息
*
* @auth: Xinze
*/
@Component
public class IpUtil implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(IpUtil.class);
/**
* ip2region搜索器实例
*/
private static Searcher searcher = null;
/**
* 根据IP地址获取地区信息
*
* @param ip IP地址
* @return 地区信息字符串格式为国家|区域|省份|城市|ISP
*/
public static String getRegion(String ip) {
if (Objects.isNull(searcher)) {
log.error("IP2RegionUtils 没有成功加载数据文件");
@ -37,27 +50,54 @@ public class IpUtil implements ApplicationRunner {
try {
return searcher.search(ip);
} catch (Exception e) {
log.error("IP{} 格式错误:{}", ip, e.getMessage());
String errorMsg = e.getMessage();
if (errorMsg == null) {
errorMsg = e.getClass().getName() + ": " + e.getCause();
if (e.getCause() != null) {
errorMsg += " -> " + e.getCause().getMessage();
}
}
log.error("IP{} 格式错误:{}", ip, errorMsg, e);
return ip;
}
}
/**
* 根据IP地址获取详细地址信息
*
* @param ip IP地址
* @return 地址信息字符串
*/
public static String getAddr(String ip) {
return getRegion(ip);
}
/**
* 根据IP地址获取地区对象
*
* @param ip IP地址
* @return 地区对象包含国家省份城市信息
*/
public static DistrictVo getDistrict(String ip) {
DistrictVo districtVo = new DistrictVo();
String ss = getAddr(ip);
if (CheckUtil.isNotEmpty(ss)) {
List<String> split = Arrays.asList(ss.split("\\|"));
try {
List<String> split = Arrays.asList(ss.split("\\|"));
if (split.size() >= 4) {
districtVo.setCountry(split.get(0));
districtVo.setProvince(split.get(2));
districtVo.setCity(split.get(3));
} else {
if (split.size() >= 4) {
districtVo.setCountry(split.get(0)); // 设置国家
districtVo.setProvince(split.get(2)); // 设置省份
districtVo.setCity(split.get(3)); // 设置城市
} else {
return null;
}
} catch (ArrayIndexOutOfBoundsException e) {
log.error("解析IP地址 {} 的地区信息时发生数组越界异常,原始数据: {}", ip, ss, e);
return null;
} catch (Exception e) {
log.error("解析IP地址 {} 的地区信息时发生异常: {}", ip, e.getMessage(), e);
return null;
}
}
@ -65,21 +105,45 @@ public class IpUtil implements ApplicationRunner {
return districtVo;
}
/**
* 应用启动时加载ip2region数据文件
*
* @param args 应用启动参数
* @throws Exception 加载过程中可能抛出的异常
*/
@Override
public void run(ApplicationArguments args) throws Exception {
try {
log.info("开始加载 ip2region 数据文件");
//Resource resource = new ClassPathResource("static/ip/ip2region.xdb");
//InputStream inputStream = resource.getInputStream()
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("static/ip/ip2region.xdb");
byte[] bytes = new byte[inputStream.available()];
inputStream.read(bytes);
ClassPathResource resource = new ClassPathResource("static/ip/ip2region.xdb");
InputStream inputStream = resource.getInputStream();
// 使用 ByteArrayOutputStream 替代 available() 方法来正确读取文件内容
java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
int nRead;
byte[] data = new byte[1024];
while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
}
buffer.flush();
byte[] bytes = buffer.toByteArray();
inputStream.close();
searcher = Searcher.newWithBuffer(bytes);
log.info("成功加载 ip2region 数据文件。");
// 测试本地IP地址解析
try {
String testResult = searcher.search("127.0.0.1");
log.info("测试本地IP解析结果: {}", testResult);
} catch (Exception e) {
log.warn("本地IP测试解析失败: {}", e.getMessage());
}
} catch (IOException e) {
log.error("加载 ip2region 失败。{}", e.getMessage());
log.error("加载 ip2region 失败。", e);
} catch (Exception e) {
log.error("初始化 ip2region 搜索器失败。", e);
}
}
}

View File

@ -1,6 +1,8 @@
package com.suisung.mall.shop.message.service.impl;
import cn.hutool.core.util.IdUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.suisung.mall.common.api.ResultCode;
import com.suisung.mall.common.constant.MqConstant;
import com.suisung.mall.common.exception.ApiException;
@ -11,6 +13,7 @@ import com.suisung.mall.shop.message.service.MqMessageService;
import com.suisung.mall.shop.message.vo.MqMessageVo;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@ -67,7 +70,13 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
rabbitTemplate.convertAndSend(exchange, routing_key, message, new CorrelationData(uuid));
*/
rabbitTemplate.convertAndSend(exchange, routing_key, data.toString(), new CorrelationData(uuid));
// 使用 MessageBuilder 确保设置 messageId
Message message = MessageBuilder.withBody(data.toString().getBytes(StandardCharsets.UTF_8))
.setHeader("messageId", uuid) // 添加自定义头部存储消息ID
.setMessageId(uuid) // 设置标准 messageId 属性
.build();
rabbitTemplate.send(exchange, routing_key, message, new CorrelationData(uuid));
}
/**
@ -107,7 +116,14 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
rabbitTemplate.convertAndSend(msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), message, new CorrelationData(msgVo.getMessage_id()));
*/
rabbitTemplate.convertAndSend(msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), msgVo.getMessage_content().toString(), new CorrelationData(msgVo.getMessage_id()));
// 使用 MessageBuilder 确保设置 messageId
Message message = MessageBuilder.withBody(msgVo.getMessage_content().toString().getBytes(StandardCharsets.UTF_8))
.setHeader("messageId", msgVo.getMessage_id()) // 添加自定义头部存储消息ID
.setMessageId(msgVo.getMessage_id()) // 设置标准 messageId 属性
.build();
rabbitTemplate.send(msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), message, new CorrelationData(msgVo.getMessage_id()));
}
}
@ -155,13 +171,18 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
throw new IllegalArgumentException("延迟时间必须大于0");
}
// 创建消息并设置TTL
// 生成消息ID
String uuid = IdUtil.simpleUUID();
// 创建消息并设置TTL和MessageId
Message delayMessage = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
.setExpiration(String.valueOf(delayMillis)) // 设置消息过期时间(毫秒)
.setMessageId(uuid) // 设置消息ID
.setHeader("messageId", uuid) // 添加自定义头部存储消息ID
.build();
// 发送消息到延迟队列
rabbitTemplate.send(exchange, routingKey, delayMessage);
rabbitTemplate.send(exchange, routingKey, delayMessage, new CorrelationData(uuid));
}
/**
@ -184,9 +205,13 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
MqConstant.DELAY_ROUTING_KEY,
obj,
message -> {
// 为延迟消息设置 messageId
String uuid = IdUtil.simpleUUID();
message.getMessageProperties().setMessageId(uuid);
message.getMessageProperties().setHeader("messageId", uuid);
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return message;
}
);
}
}
}

View File

@ -13,6 +13,7 @@ import cn.hutool.json.JSONObject;
import com.rabbitmq.client.Channel;
import com.suisung.mall.common.constant.CommonConstant;
import com.suisung.mall.common.constant.MqConstant;
import com.suisung.mall.common.utils.CheckUtil;
import com.suisung.mall.shop.message.service.PushMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
@ -34,6 +35,9 @@ import java.io.IOException;
@Slf4j
public class DelayMessageReceiver {
// 消息最大重试次数
private static final int MAX_RETRY_COUNT = 5;
@Lazy
@Resource
private PushMessageService pushMessageService;
@ -45,15 +49,22 @@ public class DelayMessageReceiver {
* @param channel RabbitMQ通道用于手动确认消息
* @param msg 消息对象包含消息属性等信息
*/
@RabbitListener(queues = "dead.letter.queue")
@RabbitListener(queues = MqConstant.DEAD_LETTER_QUEUE_NAME)
public void handleExpiredMessage(JSONObject message, Channel channel, Message msg) {
log.info("收到过期消息,开始执行触发方法: {}", message);
try {
// 处理死信消息
processDeadMessage(message, channel, msg);
boolean success = processDeadMessage(message, channel, msg);
if (!success) {
log.warn("处理死信消息失败,消息将重新入队,消息内容: {}", message);
rejectMessage(channel, msg); // 处理失败时拒绝消息并重新入队
}
ackMessage(channel, msg);
} catch (Exception e) {
log.error("处理过期消息时发生异常,消息将重新入队,消息内容: {}", message, e);
rejectMessage(channel, msg); // 出现异常时拒绝消息并重新入队
}
}
@ -69,7 +80,6 @@ public class DelayMessageReceiver {
// 检查消息是否为空
if (message == null) {
log.warn("收到空消息,无法处理");
ackMessage(channel, msg);
return false;
}
@ -82,10 +92,15 @@ public class DelayMessageReceiver {
return false;
}
// 检查消息重试次数
if (!checkAndIncrementRetryCount(message, channel, msg)) {
return false;
}
// 根据消息分类处理不同类型的消息
if (category == MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED) {
// 处理订单超时消息
handleOrderExpiredMessage(message, channel, msg);
handleOrderExpiredMessage(message);
return true;
} else if (category == MqConstant.DEAD_EVENT_CATE_PRE_ORDER) {
// 处理预订单消息
@ -93,12 +108,10 @@ public class DelayMessageReceiver {
return true;
} else {
log.warn("未知的消息分类: {},消息内容: {}", category, message);
ackMessage(channel, msg);
return false;
}
} catch (Exception e) {
log.error("处理死信消息时发生异常,消息内容: {}", message, e);
ackMessage(channel, msg);
return false;
}
}
@ -109,13 +122,18 @@ public class DelayMessageReceiver {
* @param message 消息内容
* @return 处理结果
*/
private void handleOrderExpiredMessage(JSONObject message, Channel channel, Message msg) {
private boolean handleOrderExpiredMessage(JSONObject message) {
try {
String orderId = message.getStr("orderId");
Integer storeId = message.getInt("storeId");
String title = message.getStr("title");
String content = message.getStr("message");
if (StrUtil.isBlank(orderId) && CheckUtil.isEmpty(storeId)) {
log.warn("订单ID和店铺ID不能同时为空消息内容: {}", message);
return false;
}
if (StrUtil.isBlank(title)) {
title = "有一笔已超时的订单!";
}
@ -134,10 +152,10 @@ public class DelayMessageReceiver {
content, payload);
log.info("订单超时消息处理完成,订单号: {}, 店铺ID: {}", orderId, storeId);
ackMessage(channel, msg);
return true;
} catch (Exception e) {
log.error("处理订单超时消息时发生异常,消息内容: {}", message, e);
ackMessage(channel, msg);
return false;
}
}
@ -158,6 +176,38 @@ public class DelayMessageReceiver {
}
}
/**
* 检查并增加消息重试次数
*
* @param message 消息内容
* @param channel RabbitMQ通道
* @param msg 消息对象
* @return true-可以继续处理 false-已达最大重试次数应确认消息
*/
private boolean checkAndIncrementRetryCount(JSONObject message, Channel channel, Message msg) {
try {
// 获取当前重试次数默认为0
Integer retryCount = message.getInt("retryCount");
if (retryCount == null) {
retryCount = 0;
}
// 如果重试次数超过最大限制则确认消息并不再处理
if (retryCount >= MAX_RETRY_COUNT) {
log.warn("消息重试次数已达上限({}),不再处理,消息内容: {}", MAX_RETRY_COUNT, message);
ackMessage(channel, msg);
return false;
}
// 增加重试次数
message.put("retryCount", retryCount + 1);
return true;
} catch (Exception e) {
log.error("检查消息重试次数时发生异常,消息内容: {}", message, e);
return true; // 出现异常时继续处理消息
}
}
/**
* 确认消息处理成功
*
@ -166,10 +216,12 @@ public class DelayMessageReceiver {
*/
private void ackMessage(Channel channel, Message message) {
try {
// 获取消息ID优先从标准属性获取其次从自定义头部获取
String messageId = getMessageId(message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.debug("消息确认成功消息ID: {}", message.getMessageProperties().getMessageId());
log.debug("消息确认成功消息ID: {}", messageId);
} catch (IOException e) {
log.error("确认消息失败,消息ID: {}异常原因:", message.getMessageProperties().getMessageId(), e);
log.error("确认消息失败,异常原因:", e);
}
}
@ -181,14 +233,39 @@ public class DelayMessageReceiver {
*/
private void rejectMessage(Channel channel, Message message) {
try {
// 获取消息ID优先从标准属性获取其次从自定义头部获取
String messageId = getMessageId(message);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
log.debug("消息拒绝成功消息将重新入队消息ID: {}", message.getMessageProperties().getMessageId());
log.debug("消息拒绝成功消息将重新入队消息ID: {}", messageId);
Thread.sleep(1000);
} catch (IOException | InterruptedException e) {
log.error("拒绝消息失败,消息ID: {}异常原因:", message.getMessageProperties().getMessageId(), e);
log.error("拒绝消息失败,异常原因:", e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
/**
* 获取消息ID支持从标准属性和自定义头部获取
*
* @param message 消息对象
* @return 消息ID
*/
private String getMessageId(Message message) {
// 首先尝试从标准 messageId 属性获取
String messageId = message.getMessageProperties().getMessageId();
if (StrUtil.isNotBlank(messageId)) {
return messageId;
}
// 如果标准属性为空尝试从自定义头部获取
Object headerMessageId = message.getMessageProperties().getHeaders().get("messageId");
if (headerMessageId != null) {
return headerMessageId.toString();
}
// 如果都获取不到返回 null
return null;
}
}