diff --git a/mall-account/src/main/java/com/suisung/mall/account/config/RabbitMqConfig.java b/mall-account/src/main/java/com/suisung/mall/account/config/RabbitMqConfig.java index 52c9b67b..de810c98 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/config/RabbitMqConfig.java +++ b/mall-account/src/main/java/com/suisung/mall/account/config/RabbitMqConfig.java @@ -5,7 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -15,19 +15,29 @@ public class RabbitMqConfig { /** * 反序列化 - * */ @Bean - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); - factory.setMessageConverter(new Jackson2JsonMessageConverter()); + factory.setMessageConverter(new SimpleMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } + @Bean + public SimpleMessageConverter simpleMessageConverter() { + return new SimpleMessageConverter(); + } + + // @Bean +// public Jackson2JsonMessageConverter jackson2JsonMessageConverter() { +// return new Jackson2JsonMessageConverter(); +// } + /** * 默认交换机 + * * @return */ @Bean @@ -37,6 +47,7 @@ public class RabbitMqConfig { /** * 处理用户经验队列配置 + * * @return */ @Bean @@ -47,6 +58,7 @@ public class RabbitMqConfig { /** * 处理用户经验路由键配置 + * * @return */ @Bean @@ -60,6 +72,7 @@ public class RabbitMqConfig { /** * 处理用户详细信息队列配置 + * * @return */ @Bean @@ -70,6 +83,7 @@ public class RabbitMqConfig { /** * 处理用户详细信息路由键配置 + * * @return */ @Bean @@ -83,6 +97,7 @@ public class RabbitMqConfig { /** * 处理用户角色信息队列配置 + * * @return */ @Bean @@ -93,6 +108,7 @@ public class RabbitMqConfig { /** * 处理用户角色信息路由键配置 + * * @return */ @Bean @@ -106,6 +122,7 @@ public class RabbitMqConfig { /** * 处理用户升级信息队列配置 + * * @return */ @Bean @@ -116,6 +133,7 @@ public class RabbitMqConfig { /** * 处理用户升级信息路由键配置 + * * @return */ @Bean diff --git a/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java b/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java index 4385dc57..5e2ff6d5 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java +++ b/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserAnalyticsListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Map; @Service @@ -22,8 +23,15 @@ public class DealUserAnalyticsListener { @Autowired private AccountUserAnalyticsService accountUserAnalyticsService; - + @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { // 检查消息是否已达到最大重试次数 Map headers = message.getMessageProperties().getHeaders(); diff --git a/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserInfoListener.java b/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserInfoListener.java index fa7d4358..29da5a00 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserInfoListener.java +++ b/mall-account/src/main/java/com/suisung/mall/account/listener/DealUserInfoListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; @Service @Slf4j @@ -22,14 +23,14 @@ public class DealUserInfoListener { @Autowired private AccountUserInfoService accountUserInfoService; -// @RabbitHandler -// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { -// // 将byte[]转换为String,然后调用现有的处理逻辑 -// String dataStr = new String(data, StandardCharsets.UTF_8); -// listener(dataStr, channel, message); -// } - @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { AccountUserInfo accountUserInfo = JSONUtil.toBean(data, AccountUserInfo.class); // String messageId = message.getMessageProperties().getMessageId(); diff --git a/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java b/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java index f8b0b3e5..5dbd5d20 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java +++ b/mall-account/src/main/java/com/suisung/mall/account/listener/ExperienceListener.java @@ -13,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; -import java.util.Map; +import java.nio.charset.StandardCharsets; @Service @Slf4j @@ -23,14 +23,14 @@ public class ExperienceListener { @Autowired private AccountUserInfoService accountUserInfoService; -// @RabbitHandler -// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { -// // 将byte[]转换为String,然后调用现有的处理逻辑 -// String dataStr = new String(data, StandardCharsets.UTF_8); -// listener(dataStr, channel, message); -// } - @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { ExperienceTO experienceTO = JSONUtil.toBean(data, ExperienceTO.class); // String messageId = message.getMessageProperties().getMessageId(); diff --git a/mall-account/src/main/java/com/suisung/mall/account/listener/UpgradeUserLevelListener.java b/mall-account/src/main/java/com/suisung/mall/account/listener/UpgradeUserLevelListener.java index fcead46e..57a79e11 100644 --- a/mall-account/src/main/java/com/suisung/mall/account/listener/UpgradeUserLevelListener.java +++ b/mall-account/src/main/java/com/suisung/mall/account/listener/UpgradeUserLevelListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; @Service @Slf4j @@ -22,14 +23,14 @@ public class UpgradeUserLevelListener { @Autowired private AccountUserInfoService accountUserInfoService; -// @RabbitHandler -// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { -// // 将byte[]转换为String,然后调用现有的处理逻辑 -// String dataStr = new String(data, StandardCharsets.UTF_8); -// listener(dataStr, channel, message); -// } - @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { UserLevelTO userLevelTO = JSONUtil.toBean(data, UserLevelTO.class); // String messageId = message.getMessageProperties().getMessageId(); diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/config/RabbitMqConfig.java b/mall-pay/src/main/java/com/suisung/mall/pay/config/RabbitMqConfig.java index f91d4ae8..60aa55fe 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/config/RabbitMqConfig.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/config/RabbitMqConfig.java @@ -5,7 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -15,19 +15,25 @@ public class RabbitMqConfig { /** * 反序列化 - * */ @Bean - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); - factory.setMessageConverter(new Jackson2JsonMessageConverter()); + // factory.setMessageConverter(new Jackson2JsonMessageConverter()); + factory.setMessageConverter(new SimpleMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } + @Bean + public SimpleMessageConverter simpleMessageConverter() { + return new SimpleMessageConverter(); + } + /** * 默认交换机 + * * @return */ @Bean @@ -37,6 +43,7 @@ public class RabbitMqConfig { /** * 处理用户积分队列配置 + * * @return */ @Bean @@ -47,6 +54,7 @@ public class RabbitMqConfig { /** * 处理用户积分路由键配置 + * * @return */ @Bean @@ -60,6 +68,7 @@ public class RabbitMqConfig { /** * 处理用户余额队列配置 + * * @return */ @Bean @@ -70,6 +79,7 @@ public class RabbitMqConfig { /** * 处理用户余额路由键配置 + * * @return */ @Bean @@ -83,6 +93,7 @@ public class RabbitMqConfig { /** * 处理订单交易队列配置 + * * @return */ @Bean @@ -93,6 +104,7 @@ public class RabbitMqConfig { /** * 处理订单交易路由键配置 + * * @return */ @Bean @@ -106,6 +118,7 @@ public class RabbitMqConfig { /** * 处理退款队列配置 + * * @return */ @Bean @@ -116,6 +129,7 @@ public class RabbitMqConfig { /** * 处理退款路由键配置 + * * @return */ @Bean diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayMoneyListener.java b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayMoneyListener.java index 150c020d..51f08918 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayMoneyListener.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayMoneyListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; @Service @Slf4j @@ -23,6 +24,13 @@ public class PayMoneyListener { private PayUserResourceService payUserResourceService; @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { PayMoneyTO payMoneyTO = JSONUtil.toBean(data, PayMoneyTO.class); String messageId = message.getMessageProperties().getMessageId(); diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayPointListener.java b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayPointListener.java index f740f275..698a2c57 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayPointListener.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayPointListener.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; @Service @Slf4j @@ -23,6 +24,13 @@ public class PayPointListener { private PayUserResourceService payUserResourceService; @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { PayPointTO payPointTO = JSONUtil.toBean(data, PayPointTO.class); String messageId = message.getMessageProperties().getMessageId(); @@ -31,15 +39,15 @@ public class PayPointListener { boolean flag = payUserResourceService.points(payPointTO.getUser_id(), payPointTO.getPoints(), payPointTO.getPoints_type_id(), payPointTO.getDesc(), payPointTO.getStore_id(), payPointTO.getUser_id_other(), payPointTO.getOrder_id()); if (flag) { - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { log.error("消息消费失败,执行points异常,当前用户编号:{}", payPointTO.getUser_id()); - channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); Thread.sleep(1000); } } catch (Exception e) { log.error("消息消费失败,执行points异常,当前用户编号:{},失败原因:", payPointTO.getUser_id(), e); - channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); Thread.sleep(1000); } } diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayRefundListener.java b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayRefundListener.java index ce49e739..5d90aee1 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayRefundListener.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayRefundListener.java @@ -14,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; @Service @@ -28,6 +29,13 @@ public class PayRefundListener { private ShopService shopService; @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + doRefund(dataStr, channel, message); + } + + // @RabbitHandler public void doRefund(String data, Channel channel, Message message) throws IOException, InterruptedException { List order_ids = Convert.toList(String.class, data); List returnList = shopService.findByOrderIds(order_ids); diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayTradeListener.java b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayTradeListener.java index 8fd09367..723d9f45 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayTradeListener.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/listener/PayTradeListener.java @@ -14,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Map; @Service @@ -25,6 +26,13 @@ public class PayTradeListener { private PayConsumeTradeService payConsumeTradeService; @RabbitHandler + public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException { + // 将byte[]转换为String,然后调用现有的处理逻辑 + String dataStr = new String(data, StandardCharsets.UTF_8); + listener(dataStr, channel, message); + } + + // @RabbitHandler public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException { Map params = JSONUtil.toBean(data, Map.class); String messageId = message.getMessageProperties().getMessageId(); @@ -36,15 +44,15 @@ public class PayTradeListener { params.remove("order_state_id"); boolean flag = payConsumeTradeService.edit(trade, MybatisPlusQueryUtil.getQueryWrapper(PayConsumeTrade.class, params)); if (flag) { - channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { log.error("消息消费失败,执行payTrade异常,当前订单编号:{}", params.get("order_id:in")); - channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); Thread.sleep(1000); } } catch (Exception e) { log.error("消息消费失败,执行payTrade异常,当前订单编号:{},异常原因:", params.get("order_id:in"), e); - channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); Thread.sleep(1000); } } diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/LakalaPayServiceImpl.java b/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/LakalaPayServiceImpl.java index e30fc49e..306c1458 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/LakalaPayServiceImpl.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/LakalaPayServiceImpl.java @@ -262,10 +262,10 @@ public class LakalaPayServiceImpl implements LakalaPayService { accBusiFields.put("user_id", openId); // 用户openid reqData.put("acc_busi_fields", accBusiFields); - // 重要约定,订单号规则:商品订单:ORD_订单号,运费订单:DF_订单号 + // 重要约定,订单号规则:商品订单:ORD-订单号,运费订单:DF-订单号 // 分单信息 JSONObject goodsSplitInfo = new JSONObject(); - goodsSplitInfo.put("out_sub_trade_no", "ORD_" + orderId); // 子订单号 + goodsSplitInfo.put("out_sub_trade_no", "ORD-" + orderId); // 子订单号 goodsSplitInfo.put("merchant_no", merchantNo); // 分账商户号 goodsSplitInfo.put("term_no", termNo); // 分账终端号 int totalAmountInt = Convert.toInt(totalAmount) - Convert.toInt(agentAmount); @@ -274,7 +274,7 @@ public class LakalaPayServiceImpl implements LakalaPayService { goodsSplitInfo.put("sub_remark", "商品订单金额"); // 子单备注信息 JSONObject deliverySplitInfo = new JSONObject(); - deliverySplitInfo.put("out_sub_trade_no", "DF_" + orderId); // 子订单号 + deliverySplitInfo.put("out_sub_trade_no", "DF-" + orderId); // 子订单号 deliverySplitInfo.put("merchant_no", agentMerchantNo); // 分账商户号 deliverySplitInfo.put("term_no", agentTermNo); // 分账终端号 deliverySplitInfo.put("amount", agentAmount); // 分账金额 diff --git a/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/PayConsumeDepositServiceImpl.java b/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/PayConsumeDepositServiceImpl.java index b845cb23..8543c023 100644 --- a/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/PayConsumeDepositServiceImpl.java +++ b/mall-pay/src/main/java/com/suisung/mall/pay/service/impl/PayConsumeDepositServiceImpl.java @@ -198,7 +198,7 @@ public class PayConsumeDepositServiceImpl extends BaseServiceImpl authMap = LakalaUtil.getLakalaAuthorizationMap(authorization); @@ -1421,8 +1424,14 @@ public class PayUserPayServiceImpl extends BaseServiceImpl order_id_row = Convert.toList(String.class, data); try {