From 38f0feb5b578919b9cad140016fe833ce846f386 Mon Sep 17 00:00:00 2001 From: Jack <46790855@qq.com> Date: Fri, 25 Jul 2025 22:45:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A2=E5=8D=95=E8=BF=87=E6=9C=9F=E5=8F=91?= =?UTF-8?q?=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mall/common/constant/MqConstant.java | 21 ++ .../impl/ShopActivityCutpriceServiceImpl.java | 2 +- .../mall/shop/config/PathReplaceUtil.java | 120 ----------- .../mall/shop/config/RabbitMqConfig.java | 74 +++++++ .../controller/mobile/LakalaController.java | 16 +- .../message/service/MqMessageService.java | 28 +++ .../service/impl/MqMessageServiceImpl.java | 64 ++++++ .../order/listener/DelayMessageReceiver.java | 187 ++++++++++++++++++ .../shop/order/listener/MessageListener.java | 43 +++- .../order/listener/OrderPayedListener.java | 4 + .../order/service/ShopOrderBaseService.java | 10 + .../impl/ShopOrderBaseServiceImpl.java | 34 ++++ pom.xml | 14 +- 13 files changed, 481 insertions(+), 136 deletions(-) delete mode 100644 mall-shop/src/main/java/com/suisung/mall/shop/config/PathReplaceUtil.java create mode 100644 mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java diff --git a/mall-common/src/main/java/com/suisung/mall/common/constant/MqConstant.java b/mall-common/src/main/java/com/suisung/mall/common/constant/MqConstant.java index 3cd704f2..f0ff94fb 100644 --- a/mall-common/src/main/java/com/suisung/mall/common/constant/MqConstant.java +++ b/mall-common/src/main/java/com/suisung/mall/common/constant/MqConstant.java @@ -41,4 +41,25 @@ public class MqConstant { public static final String ACCOUNT_UPGRADE_QUEUE = "account.upgrade.queue"; // 用户升级处理队列 public static final String ACCOUNT_UPGRADE_ROUTING_KEY = "account.upgrade_routing_key"; // 用户用户升级队列路由键 + // RabbitMQ 配置类,实现基于 TTL + 死信队列的延迟任务 + // 延迟交换机名称 + public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; + // 延迟队列名称 + public static final String DELAY_QUEUE_NAME = "delay.queue"; + // 延迟队列路由键 + public static final String DELAY_ROUTING_KEY = "delay.routing.key"; + + // 死信交换机名称 + public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; + // 死信队列名称 + public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue"; + // 死信队列路由键 + public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key"; + + // 死信队列事件类型:1-顺丰订单超时;2-预下单; + public static final Integer DEAD_EVENT_CATE_ORDER_EXPIRED = 1; + // 死信队列事件类型:1-顺丰订单超时;2-预下单; + public static final Integer DEAD_EVENT_CATE_PRE_ORDER = 2; + + } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/activity/service/impl/ShopActivityCutpriceServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/activity/service/impl/ShopActivityCutpriceServiceImpl.java index ca56c343..1bd81913 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/activity/service/impl/ShopActivityCutpriceServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/activity/service/impl/ShopActivityCutpriceServiceImpl.java @@ -306,7 +306,7 @@ public class ShopActivityCutpriceServiceImpl extends BaseServiceImpl arguments = new HashMap<>(3); + // 绑定死信交换机 + arguments.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE_NAME); + // 绑定死信路由键 + arguments.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY); + // 可选:设置队列级别的默认TTL(毫秒),如果消息没有设置TTL则使用此值 + arguments.put("x-message-ttl", 60000); + + // 创建队列 + return QueueBuilder.durable(MqConstant.DELAY_QUEUE_NAME) + .withArguments(arguments) + .build(); + } + + /** + * 声明死信队列,用于接收过期的消息并处理 + */ + @Bean + public Queue deadLetterQueue() { + return QueueBuilder.durable(MqConstant.DEAD_LETTER_QUEUE_NAME) + .build(); + } + + /** + * 绑定延迟队列到延迟交换机 + */ + @Bean + public Binding delayQueueBinding() { + return BindingBuilder.bind(delayQueue()) + .to(delayExchange()) + .with(MqConstant.DELAY_ROUTING_KEY); + } + + /** + * 绑定死信队列到死信交换机 + */ + @Bean + public Binding deadLetterQueueBinding() { + return BindingBuilder.bind(deadLetterQueue()) + .to(deadLetterExchange()) + .with(MqConstant.DEAD_LETTER_ROUTING_KEY); + } + } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java b/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java index 03e09861..c5f3ba5a 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java @@ -14,6 +14,7 @@ import com.suisung.mall.common.api.CommonResult; import com.suisung.mall.common.service.impl.BaseControllerImpl; import com.suisung.mall.shop.lakala.service.LakalaApiService; import com.suisung.mall.shop.library.service.LibraryProductService; +import com.suisung.mall.shop.message.service.MqMessageService; import com.suisung.mall.shop.message.service.PushMessageService; import com.suisung.mall.shop.order.service.ShopOrderReturnService; import com.suisung.mall.shop.store.service.ShopStoreSameCityTransportBaseService; @@ -50,10 +51,13 @@ public class LakalaController extends BaseControllerImpl { @Resource private PushMessageService pushMessageService; + @Resource + private MqMessageService mqMessageService; + @ApiOperation(value = "测试案例", notes = "测试案例") @RequestMapping(value = "/testcase", method = RequestMethod.POST) public Object testcase(@RequestBody JSONObject paramsJSON) { - return shopOrderReturnService.sfExpressExpiredForceRefund(paramsJSON.getStr("orderId")); + // return shopOrderReturnService.sfExpressExpiredForceRefund(paramsJSON.getStr("orderId")); // return lakalaPayService.applyLedgerMerEc(paramsJSON.getStr("mchMobile")); // return lakalaPayService.LedgerMerEcDownload(975790666910121984L); @@ -72,6 +76,16 @@ public class LakalaController extends BaseControllerImpl { // List clientIds = JSONUtil.toList(paramsJSON.getJSONArray("clientIds"), String.class); // return pushMessageService.sendMessage(clientIds, paramsJSON.getStr("title"), paramsJSON.getStr("content"), paramsJSON.getJSONObject("payload")); + JSONObject jsonObject = new JSONObject(); + String orderId = "DD-20250725-1"; + jsonObject.put("category", 1); + jsonObject.put("orderId", "DD-20250725-1"); + jsonObject.put("storeId", 12); + jsonObject.put("title", "有一笔已超时的订单!"); + jsonObject.put("message", "您有一笔已超时的订单[" + orderId + "],请及时处理。"); + mqMessageService.sendDelayMessage(jsonObject.toString(), 10000); + return jsonObject; + } @ApiOperation(value = "批量发送推送消息 - 测试案例", notes = "批量发送推送消息 - 测试案例") diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/message/service/MqMessageService.java b/mall-shop/src/main/java/com/suisung/mall/shop/message/service/MqMessageService.java index 83077155..4801ed85 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/message/service/MqMessageService.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/message/service/MqMessageService.java @@ -22,4 +22,32 @@ public interface MqMessageService extends IBaseService { * 设置消息状态 */ boolean setMessageStatus(String messageId, Integer messageStatus); + + + /** + * 发送延迟消息 + * + * @param message 消息内容 + * @param delayMillis 延迟时间(毫秒) + */ + void sendDelayMessage(String message, long delayMillis); + + /** + * 发送延迟消息(重载方法,可指定交换机和路由键) + * + * @param message 消息内容 + * @param delayMillis 延迟时间(毫秒) + * @param exchange 交换机名称 + * @param routingKey 路由键 + */ + void sendDelayMessage(String message, long delayMillis, String exchange, String routingKey); + + /** + * 发送延迟消息(泛型方法,支持对象消息) + * + * @param obj 消息对象 + * @param delayMillis 延迟时间(毫秒) + * @param 消息类型 + */ + void sendDelayObjectMessage(T obj, long delayMillis); } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java index 2df8a09e..6b78c9dd 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/message/service/impl/MqMessageServiceImpl.java @@ -9,11 +9,14 @@ import com.suisung.mall.core.web.service.impl.BaseServiceImpl; import com.suisung.mall.shop.message.mapper.MqMessageMapper; import com.suisung.mall.shop.message.service.MqMessageService; import com.suisung.mall.shop.message.vo.MqMessageVo; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -125,4 +128,65 @@ public class MqMessageServiceImpl extends BaseServiceImpl 消息类型 + */ + @Override + public void sendDelayObjectMessage(T obj, long delayMillis) { + // 验证延迟时间是否有效 + if (delayMillis <= 0) { + throw new IllegalArgumentException("延迟时间必须大于0"); + } + + // 发送对象消息,设置消息属性 + rabbitTemplate.convertAndSend( + MqConstant.DELAY_EXCHANGE_NAME, + MqConstant.DELAY_ROUTING_KEY, + obj, + message -> { + message.getMessageProperties().setExpiration(String.valueOf(delayMillis)); + return message; + } + ); + } } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java new file mode 100644 index 00000000..9824ef91 --- /dev/null +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2025. Lorem ipsum dolor sit amet, consectetur adipiscing elit. + * Morbi non lorem porttitor neque feugiat blandit. Ut vitae ipsum eget quam lacinia accumsan. + * Etiam sed turpis ac ipsum condimentum fringilla. Maecenas magna. + * Proin dapibus sapien vel ante. Aliquam erat volutpat. Pellentesque sagittis ligula eget metus. + * Vestibulum commodo. Ut rhoncus gravida arcu. + */ + +package com.suisung.mall.shop.order.listener; + +import cn.hutool.json.JSONObject; +import com.rabbitmq.client.Channel; +import com.suisung.mall.common.constant.CommonConstant; +import com.suisung.mall.common.constant.MqConstant; +import com.suisung.mall.shop.message.service.PushMessageService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.IOException; + +/** + * 延迟消息接收器,处理过期的消息 + *

+ * 该监听器处理RabbitMQ中的死信队列消息,主要处理以下类型的消息: + * 1. 订单超时消息(DEAD_EVENT_CATE_ORDER_EXPIRED) + * 2. 预订单消息(DEAD_EVENT_CATE_PRE_ORDER) + */ +@Component +@Slf4j +public class DelayMessageReceiver { + + @Resource + private PushMessageService pushMessageService; + + /** + * 监听死信队列,处理过期的延迟消息 + * + * @param message 消息内容(JSON格式) + * @param channel RabbitMQ通道,用于手动确认消息 + * @param msg 消息对象,包含消息属性等信息 + */ + @RabbitListener(queues = "dead.letter.queue") + public void handleExpiredMessage(JSONObject message, Channel channel, Message msg) { + log.info("收到过期消息,开始执行触发方法: {}", message); + + try { + // 处理死信消息 + boolean result = processDeadMessage(message); + + // 根据处理结果确认或拒绝消息 + if (result) { + ackMessage(channel, msg); + } else { + log.warn("处理过期消息失败,消息将重新入队,消息内容: {}", message); + rejectMessage(channel, msg); + } + } catch (Exception e) { + log.error("处理过期消息时发生异常,消息将重新入队,消息内容: {}", message, e); + rejectMessage(channel, msg); + } + } + + /** + * 处理死信队列消息 + * + * @param message 消息内容 + * @return 处理结果 true-成功 false-失败 + */ + private boolean processDeadMessage(JSONObject message) { + log.info("开始处理死信消息: {}", message); + + // 检查消息是否为空 + if (message == null) { + log.warn("收到空消息,无法处理"); + return false; + } + + try { + // 获取消息分类 + Integer category = message.getInt("category"); + if (category == null) { + log.warn("消息分类为空,无法处理消息: {}", message); + return false; + } + + // 根据消息分类处理不同类型的消息 + if (category == MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED) { + // 处理订单超时消息 + return handleOrderExpiredMessage(message); + } else if (category == MqConstant.DEAD_EVENT_CATE_PRE_ORDER) { + // 处理预订单消息 + return handlePreOrderMessage(message); + } else { + log.warn("未知的消息分类: {},消息内容: {}", category, message); + return false; + } + } catch (Exception e) { + log.error("处理死信消息时发生异常,消息内容: {}", message, e); + return false; + } + } + + /** + * 处理订单超时消息 + * + * @param message 消息内容 + * @return 处理结果 + */ + private boolean handleOrderExpiredMessage(JSONObject message) { + try { + String orderId = message.getStr("orderId"); + Integer storeId = message.getInt("storeId"); + + log.info("处理订单超时消息,订单号: {}, 店铺ID: {}", orderId, storeId); + + // 构造推送消息内容 + JSONObject payload = new JSONObject(); + payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ABNORMAL_ORDER_LIST); + payload.put("orderId", orderId); + + // 发送推送消息给商家员工 + pushMessageService.noticeMerchantEmployeeOrderAction( + storeId, orderId, message.getStr("title"), + message.getStr("message"), payload); + + log.info("订单超时消息处理完成,订单号: {}, 店铺ID: {}", orderId, storeId); + return true; + } catch (Exception e) { + log.error("处理订单超时消息时发生异常,消息内容: {}", message, e); + return false; + } + } + + /** + * 处理预订单消息 + * + * @param message 消息内容 + * @return 处理结果 + */ + private boolean handlePreOrderMessage(JSONObject message) { + try { + log.info("处理预订单消息: {}", message); + // 预订单处理逻辑(待实现) + return true; + } catch (Exception e) { + log.error("处理预订单消息时发生异常,消息内容: {}", message, e); + return false; + } + } + + /** + * 确认消息处理成功 + * + * @param channel RabbitMQ通道 + * @param message 消息对象 + */ + private void ackMessage(Channel channel, Message message) { + try { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.debug("消息确认成功,消息ID: {}", message.getMessageProperties().getMessageId()); + } catch (IOException e) { + log.error("确认消息失败,消息ID: {},异常原因:", message.getMessageProperties().getMessageId(), e); + } + } + + /** + * 拒绝消息处理(消息重新入队) + * + * @param channel RabbitMQ通道 + * @param message 消息对象 + */ + private void rejectMessage(Channel channel, Message message) { + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + log.debug("消息拒绝成功,消息将重新入队,消息ID: {}", message.getMessageProperties().getMessageId()); + Thread.sleep(1000); + } catch (IOException | InterruptedException e) { + log.error("拒绝消息失败,消息ID: {},异常原因:", message.getMessageProperties().getMessageId(), e); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/MessageListener.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/MessageListener.java index 5b4a4f3d..02ab3ad9 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/MessageListener.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/MessageListener.java @@ -23,7 +23,7 @@ public class MessageListener { private MessageService messageService; @RabbitHandler - public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { + public void listener(String data, Channel channel, Message message) { MsgTO msgTO = JSONUtil.toBean(data, MsgTO.class); String messageId = message.getMessageProperties().getMessageId(); @@ -31,18 +31,47 @@ public class MessageListener { log.debug("消息监听到:{} ### {}", messageId, msgTO); boolean flag = messageService.sendNoticeMsg(msgTO.getUser_id(), msgTO.getStore_id(), msgTO.getMessage_id(), msgTO.getArgs()); if (flag) { - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + ackMessage(channel, message); } else { log.error("消息消费失败,执行sendNoticeMsg异常"); - channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); - Thread.sleep(1000); + rejectMessage(channel, message); } } catch (Exception e) { log.error("消息消费失败,执行sendNoticeMsg异常, 异常原因:", e); - channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); - Thread.sleep(1000); + rejectMessage(channel, message); } } -} + /** + * 确认消息处理成功 + * + * @param channel RabbitMQ通道 + * @param message 消息对象 + */ + private void ackMessage(Channel channel, Message message) { + try { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (IOException e) { + log.error("确认消息失败,异常原因:", e); + } + } + /** + * 拒绝消息处理 + * + * @param channel RabbitMQ通道 + * @param message 消息对象 + */ + private void rejectMessage(Channel channel, Message message) { + try { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + Thread.sleep(1000); + } catch (IOException | InterruptedException e) { + log.error("处理消息失败,异常原因:", e); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } + +} \ No newline at end of file diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/OrderPayedListener.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/OrderPayedListener.java index 7ca0325a..738d76eb 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/OrderPayedListener.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/OrderPayedListener.java @@ -132,6 +132,10 @@ public class OrderPayedListener { payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ONLINE_ORDER_LIST); payload.put("orderId", orderId); pushMessageService.noticeMerchantEmployeeOrderAction(orderInfoOld.getStore_id(), orderId, title, content, payload); + + // 发送 预过期 MQ 的推送消息 + shopOrderBaseService.preSendExpiredSFOrderPushMessage(orderInfoOld.getStore_id(), orderId, 1500L); // 25分钟发出过期消息 + } } } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/service/ShopOrderBaseService.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/service/ShopOrderBaseService.java index 5097f9c6..83bdcead 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/order/service/ShopOrderBaseService.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/service/ShopOrderBaseService.java @@ -596,4 +596,14 @@ public interface ShopOrderBaseService extends IBaseService { * @return */ WxOrderBaseInfoDTO getWxOrderBaseInfo(String orderId); + + /** + * 预处理发货订单超时消息(发到 mq 里,触发超时事件,发出推送消息) + * + * @param storeId 店铺Id + * @param orderId 订单Id + * @param expireSeconds 配送超时的秒数,单位秒 + * @return + */ + Boolean preSendExpiredSFOrderPushMessage(Integer storeId, String orderId, Long expireSeconds); } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/service/impl/ShopOrderBaseServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/service/impl/ShopOrderBaseServiceImpl.java index d578da54..6308bb6d 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/order/service/impl/ShopOrderBaseServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/service/impl/ShopOrderBaseServiceImpl.java @@ -101,6 +101,7 @@ import io.seata.core.exception.TransactionException; import io.seata.spring.annotation.GlobalTransactional; import io.seata.tm.api.GlobalTransaction; import io.seata.tm.api.GlobalTransactionContext; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.ibatis.annotations.Param; import org.slf4j.Logger; @@ -110,6 +111,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.data.util.Pair; import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -138,6 +140,7 @@ import static com.suisung.mall.common.utils.ContextUtil.getCurrentUser; * @author Xinze * @since 2021-04-30 */ +@Slf4j @Service public class ShopOrderBaseServiceImpl extends BaseServiceImpl implements ShopOrderBaseService { @@ -8805,6 +8808,37 @@ public class ShopOrderBaseServiceImpl extends BaseServiceImplJ0XivNvAcR14}pA6Cysm.E17--> - - - - - 42.194.196.179 + 114.132.210.208 15 - 6480 - hwe9EgqgMAwY + 6379 + Gpff654321 + + + + 114.132.210.208 5672