diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/components/IpUtil.java b/mall-shop/src/main/java/com/suisung/mall/shop/components/IpUtil.java index 21c13e73..0a999075 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/components/IpUtil.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/components/IpUtil.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; +import org.springframework.core.io.ClassPathResource; import java.io.IOException; import java.io.InputStream; @@ -16,14 +17,26 @@ import java.util.List; import java.util.Objects; /** + * IP地址工具类 + * 用于解析IP地址对应的地理位置信息 + * * @auth: Xinze */ @Component public class IpUtil implements ApplicationRunner { private static final Logger log = LoggerFactory.getLogger(IpUtil.class); + /** + * ip2region搜索器实例 + */ private static Searcher searcher = null; + /** + * 根据IP地址获取地区信息 + * + * @param ip IP地址 + * @return 地区信息字符串,格式为:国家|区域|省份|城市|ISP + */ public static String getRegion(String ip) { if (Objects.isNull(searcher)) { log.error("IP2RegionUtils 没有成功加载数据文件"); @@ -37,27 +50,54 @@ public class IpUtil implements ApplicationRunner { try { return searcher.search(ip); } catch (Exception e) { - log.error("IP:{} 格式错误:{}", ip, e.getMessage()); + String errorMsg = e.getMessage(); + if (errorMsg == null) { + errorMsg = e.getClass().getName() + ": " + e.getCause(); + if (e.getCause() != null) { + errorMsg += " -> " + e.getCause().getMessage(); + } + } + log.error("IP:{} 格式错误:{}", ip, errorMsg, e); return ip; } } + /** + * 根据IP地址获取详细地址信息 + * + * @param ip IP地址 + * @return 地址信息字符串 + */ public static String getAddr(String ip) { return getRegion(ip); } + /** + * 根据IP地址获取地区对象 + * + * @param ip IP地址 + * @return 地区对象,包含国家、省份、城市信息 + */ public static DistrictVo getDistrict(String ip) { DistrictVo districtVo = new DistrictVo(); String ss = getAddr(ip); if (CheckUtil.isNotEmpty(ss)) { - List split = Arrays.asList(ss.split("\\|")); + try { + List split = Arrays.asList(ss.split("\\|")); - if (split.size() >= 4) { - districtVo.setCountry(split.get(0)); - districtVo.setProvince(split.get(2)); - districtVo.setCity(split.get(3)); - } else { + if (split.size() >= 4) { + districtVo.setCountry(split.get(0)); // 设置国家 + districtVo.setProvince(split.get(2)); // 设置省份 + districtVo.setCity(split.get(3)); // 设置城市 + } else { + return null; + } + } catch (ArrayIndexOutOfBoundsException e) { + log.error("解析IP地址 {} 的地区信息时发生数组越界异常,原始数据: {}", ip, ss, e); + return null; + } catch (Exception e) { + log.error("解析IP地址 {} 的地区信息时发生异常: {}", ip, e.getMessage(), e); return null; } } @@ -65,21 +105,45 @@ public class IpUtil implements ApplicationRunner { return districtVo; } + /** + * 应用启动时加载ip2region数据文件 + * + * @param args 应用启动参数 + * @throws Exception 加载过程中可能抛出的异常 + */ @Override public void run(ApplicationArguments args) throws Exception { try { log.info("开始加载 ip2region 数据文件"); - //Resource resource = new ClassPathResource("static/ip/ip2region.xdb"); - //InputStream inputStream = resource.getInputStream() - InputStream inputStream = getClass().getClassLoader().getResourceAsStream("static/ip/ip2region.xdb"); - byte[] bytes = new byte[inputStream.available()]; - inputStream.read(bytes); + ClassPathResource resource = new ClassPathResource("static/ip/ip2region.xdb"); + InputStream inputStream = resource.getInputStream(); + + // 使用 ByteArrayOutputStream 替代 available() 方法来正确读取文件内容 + java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream(); + int nRead; + byte[] data = new byte[1024]; + while ((nRead = inputStream.read(data, 0, data.length)) != -1) { + buffer.write(data, 0, nRead); + } + buffer.flush(); + byte[] bytes = buffer.toByteArray(); + inputStream.close(); searcher = Searcher.newWithBuffer(bytes); log.info("成功加载 ip2region 数据文件。"); + + // 测试本地IP地址解析 + try { + String testResult = searcher.search("127.0.0.1"); + log.info("测试本地IP解析结果: {}", testResult); + } catch (Exception e) { + log.warn("本地IP测试解析失败: {}", e.getMessage()); + } } catch (IOException e) { - log.error("加载 ip2region 失败。{}", e.getMessage()); + log.error("加载 ip2region 失败。", e); + } catch (Exception e) { + log.error("初始化 ip2region 搜索器失败。", e); } } } \ No newline at end of file diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java index 6b78c9dd..9960b719 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java @@ -1,6 +1,8 @@ package com.suisung.mall.shop.message.service.impl; import cn.hutool.core.util.IdUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.suisung.mall.common.api.ResultCode; import com.suisung.mall.common.constant.MqConstant; import com.suisung.mall.common.exception.ApiException; @@ -11,6 +13,7 @@ import com.suisung.mall.shop.message.service.MqMessageService; import com.suisung.mall.shop.message.vo.MqMessageVo; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; +import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -67,7 +70,13 @@ public class MqMessageServiceImpl extends BaseServiceImpl { + // 为延迟消息设置 messageId + String uuid = IdUtil.simpleUUID(); + message.getMessageProperties().setMessageId(uuid); + message.getMessageProperties().setHeader("messageId", uuid); message.getMessageProperties().setExpiration(String.valueOf(delayMillis)); return message; } ); } -} +} \ No newline at end of file diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java index c9b24ef8..34647043 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java @@ -13,6 +13,7 @@ import cn.hutool.json.JSONObject; import com.rabbitmq.client.Channel; import com.suisung.mall.common.constant.CommonConstant; import com.suisung.mall.common.constant.MqConstant; +import com.suisung.mall.common.utils.CheckUtil; import com.suisung.mall.shop.message.service.PushMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; @@ -34,6 +35,9 @@ import java.io.IOException; @Slf4j public class DelayMessageReceiver { + // 消息最大重试次数 + private static final int MAX_RETRY_COUNT = 5; + @Lazy @Resource private PushMessageService pushMessageService; @@ -45,15 +49,22 @@ public class DelayMessageReceiver { * @param channel RabbitMQ通道,用于手动确认消息 * @param msg 消息对象,包含消息属性等信息 */ - @RabbitListener(queues = "dead.letter.queue") + @RabbitListener(queues = MqConstant.DEAD_LETTER_QUEUE_NAME) public void handleExpiredMessage(JSONObject message, Channel channel, Message msg) { log.info("收到过期消息,开始执行触发方法: {}", message); try { // 处理死信消息 - processDeadMessage(message, channel, msg); + boolean success = processDeadMessage(message, channel, msg); + if (!success) { + log.warn("处理死信消息失败,消息将重新入队,消息内容: {}", message); + rejectMessage(channel, msg); // 处理失败时拒绝消息并重新入队 + } + + ackMessage(channel, msg); } catch (Exception e) { log.error("处理过期消息时发生异常,消息将重新入队,消息内容: {}", message, e); + rejectMessage(channel, msg); // 出现异常时拒绝消息并重新入队 } } @@ -69,7 +80,6 @@ public class DelayMessageReceiver { // 检查消息是否为空 if (message == null) { log.warn("收到空消息,无法处理"); - ackMessage(channel, msg); return false; } @@ -82,10 +92,15 @@ public class DelayMessageReceiver { return false; } + // 检查消息重试次数 + if (!checkAndIncrementRetryCount(message, channel, msg)) { + return false; + } + // 根据消息分类处理不同类型的消息 if (category == MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED) { // 处理订单超时消息 - handleOrderExpiredMessage(message, channel, msg); + handleOrderExpiredMessage(message); return true; } else if (category == MqConstant.DEAD_EVENT_CATE_PRE_ORDER) { // 处理预订单消息 @@ -93,12 +108,10 @@ public class DelayMessageReceiver { return true; } else { log.warn("未知的消息分类: {},消息内容: {}", category, message); - ackMessage(channel, msg); return false; } } catch (Exception e) { log.error("处理死信消息时发生异常,消息内容: {}", message, e); - ackMessage(channel, msg); return false; } } @@ -109,13 +122,18 @@ public class DelayMessageReceiver { * @param message 消息内容 * @return 处理结果 */ - private void handleOrderExpiredMessage(JSONObject message, Channel channel, Message msg) { + private boolean handleOrderExpiredMessage(JSONObject message) { try { String orderId = message.getStr("orderId"); Integer storeId = message.getInt("storeId"); String title = message.getStr("title"); String content = message.getStr("message"); + if (StrUtil.isBlank(orderId) && CheckUtil.isEmpty(storeId)) { + log.warn("订单ID和店铺ID不能同时为空,消息内容: {}", message); + return false; + } + if (StrUtil.isBlank(title)) { title = "有一笔已超时的订单!"; } @@ -134,10 +152,10 @@ public class DelayMessageReceiver { content, payload); log.info("订单超时消息处理完成,订单号: {}, 店铺ID: {}", orderId, storeId); - ackMessage(channel, msg); + return true; } catch (Exception e) { log.error("处理订单超时消息时发生异常,消息内容: {}", message, e); - ackMessage(channel, msg); + return false; } } @@ -158,6 +176,38 @@ public class DelayMessageReceiver { } } + /** + * 检查并增加消息重试次数 + * + * @param message 消息内容 + * @param channel RabbitMQ通道 + * @param msg 消息对象 + * @return true-可以继续处理 false-已达最大重试次数,应确认消息 + */ + private boolean checkAndIncrementRetryCount(JSONObject message, Channel channel, Message msg) { + try { + // 获取当前重试次数,默认为0 + Integer retryCount = message.getInt("retryCount"); + if (retryCount == null) { + retryCount = 0; + } + + // 如果重试次数超过最大限制,则确认消息并不再处理 + if (retryCount >= MAX_RETRY_COUNT) { + log.warn("消息重试次数已达上限({}),不再处理,消息内容: {}", MAX_RETRY_COUNT, message); + ackMessage(channel, msg); + return false; + } + + // 增加重试次数 + message.put("retryCount", retryCount + 1); + return true; + } catch (Exception e) { + log.error("检查消息重试次数时发生异常,消息内容: {}", message, e); + return true; // 出现异常时继续处理消息 + } + } + /** * 确认消息处理成功 * @@ -166,10 +216,12 @@ public class DelayMessageReceiver { */ private void ackMessage(Channel channel, Message message) { try { + // 获取消息ID,优先从标准属性获取,其次从自定义头部获取 + String messageId = getMessageId(message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); - log.debug("消息确认成功,消息ID: {}", message.getMessageProperties().getMessageId()); + log.debug("消息确认成功,消息ID: {}", messageId); } catch (IOException e) { - log.error("确认消息失败,消息ID: {},异常原因:", message.getMessageProperties().getMessageId(), e); + log.error("确认消息失败,异常原因:", e); } } @@ -181,14 +233,39 @@ public class DelayMessageReceiver { */ private void rejectMessage(Channel channel, Message message) { try { + // 获取消息ID,优先从标准属性获取,其次从自定义头部获取 + String messageId = getMessageId(message); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); - log.debug("消息拒绝成功,消息将重新入队,消息ID: {}", message.getMessageProperties().getMessageId()); + log.debug("消息拒绝成功,消息将重新入队,消息ID: {}", messageId); Thread.sleep(1000); } catch (IOException | InterruptedException e) { - log.error("拒绝消息失败,消息ID: {},异常原因:", message.getMessageProperties().getMessageId(), e); + log.error("拒绝消息失败,异常原因:", e); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } } + + /** + * 获取消息ID,支持从标准属性和自定义头部获取 + * + * @param message 消息对象 + * @return 消息ID + */ + private String getMessageId(Message message) { + // 首先尝试从标准 messageId 属性获取 + String messageId = message.getMessageProperties().getMessageId(); + if (StrUtil.isNotBlank(messageId)) { + return messageId; + } + + // 如果标准属性为空,尝试从自定义头部获取 + Object headerMessageId = message.getMessageProperties().getHeaders().get("messageId"); + if (headerMessageId != null) { + return headerMessageId.toString(); + } + + // 如果都获取不到,返回 null + return null; + } } \ No newline at end of file