更改消息推送的类型为 字符串 数据
This commit is contained in:
parent
bfa4b3669d
commit
4f4aff603d
@ -5,7 +5,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.core.*;
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
import org.springframework.amqp.support.converter.SimpleMessageConverter;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
@ -15,19 +15,29 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 反序列化
|
* 反序列化
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
|
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
|
||||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
factory.setConnectionFactory(connectionFactory);
|
factory.setConnectionFactory(connectionFactory);
|
||||||
factory.setMessageConverter(new Jackson2JsonMessageConverter());
|
factory.setMessageConverter(new SimpleMessageConverter());
|
||||||
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleMessageConverter simpleMessageConverter() {
|
||||||
|
return new SimpleMessageConverter();
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Bean
|
||||||
|
// public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
|
||||||
|
// return new Jackson2JsonMessageConverter();
|
||||||
|
// }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 默认交换机
|
* 默认交换机
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -37,6 +47,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户经验队列配置
|
* 处理用户经验队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -47,6 +58,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户经验路由键配置
|
* 处理用户经验路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -60,6 +72,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户详细信息队列配置
|
* 处理用户详细信息队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -70,6 +83,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户详细信息路由键配置
|
* 处理用户详细信息路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -83,6 +97,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户角色信息队列配置
|
* 处理用户角色信息队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -93,6 +108,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户角色信息路由键配置
|
* 处理用户角色信息路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -106,6 +122,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户升级信息队列配置
|
* 处理用户升级信息队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -116,6 +133,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户升级信息路由键配置
|
* 处理用户升级信息路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -22,8 +23,15 @@ public class DealUserAnalyticsListener {
|
|||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private AccountUserAnalyticsService accountUserAnalyticsService;
|
private AccountUserAnalyticsService accountUserAnalyticsService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
// 检查消息是否已达到最大重试次数
|
// 检查消息是否已达到最大重试次数
|
||||||
Map<String, Object> headers = message.getMessageProperties().getHeaders();
|
Map<String, Object> headers = message.getMessageProperties().getHeaders();
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -22,14 +23,14 @@ public class DealUserInfoListener {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private AccountUserInfoService accountUserInfoService;
|
private AccountUserInfoService accountUserInfoService;
|
||||||
|
|
||||||
// @RabbitHandler
|
|
||||||
// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
|
||||||
// // 将byte[]转换为String,然后调用现有的处理逻辑
|
|
||||||
// String dataStr = new String(data, StandardCharsets.UTF_8);
|
|
||||||
// listener(dataStr, channel, message);
|
|
||||||
// }
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
AccountUserInfo accountUserInfo = JSONUtil.toBean(data, AccountUserInfo.class);
|
AccountUserInfo accountUserInfo = JSONUtil.toBean(data, AccountUserInfo.class);
|
||||||
// String messageId = message.getMessageProperties().getMessageId();
|
// String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
|||||||
@ -13,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -23,14 +23,14 @@ public class ExperienceListener {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private AccountUserInfoService accountUserInfoService;
|
private AccountUserInfoService accountUserInfoService;
|
||||||
|
|
||||||
// @RabbitHandler
|
|
||||||
// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
|
||||||
// // 将byte[]转换为String,然后调用现有的处理逻辑
|
|
||||||
// String dataStr = new String(data, StandardCharsets.UTF_8);
|
|
||||||
// listener(dataStr, channel, message);
|
|
||||||
// }
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
ExperienceTO experienceTO = JSONUtil.toBean(data, ExperienceTO.class);
|
ExperienceTO experienceTO = JSONUtil.toBean(data, ExperienceTO.class);
|
||||||
// String messageId = message.getMessageProperties().getMessageId();
|
// String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -22,14 +23,14 @@ public class UpgradeUserLevelListener {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private AccountUserInfoService accountUserInfoService;
|
private AccountUserInfoService accountUserInfoService;
|
||||||
|
|
||||||
// @RabbitHandler
|
|
||||||
// public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
|
||||||
// // 将byte[]转换为String,然后调用现有的处理逻辑
|
|
||||||
// String dataStr = new String(data, StandardCharsets.UTF_8);
|
|
||||||
// listener(dataStr, channel, message);
|
|
||||||
// }
|
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
UserLevelTO userLevelTO = JSONUtil.toBean(data, UserLevelTO.class);
|
UserLevelTO userLevelTO = JSONUtil.toBean(data, UserLevelTO.class);
|
||||||
// String messageId = message.getMessageProperties().getMessageId();
|
// String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
|||||||
@ -5,7 +5,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.core.*;
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
import org.springframework.amqp.support.converter.SimpleMessageConverter;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
@ -15,19 +15,25 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 反序列化
|
* 反序列化
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
|
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
|
||||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
factory.setConnectionFactory(connectionFactory);
|
factory.setConnectionFactory(connectionFactory);
|
||||||
factory.setMessageConverter(new Jackson2JsonMessageConverter());
|
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
|
||||||
|
factory.setMessageConverter(new SimpleMessageConverter());
|
||||||
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleMessageConverter simpleMessageConverter() {
|
||||||
|
return new SimpleMessageConverter();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 默认交换机
|
* 默认交换机
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -37,6 +43,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户积分队列配置
|
* 处理用户积分队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -47,6 +54,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户积分路由键配置
|
* 处理用户积分路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -60,6 +68,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户余额队列配置
|
* 处理用户余额队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -70,6 +79,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户余额路由键配置
|
* 处理用户余额路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -83,6 +93,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理订单交易队列配置
|
* 处理订单交易队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -93,6 +104,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理订单交易路由键配置
|
* 处理订单交易路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -106,6 +118,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理退款队列配置
|
* 处理退款队列配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
@ -116,6 +129,7 @@ public class RabbitMqConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理退款路由键配置
|
* 处理退款路由键配置
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -23,6 +24,13 @@ public class PayMoneyListener {
|
|||||||
private PayUserResourceService payUserResourceService;
|
private PayUserResourceService payUserResourceService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
PayMoneyTO payMoneyTO = JSONUtil.toBean(data, PayMoneyTO.class);
|
PayMoneyTO payMoneyTO = JSONUtil.toBean(data, PayMoneyTO.class);
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
|||||||
@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -23,6 +24,13 @@ public class PayPointListener {
|
|||||||
private PayUserResourceService payUserResourceService;
|
private PayUserResourceService payUserResourceService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
PayPointTO payPointTO = JSONUtil.toBean(data, PayPointTO.class);
|
PayPointTO payPointTO = JSONUtil.toBean(data, PayPointTO.class);
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
@ -31,15 +39,15 @@ public class PayPointListener {
|
|||||||
boolean flag = payUserResourceService.points(payPointTO.getUser_id(), payPointTO.getPoints(), payPointTO.getPoints_type_id(),
|
boolean flag = payUserResourceService.points(payPointTO.getUser_id(), payPointTO.getPoints(), payPointTO.getPoints_type_id(),
|
||||||
payPointTO.getDesc(), payPointTO.getStore_id(), payPointTO.getUser_id_other(), payPointTO.getOrder_id());
|
payPointTO.getDesc(), payPointTO.getStore_id(), payPointTO.getUser_id_other(), payPointTO.getOrder_id());
|
||||||
if (flag) {
|
if (flag) {
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
} else {
|
} else {
|
||||||
log.error("消息消费失败,执行points异常,当前用户编号:{}", payPointTO.getUser_id());
|
log.error("消息消费失败,执行points异常,当前用户编号:{}", payPointTO.getUser_id());
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("消息消费失败,执行points异常,当前用户编号:{},失败原因:", payPointTO.getUser_id(), e);
|
log.error("消息消费失败,执行points异常,当前用户编号:{},失败原因:", payPointTO.getUser_id(), e);
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -28,6 +29,13 @@ public class PayRefundListener {
|
|||||||
private ShopService shopService;
|
private ShopService shopService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
doRefund(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void doRefund(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void doRefund(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
List<String> order_ids = Convert.toList(String.class, data);
|
List<String> order_ids = Convert.toList(String.class, data);
|
||||||
List<ShopOrderReturn> returnList = shopService.findByOrderIds(order_ids);
|
List<ShopOrderReturn> returnList = shopService.findByOrderIds(order_ids);
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -25,6 +26,13 @@ public class PayTradeListener {
|
|||||||
private PayConsumeTradeService payConsumeTradeService;
|
private PayConsumeTradeService payConsumeTradeService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
|
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
|
// 将byte[]转换为String,然后调用现有的处理逻辑
|
||||||
|
String dataStr = new String(data, StandardCharsets.UTF_8);
|
||||||
|
listener(dataStr, channel, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
Map params = JSONUtil.toBean(data, Map.class);
|
Map params = JSONUtil.toBean(data, Map.class);
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
@ -36,15 +44,15 @@ public class PayTradeListener {
|
|||||||
params.remove("order_state_id");
|
params.remove("order_state_id");
|
||||||
boolean flag = payConsumeTradeService.edit(trade, MybatisPlusQueryUtil.getQueryWrapper(PayConsumeTrade.class, params));
|
boolean flag = payConsumeTradeService.edit(trade, MybatisPlusQueryUtil.getQueryWrapper(PayConsumeTrade.class, params));
|
||||||
if (flag) {
|
if (flag) {
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
} else {
|
} else {
|
||||||
log.error("消息消费失败,执行payTrade异常,当前订单编号:{}", params.get("order_id:in"));
|
log.error("消息消费失败,执行payTrade异常,当前订单编号:{}", params.get("order_id:in"));
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("消息消费失败,执行payTrade异常,当前订单编号:{},异常原因:", params.get("order_id:in"), e);
|
log.error("消息消费失败,执行payTrade异常,当前订单编号:{},异常原因:", params.get("order_id:in"), e);
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -262,10 +262,10 @@ public class LakalaPayServiceImpl implements LakalaPayService {
|
|||||||
accBusiFields.put("user_id", openId); // 用户openid
|
accBusiFields.put("user_id", openId); // 用户openid
|
||||||
reqData.put("acc_busi_fields", accBusiFields);
|
reqData.put("acc_busi_fields", accBusiFields);
|
||||||
|
|
||||||
// 重要约定,订单号规则:商品订单:ORD_订单号,运费订单:DF_订单号
|
// 重要约定,订单号规则:商品订单:ORD-订单号,运费订单:DF-订单号
|
||||||
// 分单信息
|
// 分单信息
|
||||||
JSONObject goodsSplitInfo = new JSONObject();
|
JSONObject goodsSplitInfo = new JSONObject();
|
||||||
goodsSplitInfo.put("out_sub_trade_no", "ORD_" + orderId); // 子订单号
|
goodsSplitInfo.put("out_sub_trade_no", "ORD-" + orderId); // 子订单号
|
||||||
goodsSplitInfo.put("merchant_no", merchantNo); // 分账商户号
|
goodsSplitInfo.put("merchant_no", merchantNo); // 分账商户号
|
||||||
goodsSplitInfo.put("term_no", termNo); // 分账终端号
|
goodsSplitInfo.put("term_no", termNo); // 分账终端号
|
||||||
int totalAmountInt = Convert.toInt(totalAmount) - Convert.toInt(agentAmount);
|
int totalAmountInt = Convert.toInt(totalAmount) - Convert.toInt(agentAmount);
|
||||||
@ -274,7 +274,7 @@ public class LakalaPayServiceImpl implements LakalaPayService {
|
|||||||
goodsSplitInfo.put("sub_remark", "商品订单金额"); // 子单备注信息
|
goodsSplitInfo.put("sub_remark", "商品订单金额"); // 子单备注信息
|
||||||
|
|
||||||
JSONObject deliverySplitInfo = new JSONObject();
|
JSONObject deliverySplitInfo = new JSONObject();
|
||||||
deliverySplitInfo.put("out_sub_trade_no", "DF_" + orderId); // 子订单号
|
deliverySplitInfo.put("out_sub_trade_no", "DF-" + orderId); // 子订单号
|
||||||
deliverySplitInfo.put("merchant_no", agentMerchantNo); // 分账商户号
|
deliverySplitInfo.put("merchant_no", agentMerchantNo); // 分账商户号
|
||||||
deliverySplitInfo.put("term_no", agentTermNo); // 分账终端号
|
deliverySplitInfo.put("term_no", agentTermNo); // 分账终端号
|
||||||
deliverySplitInfo.put("amount", agentAmount); // 分账金额
|
deliverySplitInfo.put("amount", agentAmount); // 分账金额
|
||||||
|
|||||||
@ -198,7 +198,7 @@ public class PayConsumeDepositServiceImpl extends BaseServiceImpl<PayConsumeDepo
|
|||||||
}
|
}
|
||||||
|
|
||||||
String order_id = notify_row.getOrder_id();
|
String order_id = notify_row.getOrder_id();
|
||||||
String deposit_trade_no = notify_row.getDeposit_trade_no();
|
// String deposit_trade_no = notify_row.getDeposit_trade_no();
|
||||||
|
|
||||||
notify_row.setDeposit_subject(StrUtil.trim(notify_row.getDeposit_subject()));
|
notify_row.setDeposit_subject(StrUtil.trim(notify_row.getDeposit_subject()));
|
||||||
|
|
||||||
@ -574,6 +574,7 @@ public class PayConsumeDepositServiceImpl extends BaseServiceImpl<PayConsumeDepo
|
|||||||
}*/
|
}*/
|
||||||
if (CollUtil.isNotEmpty(paid_order_id_row)) {
|
if (CollUtil.isNotEmpty(paid_order_id_row)) {
|
||||||
// 发送消息给 RabbitMQ
|
// 发送消息给 RabbitMQ
|
||||||
|
// JSONObject jsonObj = new JSONObject().set("paid_order_id_row", paid_order_id_row);
|
||||||
CommonResult res = shopService.sendMqMsg(MqConstant.SHOP_EXCHANGE, MqConstant.SHOP_PAIDYES_ROUTING_KEY, paid_order_id_row.toString());
|
CommonResult res = shopService.sendMqMsg(MqConstant.SHOP_EXCHANGE, MqConstant.SHOP_PAIDYES_ROUTING_KEY, paid_order_id_row.toString());
|
||||||
res.checkFenResult();
|
res.checkFenResult();
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@ -1358,17 +1358,20 @@ public class PayUserPayServiceImpl extends BaseServiceImpl<PayUserPayMapper, Pay
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 敏感头信息脱敏打印
|
// 敏感头信息脱敏打印
|
||||||
logger.debug("拉卡拉支付异步通知回调 body:{} \n authorization: {}", body, authorization);
|
logger.info("拉卡拉支付异步通知回调 body:{} \n authorization: {}", body, authorization);
|
||||||
// 异步通知返回的body json数据:{"out_trade_no":"202203151637334864280014","trade_no":"2022031566210203291925","log_no":"66210203291925",
|
// 异步通知返回的body json数据:{"out_trade_no":"202203151637334864280014","trade_no":"2022031566210203291925","log_no":"66210203291925",
|
||||||
// "acc_trade_no":"2022031522001483661454130929 ","trade_status":"SUCCESS","trade_state":"SUCCESS","total_amount":"1",
|
// "acc_trade_no":"2022031522001483661454130929 ","trade_status":"SUCCESS","trade_state":"SUCCESS","total_amount":"1",
|
||||||
// "payer_amount":"1","acc_settle_amount":"1","trade_time":"20220315163808","user_id1":"app***@163.com",
|
// "payer_amount":"1","acc_settle_amount":"1","trade_time":"20220315163808","user_id1":"app***@163.com",
|
||||||
// "user_id2":"2088432881453660","notify_url":"https://www.baidu.com","account_type":"ALIPAY","card_type":"99"}
|
// "user_id2":"2088432881453660","notify_url":"https://www.baidu.com","account_type":"ALIPAY","card_type":"99"}
|
||||||
|
|
||||||
|
// 合单返回的数据:{"out_trade_no":"DD-20250830-10","trade_no":"20250830110113130266250034160499","log_no":"66250034160499","acc_trade_no":"4200002826202508306761393882","trade_status":"SUCCESS","trade_state":"SUCCESS","total_amount":"2","payer_amount":"2","acc_settle_amount":"2","acc_mdiscount_amount":"0","acc_discount_amount":"0","trade_time":"20250830180435","user_id1":"oDVKR7T0qxg6O8tqIL9SgY6LXqqQ","user_id2":"oVxsc1QRAqDRv_gAmXuLZwSVSL18","notify_url":"https://mall.gpxscs.cn/mobile/pay/index/lkl_wxPay_notify_url","account_type":"WECHAT","bank_type":"OTHERS","card_type":"02","merchant_no":"8226330541100GU","remark":"","sub_mch_id":"803819329","out_split_rsp_infos":[{"sub_trade_no":"20250830110113130266250034112794","sub_log_no":"66250034112794","out_sub_trade_no":"ORD_DD-20250830-10","merchant_no":"8226330541100GU","term_no":"N5817779","amount":"1","settle_type":"0"},{"sub_trade_no":"20250830110113130266250034160498","sub_log_no":"66250034160498","out_sub_trade_no":"DF_DD-20250830-10","merchant_no":"822584059990FYP","term_no":"N5811590","amount":"1","settle_type":"0"}],"trade_req_date":"20250830","gb_amount":"","qb_amount":""}
|
||||||
|
|
||||||
// 解析JSON格式响应
|
// 解析JSON格式响应
|
||||||
cn.hutool.json.JSONObject lklNotifyRespJSON = JSONUtil.parseObj(body);
|
cn.hutool.json.JSONObject lklNotifyRespJSON = JSONUtil.parseObj(body);
|
||||||
params = Convert.toMap(String.class, String.class, lklNotifyRespJSON);
|
params = Convert.toMap(String.class, String.class, lklNotifyRespJSON);
|
||||||
String orderId = params.getOrDefault("out_trade_no", "");
|
String orderId = params.getOrDefault("out_trade_no", "");
|
||||||
String accTradeNo = params.getOrDefault("acc_trade_no", ""); // 需要跟拉卡拉确认这个字段是原支付交易对应的微信订单号吗?
|
String accTradeNo = params.getOrDefault("acc_trade_no", ""); // 需要跟拉卡拉确认这个字段是原支付交易对应的微信订单号吗?
|
||||||
|
String outSplitRspInfos = params.getOrDefault("out_split_rsp_infos", ""); // 拉卡拉合单订单信息
|
||||||
|
|
||||||
// 提取授权签名信息
|
// 提取授权签名信息
|
||||||
Map<String, String> authMap = LakalaUtil.getLakalaAuthorizationMap(authorization);
|
Map<String, String> authMap = LakalaUtil.getLakalaAuthorizationMap(authorization);
|
||||||
@ -1421,8 +1424,14 @@ public class PayUserPayServiceImpl extends BaseServiceImpl<PayUserPayMapper, Pay
|
|||||||
payConsumeDeposit.setDeposit_body(orderSubject);
|
payConsumeDeposit.setDeposit_body(orderSubject);
|
||||||
payConsumeDeposit.setUser_id(userId);
|
payConsumeDeposit.setUser_id(userId);
|
||||||
|
|
||||||
|
// 拉卡拉订单合单信息
|
||||||
|
if (StrUtil.isNotBlank(outSplitRspInfos)) {
|
||||||
|
payConsumeDeposit.setLkl_combine_params(outSplitRspInfos);
|
||||||
|
}
|
||||||
|
|
||||||
// 判断是否联合支付
|
// 判断是否联合支付
|
||||||
PayConsumeTradeCombine tradeCombine = payConsumeTradeCombineService.get(orderId);
|
PayConsumeTradeCombine tradeCombine = payConsumeTradeCombineService.get(orderId);
|
||||||
|
|
||||||
TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
|
TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
|
||||||
try {
|
try {
|
||||||
// 修改 拉卡拉的订单记录 shop_order_lkl
|
// 修改 拉卡拉的订单记录 shop_order_lkl
|
||||||
|
|||||||
@ -6,7 +6,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.core.*;
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
import org.springframework.amqp.support.converter.SimpleMessageConverter;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
@ -65,11 +65,22 @@ public class RabbitMqConfig {
|
|||||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
|
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
|
||||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
factory.setConnectionFactory(connectionFactory);
|
factory.setConnectionFactory(connectionFactory);
|
||||||
factory.setMessageConverter(new Jackson2JsonMessageConverter());
|
factory.setMessageConverter(new SimpleMessageConverter());
|
||||||
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleMessageConverter simpleMessageConverter() {
|
||||||
|
return new SimpleMessageConverter();
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Bean
|
||||||
|
// public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
|
||||||
|
// return new Jackson2JsonMessageConverter();
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 反序列化
|
* 反序列化
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import org.springframework.stereotype.Component;
|
|||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 延迟消息接收器,处理过期的消息
|
* 延迟消息接收器,处理过期的消息
|
||||||
@ -58,6 +59,8 @@ public class DelayMessageReceiver {
|
|||||||
String messageStr;
|
String messageStr;
|
||||||
if (data instanceof String) {
|
if (data instanceof String) {
|
||||||
messageStr = (String) data;
|
messageStr = (String) data;
|
||||||
|
} else if (data instanceof byte[]) {
|
||||||
|
messageStr = new String((byte[]) data, StandardCharsets.UTF_8);
|
||||||
} else {
|
} else {
|
||||||
// 如果是对象,则转换为JSON字符串
|
// 如果是对象,则转换为JSON字符串
|
||||||
messageStr = new JSONObject(data).toString();
|
messageStr = new JSONObject(data).toString();
|
||||||
@ -232,7 +235,7 @@ public class DelayMessageReceiver {
|
|||||||
log.warn("RabbitMQ通道已关闭,无法确认消息,消息ID: {}", getMessageId(message));
|
log.warn("RabbitMQ通道已关闭,无法确认消息,消息ID: {}", getMessageId(message));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取消息ID,优先从标准属性获取,其次从自定义头部获取
|
// 获取消息ID,优先从标准属性获取,其次从自定义头部获取
|
||||||
String messageId = getMessageId(message);
|
String messageId = getMessageId(message);
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
@ -257,7 +260,7 @@ public class DelayMessageReceiver {
|
|||||||
log.warn("RabbitMQ通道已关闭,无法拒绝消息,消息ID: {}", getMessageId(message));
|
log.warn("RabbitMQ通道已关闭,无法拒绝消息,消息ID: {}", getMessageId(message));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取消息ID,优先从标准属性获取,其次从自定义头部获取
|
// 获取消息ID,优先从标准属性获取,其次从自定义头部获取
|
||||||
String messageId = getMessageId(message);
|
String messageId = getMessageId(message);
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
|
|||||||
@ -30,7 +30,7 @@ public class MessageListener {
|
|||||||
listener(dataStr, channel, message);
|
listener(dataStr, channel, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
@RabbitHandler
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) {
|
public void listener(String data, Channel channel, Message message) {
|
||||||
MsgTO msgTO = JSONUtil.toBean(data, MsgTO.class);
|
MsgTO msgTO = JSONUtil.toBean(data, MsgTO.class);
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package com.suisung.mall.shop.order.listener;
|
package com.suisung.mall.shop.order.listener;
|
||||||
|
|
||||||
import cn.hutool.core.convert.Convert;
|
import cn.hutool.core.convert.Convert;
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.hutool.json.JSONObject;
|
import cn.hutool.json.JSONObject;
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import com.suisung.mall.common.api.StateCode;
|
import com.suisung.mall.common.api.StateCode;
|
||||||
@ -64,9 +65,13 @@ public class OrderPayedListener {
|
|||||||
listener(dataStr, channel, message);
|
listener(dataStr, channel, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
@RabbitHandler
|
// @RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
||||||
// String messageId = message.getMessageProperties().getMessageId();
|
// String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
if (StrUtil.isBlank(data)) {
|
||||||
|
logger.info("收到空订单消息");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
List<String> order_id_row = Convert.toList(String.class, data);
|
List<String> order_id_row = Convert.toList(String.class, data);
|
||||||
try {
|
try {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user