From e0f7e56c9031b8072c3c765f44ac465ccf72c391 Mon Sep 17 00:00:00 2001 From: Jack <46790855@qq.com> Date: Fri, 29 Aug 2025 23:31:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=AD=BB=E4=BF=A1=E6=B6=88?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/DealUserAnalyticsListener.java | 46 ++++++++--- .../account/listener/ExperienceListener.java | 1 + .../mall/shop/config/RabbitMqConfig.java | 80 +++++++++++-------- .../controller/mobile/LakalaController.java | 20 ++--- .../lakala/service/impl/LklTkServiceImpl.java | 2 +- .../order/listener/DelayMessageReceiver.java | 72 +++++++++++------ pom.xml | 12 ++- 7 files changed, 150 insertions(+), 83 deletions(-) diff --git a/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java b/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java index 062bca31..4385dc57 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java +++ b/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.util.Map; @Service @Slf4j @@ -21,16 +22,18 @@ public class DealUserAnalyticsListener { @Autowired private AccountUserAnalyticsService accountUserAnalyticsService; - -// @RabbitHandler -// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { -// // 将byte[]转换为String,然后调用现有的处理逻辑 -// String dataStr = new String(data, StandardCharsets.UTF_8); -// listener(dataStr, channel, message); -// } - + @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { + // 检查消息是否已达到最大重试次数 + Map headers = message.getMessageProperties().getHeaders(); + Object xDeath = headers.get("x-death"); + if (xDeath != null) { + log.warn("消息已进入死信队列,可能已达到最大重试次数,直接确认: {}", data); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + return; + } + AccountUserAnalytics accountUserAnalytics = JSONUtil.toBean(data, AccountUserAnalytics.class); // String messageId = message.getMessageProperties().getMessageId(); @@ -40,14 +43,33 @@ public class DealUserAnalyticsListener { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { log.error("消息消费失败,执行dealUserAnalytics异常,当前用户编号:{}", accountUserAnalytics.getUser_id()); - channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + // 检查重试次数,避免无限循环 + Long retryCount = (Long) headers.getOrDefault("retryCount", 0L); + if (retryCount < 3) { + // 设置重试次数并重新入队 + headers.put("retryCount", retryCount + 1); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + } else { + // 达到最大重试次数,直接确认消息 + log.error("消息重试次数已达上限,直接确认消息,用户编号:{}", accountUserAnalytics.getUser_id()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } Thread.sleep(1000); } } catch (Exception e) { log.error("消息消费失败,执行dealUserAnalytics异常,当前用户编号:{},失败原因", accountUserAnalytics.getUser_id(), e); - channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + // 检查重试次数,避免无限循环 + Long retryCount = (Long) headers.getOrDefault("retryCount", 0L); + if (retryCount < 3) { + // 设置重试次数并重新入队 + headers.put("retryCount", retryCount + 1); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); + } else { + // 达到最大重试次数,直接确认消息 + log.error("消息重试次数已达上限,直接确认消息,用户编号:{}", accountUserAnalytics.getUser_id()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } Thread.sleep(1000); } } -} - +} \ No newline at end of file diff --git a/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java b/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java index efee4275..f8b0b3e5 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java +++ b/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.util.Map; @Service @Slf4j diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/config/RabbitMqConfig.java b/mall-shop/src/main/java/com/suisung/mall/shop/config/RabbitMqConfig.java index cedf35b9..de842b7b 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/config/RabbitMqConfig.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/config/RabbitMqConfig.java @@ -6,8 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -22,38 +21,42 @@ public class RabbitMqConfig { @Autowired private MqMessageService mqMessageService; + /** * 序列化 */ - @Bean - public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { - // Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); - // converter.setAssumeSupportedContentType(true); -// MessageConverter converter = new StringMessageConverter(); - - RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); - // 消息抵达确认通知 - rabbitTemplate.setConfirmCallback((data, ack, cause) -> { - if (ack) { - String msgId = data.getId(); - mqMessageService.setMessageStatus(msgId, MqConstant.DELIVERED); - } else { - log.error("消息未能发送成功,消息编号:{},失败原因:{}", data.getId(), cause); - - String msgId = data.getId(); - mqMessageService.setMessageStatus(msgId, MqConstant.FAILURE); - } - }); - // 消息投递失败通知 - rabbitTemplate.setReturnCallback((msg, respCode, respText, exchange, routingKey) -> { - log.error("交换机抵达队列失败,消息编号:{},状态码:{},失败原因:{},当前交换机:{},当前路由键: {}", msg, respCode, respText, exchange, routingKey); - String msgId = msg.getMessageProperties().getMessageId(); - mqMessageService.setMessageStatus(msgId, MqConstant.FAILURE); - }); - // 配置序列化配置 - rabbitTemplate.setMessageConverter(new SimpleMessageConverter()); - return rabbitTemplate; - } +// @Bean +// public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { +// +// RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); +// // 消息抵达确认通知 +// rabbitTemplate.setConfirmCallback((data, ack, cause) -> { +// if (ack) { +// String msgId = data.getId(); +// mqMessageService.setMessageStatus(msgId, MqConstant.DELIVERED); +// } else { +// log.error("消息未能发送成功,消息编号:{},失败原因:{}", data.getId(), cause); +// +// String msgId = data.getId(); +// mqMessageService.setMessageStatus(msgId, MqConstant.FAILURE); +// } +// }); +// // 消息投递失败通知 +// rabbitTemplate.setReturnCallback((msg, respCode, respText, exchange, routingKey) -> { +// log.error("交换机抵达队列失败,消息编号:{},状态码:{},失败原因:{},当前交换机:{},当前路由键: {}", msg, respCode, respText, exchange, routingKey); +// String msgId = msg.getMessageProperties().getMessageId(); +// mqMessageService.setMessageStatus(msgId, MqConstant.FAILURE); +// }); +// +// // 配置序列化配置 +// // rabbitTemplate.setMessageConverter(new SimpleMessageConverter()); +// +// Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); +// converter.setAssumeSupportedContentType(true); +// rabbitTemplate.setMessageConverter(converter); +// +// return rabbitTemplate; +// } /** * 反序列化 @@ -62,12 +65,23 @@ public class RabbitMqConfig { public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); -// factory.setMessageConverter(new Jackson2JsonMessageConverter()); - factory.setMessageConverter(new SimpleMessageConverter()); + factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } + /** + * 反序列化 + */ +// @Bean +// public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { +// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); +// factory.setConnectionFactory(connectionFactory); +// factory.setMessageConverter(new SimpleMessageConverter()); +// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); +// return factory; +// } + /** * 默认交换机 * diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java b/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java index bf8a55ed..df5a5325 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/lakala/controller/mobile/LakalaController.java @@ -93,19 +93,19 @@ public class LakalaController extends BaseControllerImpl { // List clientIds = JSONUtil.toList(paramsJSON.getJSONArray("clientIds"), String.class); // return pushMessageService.sendMessage(clientIds, paramsJSON.getStr("title"), paramsJSON.getStr("content"), paramsJSON.getJSONObject("payload")); -// JSONObject jsonObject = new JSONObject(); -// String orderId = "DD-20250725-1"; -// jsonObject.put("category", 1); -// jsonObject.put("orderId", "DD-20250725-1"); -// jsonObject.put("storeId", 12); -// jsonObject.put("title", "有一笔已超时的订单!"); -// jsonObject.put("message", "您有一笔已超时的订单[" + orderId + "],请及时处理。"); -// mqMessageService.sendDelayMessage(jsonObject.toString(), 10000); -// return jsonObject; + JSONObject jsonObject = new JSONObject(); + String orderId = "DD-20250725-1"; + jsonObject.put("category", 1); + jsonObject.put("orderId", "DD-20250725-1"); + jsonObject.put("storeId", 12); + jsonObject.put("title", "有一笔已超时的订单!"); + jsonObject.put("message", "您有一笔已超时的订单[" + orderId + "],请及时处理。"); + mqMessageService.sendDelayMessage(jsonObject.toString(), 10000); + return jsonObject; // return shopOrderBaseService.sameCityOrderExpireSeconds(10000L); - return sfExpressApiService.createSfExpressShop(66, "能辉超市", "桂平市", "广西壮族自治区贵港市桂平市广佰汇超市(桂平店)", "谢能坤", "17777525395", "110.07165452271", "23.369069486251"); +// return sfExpressApiService.createSfExpressShop(66, "能辉超市", "桂平市", "广西壮族自治区贵港市桂平市广佰汇超市(桂平店)", "谢能坤", "17777525395", "110.07165452271", "23.369069486251"); } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/lakala/service/impl/LklTkServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/lakala/service/impl/LklTkServiceImpl.java index 906694e8..92d6dde7 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/lakala/service/impl/LklTkServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/lakala/service/impl/LklTkServiceImpl.java @@ -402,7 +402,7 @@ public class LklTkServiceImpl { String settleType = String.format("D%d", 1); reqJsonBody.put("settleType", settleType); //结算类型:0-秒到(不分账);1-次日结算(需要分账) - // formData.put("settlementType", "AUTOMATIC"); // 结算方式:MANUAL:手动结算(结算至拉卡拉APP钱包),AUTOMATIC:自动结算到银行卡,REGULAR:定时结算(仅企业商户支持) + reqJsonBody.put("settlementType", "AUTOMATIC"); // 结算方式:MANUAL:手动结算(结算至拉卡拉APP钱包),AUTOMATIC:自动结算到银行卡,REGULAR:定时结算(仅企业商户支持) // 店铺省市区信息 Map areaCode = getAreaCode(shopMchEntry.getStore_area(), false); diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java index 417952b6..7f452d00 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/order/listener/DelayMessageReceiver.java @@ -41,23 +41,32 @@ public class DelayMessageReceiver { @Lazy @Resource private PushMessageService pushMessageService; - + /** * 监听死信队列,处理过期的延迟消息 * - * @param data 消息内容(JSON格式) + * @param data 消息内容(可能是JSON对象或字符串) * @param channel RabbitMQ通道,用于手动确认消息 * @param message 消息对象,包含消息属性等信息 */ @RabbitListener(queues = MqConstant.DEAD_LETTER_QUEUE_NAME) - public void handleExpiredMessage(String data, Channel channel, Message message) { + public void handleExpiredMessage(Object data, Channel channel, Message message) { log.info("收到过期消息,开始执行触发方法: {}", data); try { + // 将消息内容转换为字符串格式 + String messageStr; + if (data instanceof String) { + messageStr = (String) data; + } else { + // 如果是对象,则转换为JSON字符串 + messageStr = new JSONObject(data).toString(); + } + // 处理死信消息 - boolean success = processDeadMessage(data, channel, message); + boolean success = processDeadMessage(messageStr, channel, message); if (!success) { - log.warn("处理死信消息失败,消息将重新入队,消息内容: {}", data); + log.warn("处理死信消息失败,消息将重新入队,消息内容: {}", messageStr); rejectMessage(channel, message); // 处理失败时拒绝消息并重新入队 } @@ -84,11 +93,12 @@ public class DelayMessageReceiver { } try { - // 获取消息分类 - + // 解析消息内容 JSONObject message = new JSONObject(messageStr); + log.debug("解析的消息内容: {}", message); - Integer category = message.getInt("category"); + // 获取消息分类 + Integer category = message.getInt("category", null); if (category == null) { log.warn("消息分类为空,无法处理消息: {}", message); ackMessage(channel, msg); @@ -101,20 +111,18 @@ public class DelayMessageReceiver { } // 根据消息分类处理不同类型的消息 - if (category == MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED) { - // 处理订单超时消息 - handleOrderExpiredMessage(message); - return true; - } else if (category == MqConstant.DEAD_EVENT_CATE_PRE_ORDER) { - // 处理预订单消息 - handlePreOrderMessage(message); - return true; + if (MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED.equals(category)) { + log.info("处理订单超时消息: {}", message); + return handleOrderExpiredMessage(message); + } else if (MqConstant.DEAD_EVENT_CATE_PRE_ORDER.equals(category)) { + log.info("处理预订单消息: {}", message); + return handlePreOrderMessage(message); } else { - log.warn("未知的消息分类: {},消息内容: {}", category, message); + log.warn("未知的消息分类: {}, 消息内容: {}", category, message); return false; } } catch (Exception e) { - log.error("处理死信消息时发生异常,消息内容: {}, err: {}", messageStr, e); + log.error("处理死信消息时发生异常,消息内容: {}, 错误详情: {}", messageStr, e.getMessage(), e); return false; } } @@ -219,12 +227,20 @@ public class DelayMessageReceiver { */ private void ackMessage(Channel channel, Message message) { try { + // 检查通道是否已关闭 + if (!channel.isOpen()) { + log.warn("RabbitMQ通道已关闭,无法确认消息,消息ID: {}", getMessageId(message)); + return; + } + // 获取消息ID,优先从标准属性获取,其次从自定义头部获取 String messageId = getMessageId(message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.debug("消息确认成功,消息ID: {}", messageId); } catch (IOException e) { - log.error("确认消息失败,异常原因:", e); + log.error("确认消息失败,IO异常:", e); + } catch (IllegalStateException e) { + log.warn("确认消息失败,通道已关闭:", e); } } @@ -236,16 +252,24 @@ public class DelayMessageReceiver { */ private void rejectMessage(Channel channel, Message message) { try { + // 检查通道是否已关闭 + if (!channel.isOpen()) { + log.warn("RabbitMQ通道已关闭,无法拒绝消息,消息ID: {}", getMessageId(message)); + return; + } + // 获取消息ID,优先从标准属性获取,其次从自定义头部获取 String messageId = getMessageId(message); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); log.debug("消息拒绝成功,消息将重新入队,消息ID: {}", messageId); Thread.sleep(1000); - } catch (IOException | InterruptedException e) { - log.error("拒绝消息失败,异常原因:", e); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } + } catch (IOException e) { + log.error("拒绝消息失败,IO异常:", e); + } catch (InterruptedException e) { + log.error("线程中断异常:", e); + Thread.currentThread().interrupt(); + } catch (IllegalStateException e) { + log.warn("拒绝消息失败,通道已关闭:", e); } } diff --git a/pom.xml b/pom.xml index 0deccbb1..2f8428cf 100644 --- a/pom.xml +++ b/pom.xml @@ -321,12 +321,18 @@ 114.132.210.208:8718 - 127.0.0.1 + 114.132.210.208 3306 mall_dev - root - 12345678 + web_dev + Abc654321$^ com.mysql.cj.jdbc.Driver + + + + + + 114.132.210.208 15