消息队列全部出错,接收类型,修复

This commit is contained in:
Jack 2025-07-28 21:46:44 +08:00
parent bcd7df0394
commit 34583e3137
12 changed files with 137 additions and 78 deletions

View File

@ -199,6 +199,7 @@ public class AccountUserBaseController extends BaseControllerImpl {
params.put("password", "");
params.put("user_mobile", userMobile);
params.put("is_merch", "1"); // 是否为商家入驻 1-其他-
params.put("user_is_admin", "2"); // //user_is_admin 用户类型0-普通用户1-管理员2-入驻商户默认是普通用户
if (StrUtil.isNotBlank(cid)) {
params.put("cid", cid); // 个推客户端Id

View File

@ -229,8 +229,10 @@ public class AccountUserBaseServiceImpl extends BaseServiceImpl<AccountUserBaseM
String cid = params.get("cid");
if (StrUtil.isNotBlank(cid)) {
AccountUserBindGeTui accountUserBindGeTui = new AccountUserBindGeTui();
accountUserBindGeTui.setUserId(user_id).setUserType(Convert.toInt(params.get("user_is_admin")))
.setCid(cid).setOsType(Convert.toInt(params.get("os_type")));
accountUserBindGeTui.setUserId(user_id)
.setUserType(Convert.toInt(params.get("user_is_admin")))
.setCid(cid)
.setOsType(Convert.toInt(params.get("os_type")));
accountUserBindGeTuiService.saveAccountUserBindGeTui(accountUserBindGeTui);
}

View File

@ -11,7 +11,7 @@ server:
no-request-timeout: 60000
connection-timeout: 60000
max-http-header-size: 8192
max-http-post-size: 200MB
max-http-post-size: 500MB
spring:
servlet:
multipart:

View File

@ -142,7 +142,7 @@ public interface PayService {
* @param orderReturn
* @return
*/
@PostMapping(value = "/lklPayRefund")
@PostMapping(value = "/admin/pay/payController/lklPayRefund")
ShopOrderReturn lklPayRefund(@RequestBody ShopOrderReturn orderReturn);
// 次卡处理发卡

View File

@ -122,6 +122,9 @@ public class ProductSearchDTO {
@ApiModelProperty(value = "商品id")
private List<Long> product_ids;
@ApiModelProperty(value = "商品id")
private List<Long> item_ids;
@ApiModelProperty(value = "讲师编号")
private Integer lecturer_id;

File diff suppressed because one or more lines are too long

View File

@ -7,7 +7,7 @@ import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -27,8 +27,10 @@ public class RabbitMqConfig {
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setAssumeSupportedContentType(true);
// Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
// converter.setAssumeSupportedContentType(true);
// MessageConverter converter = new StringMessageConverter();
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
@ -49,7 +51,7 @@ public class RabbitMqConfig {
mqMessageService.setMessageStatus(msgId, MqConstant.FAILURE);
});
// 配置序列化配置
rabbitTemplate.setMessageConverter(converter);
rabbitTemplate.setMessageConverter(new SimpleMessageConverter());
return rabbitTemplate;
}
@ -60,7 +62,8 @@ public class RabbitMqConfig {
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setMessageConverter(new SimpleMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@ -221,4 +224,4 @@ public class RabbitMqConfig {
.with(MqConstant.DEAD_LETTER_ROUTING_KEY);
}
}
}

View File

@ -1,19 +1,17 @@
package com.suisung.mall.shop.message.service.impl;
import cn.hutool.core.util.IdUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.suisung.mall.common.api.ResultCode;
import com.suisung.mall.common.constant.MqConstant;
import com.suisung.mall.common.exception.ApiException;
import com.suisung.mall.common.modules.mq.MqMessage;
import com.suisung.mall.common.utils.JsonUtil;
import com.suisung.mall.core.web.service.impl.BaseServiceImpl;
import com.suisung.mall.shop.message.mapper.MqMessageMapper;
import com.suisung.mall.shop.message.service.MqMessageService;
import com.suisung.mall.shop.message.vo.MqMessageVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,6 +27,8 @@ import java.util.List;
@Service
public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMessage> implements MqMessageService {
private static final Logger log = LoggerFactory.getLogger(MqMessageServiceImpl.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@ -44,13 +44,14 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
MqMessage mqMessage = new MqMessage();
String uuid = IdUtil.simpleUUID();
mqMessage.setMessage_id(uuid);
mqMessage.setMessage_content(data);
mqMessage.setMessage_content(JsonUtil.toJSONString(data));
mqMessage.setMessage_to_exchane(exchange);
mqMessage.setMessage_routing_key(routing_key);
mqMessage.setMessage_class_type(this.getClass().getSimpleName());
mqMessage.setMessage_status(MqConstant.INIT);
if (!saveOrUpdate(mqMessage)) {
throw new ApiException(ResultCode.FAILED);
log.error("保存消息失败消息ID: {}", uuid);
return;
}
/*
@ -73,10 +74,15 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
// 使用 MessageBuilder 确保设置 messageId
Message message = MessageBuilder.withBody(data.toString().getBytes(StandardCharsets.UTF_8))
.setHeader("messageId", uuid) // 添加自定义头部存储消息ID
// .setContentType("text/plain") // 明确指定内容类型
.setMessageId(uuid) // 设置标准 messageId 属性
.build();
rabbitTemplate.send(exchange, routing_key, message, new CorrelationData(uuid));
try {
rabbitTemplate.send(exchange, routing_key, message, new CorrelationData(uuid));
} catch (Exception e) {
log.error("发送消息失败,交换机: {},路由键: {}消息ID: {}", exchange, routing_key, uuid, e);
}
}
/**
@ -97,8 +103,10 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
msgList.add(mqMessage);
}
if (!saveOrUpdate(msgList)) {
throw new ApiException(ResultCode.FAILED);
log.error("批量保存消息失败");
return;
}
for (MqMessage msgVo : msgList) {
/*
String jsonString = null;
@ -116,14 +124,20 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
rabbitTemplate.convertAndSend(msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), message, new CorrelationData(msgVo.getMessage_id()));
*/
// 使用 MessageBuilder 确保设置 messageId
Message message = MessageBuilder.withBody(msgVo.getMessage_content().toString().getBytes(StandardCharsets.UTF_8))
.setHeader("messageId", msgVo.getMessage_id()) // 添加自定义头部存储消息ID
// .setContentType("text/plain") // 明确指定内容类型
.setMessageId(msgVo.getMessage_id()) // 设置标准 messageId 属性
.build();
rabbitTemplate.send(msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), message, new CorrelationData(msgVo.getMessage_id()));
try {
rabbitTemplate.send(msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), message, new CorrelationData(msgVo.getMessage_id()));
} catch (Exception e) {
log.error("批量发送消息失败,交换机: {},路由键: {}消息ID: {}",
msgVo.getMessage_to_exchane(), msgVo.getMessage_routing_key(), msgVo.getMessage_id(), e);
}
}
}
@ -168,12 +182,13 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
public void sendDelayMessage(String message, long delayMillis, String exchange, String routingKey) {
// 验证延迟时间是否有效
if (delayMillis <= 0) {
throw new IllegalArgumentException("延迟时间必须大于0");
log.warn("延迟时间必须大于0当前延迟时间: {}", delayMillis);
return;
}
// 生成消息ID
String uuid = IdUtil.simpleUUID();
// 创建消息并设置TTL和MessageId
Message delayMessage = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
.setExpiration(String.valueOf(delayMillis)) // 设置消息过期时间(毫秒)
@ -182,7 +197,11 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
.build();
// 发送消息到延迟队列
rabbitTemplate.send(exchange, routingKey, delayMessage, new CorrelationData(uuid));
try {
rabbitTemplate.send(exchange, routingKey, delayMessage, new CorrelationData(uuid));
} catch (Exception e) {
log.error("发送延迟消息失败,交换机: {},路由键: {}消息ID: {}", exchange, routingKey, uuid, e);
}
}
/**
@ -196,22 +215,27 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
public <T> void sendDelayObjectMessage(T obj, long delayMillis) {
// 验证延迟时间是否有效
if (delayMillis <= 0) {
throw new IllegalArgumentException("延迟时间必须大于0");
log.warn("延迟时间必须大于0当前延迟时间: {}", delayMillis);
return;
}
// 发送对象消息设置消息属性
rabbitTemplate.convertAndSend(
MqConstant.DELAY_EXCHANGE_NAME,
MqConstant.DELAY_ROUTING_KEY,
obj,
message -> {
// 为延迟消息设置 messageId
String uuid = IdUtil.simpleUUID();
message.getMessageProperties().setMessageId(uuid);
message.getMessageProperties().setHeader("messageId", uuid);
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return message;
}
);
try {
rabbitTemplate.convertAndSend(
MqConstant.DELAY_EXCHANGE_NAME,
MqConstant.DELAY_ROUTING_KEY,
obj,
message -> {
// 为延迟消息设置 messageId
String uuid = IdUtil.simpleUUID();
message.getMessageProperties().setMessageId(uuid);
message.getMessageProperties().setHeader("messageId", uuid);
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return message;
}
);
} catch (Exception e) {
log.error("发送延迟对象消息失败,对象类型: {},延迟时间: {}", obj.getClass().getSimpleName(), delayMillis, e);
}
}
}

View File

@ -23,6 +23,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 延迟消息接收器处理过期的消息
@ -37,54 +38,64 @@ public class DelayMessageReceiver {
// 消息最大重试次数
private static final int MAX_RETRY_COUNT = 5;
@Lazy
@Resource
private PushMessageService pushMessageService;
@RabbitListener(queues = MqConstant.DEAD_LETTER_QUEUE_NAME)
public void handleExpiredMessage(byte[] data, Channel channel, Message message) {
// 将byte[]转换为String然后调用现有的处理逻辑
String dataStr = new String(data, StandardCharsets.UTF_8);
handleExpiredMessage(dataStr, channel, message);
}
/**
* 监听死信队列处理过期的延迟消息
*
* @param message 消息内容JSON格式
* @param data 消息内容JSON格式
* @param channel RabbitMQ通道用于手动确认消息
* @param msg 消息对象包含消息属性等信息
* @param message 消息对象包含消息属性等信息
*/
@RabbitListener(queues = MqConstant.DEAD_LETTER_QUEUE_NAME)
public void handleExpiredMessage(JSONObject message, Channel channel, Message msg) {
log.info("收到过期消息,开始执行触发方法: {}", message);
public void handleExpiredMessage(String data, Channel channel, Message message) {
log.info("收到过期消息,开始执行触发方法: {}", data);
try {
// 处理死信消息
boolean success = processDeadMessage(message, channel, msg);
boolean success = processDeadMessage(data, channel, message);
if (!success) {
log.warn("处理死信消息失败,消息将重新入队,消息内容: {}", message);
rejectMessage(channel, msg); // 处理失败时拒绝消息并重新入队
log.warn("处理死信消息失败,消息将重新入队,消息内容: {}", data);
rejectMessage(channel, message); // 处理失败时拒绝消息并重新入队
}
ackMessage(channel, msg);
ackMessage(channel, message);
} catch (Exception e) {
log.error("处理过期消息时发生异常,消息将重新入队,消息内容: {}", message, e);
rejectMessage(channel, msg); // 出现异常时拒绝消息并重新入队
log.error("处理过期消息时发生异常,消息将重新入队,消息内容: {}", data, e);
rejectMessage(channel, message); // 出现异常时拒绝消息并重新入队
}
}
/**
* 处理死信队列消息
*
* @param message 消息内容
* @param messageStr 消息内容
* @return 处理结果 true-成功 false-失败
*/
private boolean processDeadMessage(JSONObject message, Channel channel, Message msg) {
log.info("开始处理死信消息: {}", message);
private boolean processDeadMessage(String messageStr, Channel channel, Message msg) {
log.info("开始处理死信消息: {}", messageStr);
// 检查消息是否为空
if (message == null) {
if (StrUtil.isBlank(messageStr)) {
log.warn("收到空消息,无法处理");
return false;
}
try {
// 获取消息分类
JSONObject message = new JSONObject(messageStr);
Integer category = message.getInt("category");
if (category == null) {
log.warn("消息分类为空,无法处理消息: {}", message);
@ -111,7 +122,7 @@ public class DelayMessageReceiver {
return false;
}
} catch (Exception e) {
log.error("处理死信消息时发生异常,消息内容: {}", message, e);
log.error("处理死信消息时发生异常,消息内容: {}, err: {}", messageStr, e);
return false;
}
}

View File

@ -25,6 +25,7 @@ import org.springframework.data.util.Pair;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
@ -56,6 +57,13 @@ public class OrderPayedListener {
@Autowired
private MqMessageService mqMessageService;
@RabbitHandler
public void listener(byte[] data, Channel channel, Message message) throws IOException, InterruptedException {
// 将byte[]转换为String然后调用现有的处理逻辑
String dataStr = new String(data, StandardCharsets.UTF_8);
listener(dataStr, channel, message);
}
@RabbitHandler
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
String messageId = message.getMessageProperties().getMessageId();
@ -128,26 +136,29 @@ public class OrderPayedListener {
}
// 同城配送或普通快递都发送 unipush 推送您有一个新的订单请查收
String orderType = orderInfoOld.getDelivery_type_id() == StateCode.DELIVERY_TYPE_SAME_CITY ? "同城" : "";
String title = String.format("您有一个新的%s订单请注意查收", orderType);
String content = String.format("新%s订单号%s用户在%s下的单请注意查收", orderId, DateTimeUtils.formatDateTime(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
JSONObject payload = new JSONObject();
payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ONLINE_ORDER_LIST);
payload.put("orderId", orderId);
pushMessageService.noticeMerchantEmployeeOrderAction(orderInfoOld.getStore_id(), orderId, title, content, payload);
// 处理异常不抛出以免影响到主流程
try {
// 同城配送或普通快递都发送 unipush 推送您有一个新的订单请查收
String orderType = orderInfoOld.getDelivery_type_id() == StateCode.DELIVERY_TYPE_SAME_CITY ? "同城" : "";
String title = String.format("您有一个新的%s订单请注意查收", orderType);
String content = String.format("新%s订单号%s用户在%s下的单请注意查收", orderId, DateTimeUtils.formatDateTime(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
JSONObject payload = new JSONObject();
payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ONLINE_ORDER_LIST);
payload.put("orderId", orderId);
pushMessageService.noticeMerchantEmployeeOrderAction(orderInfoOld.getStore_id(), orderId, title, content, payload);
// 发送 预过期 MQ 的推送消息
// shopOrderBaseService.preSendExpiredSFOrderPushMessage(orderInfoOld.getStore_id(), orderId, 1500L); // 25分钟发出过期消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("category", MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED); // 消息分类1-订单超时消息
jsonObject.put("orderId", orderId); // 订单ID
jsonObject.put("storeId", orderInfoOld.getStore_id()); // 店铺ID
jsonObject.put("title", "有一笔已超时的订单!"); // 消息标题
jsonObject.put("message", "您有一笔已超时的订单[" + orderId + "],请及时处理。"); // 消息内容
// 发送延迟消息
mqMessageService.sendDelayMessage(jsonObject.toString(), 1500L * 1000); // 转换为毫秒
// 发送 预过期 MQ 的推送消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("category", MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED); // 消息分类1-订单超时消息
jsonObject.put("orderId", orderId); // 订单ID
jsonObject.put("storeId", orderInfoOld.getStore_id()); // 店铺ID
jsonObject.put("title", "有一笔已超时的订单!"); // 消息标题
jsonObject.put("message", String.format("您有一笔已超时的订单%s请及时处理。", orderId)); // 消息内容
// 发送延迟消息
mqMessageService.sendDelayMessage(jsonObject.toString(), 1500L * 1000); // 转换为毫秒
} catch (Exception e) {
log.error("发送推送消息失败:{}", e.getMessage());
}
}
}

View File

@ -11,8 +11,13 @@ server:
no-request-timeout: 60000
connection-timeout: 60000
max-http-header-size: 8192
max-http-post-size: 200MB
max-http-post-size: 500MB
spring:
servlet:
multipart:
enabled: true #开启文件上传
max-file-size: 500MB # 单个文件限制上传大小为500MB
max-request-size: 500MB # 一次请求中所有上传文件总大小限制为500MB
messages:
basename: i18n/messages #配置国际化资源文件路径
encoding: UTF-8

View File

@ -4,7 +4,6 @@ import com.suisung.mall.common.constant.MqConstant;
import com.suisung.mall.shop.message.service.MqMessageService;
import com.suisung.mall.shop.order.service.ShopOrderBaseService;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;