订单过期发推送
This commit is contained in:
parent
958786cdf1
commit
38f0feb5b5
@ -41,4 +41,25 @@ public class MqConstant {
|
|||||||
public static final String ACCOUNT_UPGRADE_QUEUE = "account.upgrade.queue"; // 用户升级处理队列
|
public static final String ACCOUNT_UPGRADE_QUEUE = "account.upgrade.queue"; // 用户升级处理队列
|
||||||
public static final String ACCOUNT_UPGRADE_ROUTING_KEY = "account.upgrade_routing_key"; // 用户用户升级队列路由键
|
public static final String ACCOUNT_UPGRADE_ROUTING_KEY = "account.upgrade_routing_key"; // 用户用户升级队列路由键
|
||||||
|
|
||||||
|
// RabbitMQ 配置类,实现基于 TTL + 死信队列的延迟任务
|
||||||
|
// 延迟交换机名称
|
||||||
|
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
|
||||||
|
// 延迟队列名称
|
||||||
|
public static final String DELAY_QUEUE_NAME = "delay.queue";
|
||||||
|
// 延迟队列路由键
|
||||||
|
public static final String DELAY_ROUTING_KEY = "delay.routing.key";
|
||||||
|
|
||||||
|
// 死信交换机名称
|
||||||
|
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
|
||||||
|
// 死信队列名称
|
||||||
|
public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
|
||||||
|
// 死信队列路由键
|
||||||
|
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
|
||||||
|
|
||||||
|
// 死信队列事件类型:1-顺丰订单超时;2-预下单;
|
||||||
|
public static final Integer DEAD_EVENT_CATE_ORDER_EXPIRED = 1;
|
||||||
|
// 死信队列事件类型:1-顺丰订单超时;2-预下单;
|
||||||
|
public static final Integer DEAD_EVENT_CATE_PRE_ORDER = 2;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -306,7 +306,7 @@ public class ShopActivityCutpriceServiceImpl extends BaseServiceImpl<ShopActivit
|
|||||||
|
|
||||||
activity_row.put("activity_rule", JSONUtil.parseObj(activity_row.get("activity_rule")));
|
activity_row.put("activity_rule", JSONUtil.parseObj(activity_row.get("activity_rule")));
|
||||||
|
|
||||||
// 砍价订单有效期时长
|
// // 砍价订单有效期时长
|
||||||
// if (cutprice_row != null && cutprice_row.getAc_datetime() != null) {
|
// if (cutprice_row != null && cutprice_row.getAc_datetime() != null) {
|
||||||
// Float order_cutprice_time = accountBaseConfigService.getConfig("order_cutprice_time", 3f);
|
// Float order_cutprice_time = accountBaseConfigService.getConfig("order_cutprice_time", 3f);
|
||||||
// int second = NumberUtil.mul(order_cutprice_time, 24, 60, 60).intValue();
|
// int second = NumberUtil.mul(order_cutprice_time, 24, 60, 60).intValue();
|
||||||
|
|||||||
@ -1,120 +0,0 @@
|
|||||||
/*
|
|
||||||
package com.suisung.mall.shop.config;
|
|
||||||
|
|
||||||
import cn.hutool.core.convert.Convert;
|
|
||||||
import com.suisung.mall.shop.page.service.OssService;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
|
|
||||||
import java.io.*;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
*/
|
|
||||||
/**
|
|
||||||
* Author: Lwg
|
|
||||||
* Description: Batch replace project fixed path address
|
|
||||||
*//*
|
|
||||||
|
|
||||||
public class PathReplaceUtil {
|
|
||||||
|
|
||||||
@Value("${aliyun.oss.dir.prefix}")
|
|
||||||
private static String ALIYUN_OSS_DIR_PREFIX;
|
|
||||||
|
|
||||||
private static OssService ossService;
|
|
||||||
|
|
||||||
static {
|
|
||||||
ossService = SpringUtil.getBean(OssService.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
String directoryPath = "E:\\xxx\\xxx"; // 目录路径
|
|
||||||
String regex = "https://test.lancerdt.com/.*?.jpg"; // 正则匹配格式
|
|
||||||
File directory = new File(directoryPath);
|
|
||||||
if (directory.exists() && directory.isDirectory()) { // 判断目录是否存在并且是一个目录
|
|
||||||
recursiveExtract(directory, regex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 递归提取目录及其子目录中的所有文件中的图片地址
|
|
||||||
private static void recursiveExtract(File directory, String regex) {
|
|
||||||
File[] files = directory.listFiles();
|
|
||||||
for (File file : files) {
|
|
||||||
if (file.getName().equals(".svn") || file.getName().equals(".git") || file.getName().equals(".vscode") || file.getName().equals(".idea")) { // 排除目录
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (file.isFile()) {
|
|
||||||
try {
|
|
||||||
extractImageUrls(file, regex);
|
|
||||||
} catch (IOException e) {
|
|
||||||
// 读取文件失败,忽略此文件并继续处理下一个文件
|
|
||||||
System.err.println("Failed to read file: " + file.getAbsolutePath());
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
} else if (file.isDirectory()) {
|
|
||||||
recursiveExtract(file, regex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 提取文件中的图片地址,并上传图片并返回新的图片地址替换
|
|
||||||
private static String extractImageUrls(File file, String regex) throws IOException {
|
|
||||||
BufferedReader reader = new BufferedReader(new FileReader(file));
|
|
||||||
StringBuilder content = new StringBuilder();
|
|
||||||
String line;
|
|
||||||
while ((line = reader.readLine()) != null) {
|
|
||||||
Matcher matcher = Pattern.compile(regex).matcher(line);
|
|
||||||
while (matcher.find()) {
|
|
||||||
String newUrl = uploadFile(matcher.group());
|
|
||||||
if (newUrl != null && newUrl.equals("error_continue")) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
line = line.replace(matcher.group(), newUrl);
|
|
||||||
}
|
|
||||||
content.append(line).append("\n");
|
|
||||||
}
|
|
||||||
reader.close();
|
|
||||||
String newContent = content.toString();
|
|
||||||
FileWriter writer = new FileWriter(file);
|
|
||||||
writer.write(newContent);
|
|
||||||
writer.close();
|
|
||||||
return newContent;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String uploadFile(String urlImage) {
|
|
||||||
//上传文件测试
|
|
||||||
InputStream inputStream = null;
|
|
||||||
try {
|
|
||||||
inputStream = OssUtils.urlToInputSteam(urlImage);
|
|
||||||
} catch (Exception e) {
|
|
||||||
System.err.println("图片已失效...请先移除文件或恢复此文件地址" + urlImage);
|
|
||||||
return "error_continue";
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
URI uri = new URI(urlImage);
|
|
||||||
String dir = uri.getPath(); // 获取路径
|
|
||||||
String uploadName = dir.substring(dir.lastIndexOf("/") + 1); // 获取文件名
|
|
||||||
|
|
||||||
// 获取字节大小
|
|
||||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
|
||||||
int nRead;
|
|
||||||
byte[] data = new byte[1024];
|
|
||||||
while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
|
|
||||||
buffer.write(data, 0, nRead);
|
|
||||||
}
|
|
||||||
inputStream.close();
|
|
||||||
byte[] byteArray = buffer.toByteArray();
|
|
||||||
Long file_size = Convert.toLong(byteArray.length);
|
|
||||||
|
|
||||||
return ossService.uploadObject2OSS(null, ALIYUN_OSS_DIR_PREFIX.concat("/").concat(dir).concat(uploadName), inputStream, uploadName, file_size);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
return "error_continue";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
@ -12,6 +12,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RabbitMqConfig {
|
public class RabbitMqConfig {
|
||||||
@ -147,4 +150,75 @@ public class RabbitMqConfig {
|
|||||||
null);
|
null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明延迟交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public DirectExchange delayExchange() {
|
||||||
|
// durable: true 持久化交换机
|
||||||
|
return ExchangeBuilder.directExchange(MqConstant.DELAY_EXCHANGE_NAME)
|
||||||
|
.durable(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明死信交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public DirectExchange deadLetterExchange() {
|
||||||
|
return ExchangeBuilder.directExchange(MqConstant.DEAD_LETTER_EXCHANGE_NAME)
|
||||||
|
.durable(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明延迟队列,并绑定死信配置
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Queue delayQueue() {
|
||||||
|
// 设置延迟队列的参数
|
||||||
|
Map<String, Object> arguments = new HashMap<>(3);
|
||||||
|
// 绑定死信交换机
|
||||||
|
arguments.put("x-dead-letter-exchange", MqConstant.DEAD_LETTER_EXCHANGE_NAME);
|
||||||
|
// 绑定死信路由键
|
||||||
|
arguments.put("x-dead-letter-routing-key", MqConstant.DEAD_LETTER_ROUTING_KEY);
|
||||||
|
// 可选:设置队列级别的默认TTL(毫秒),如果消息没有设置TTL则使用此值
|
||||||
|
arguments.put("x-message-ttl", 60000);
|
||||||
|
|
||||||
|
// 创建队列
|
||||||
|
return QueueBuilder.durable(MqConstant.DELAY_QUEUE_NAME)
|
||||||
|
.withArguments(arguments)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 声明死信队列,用于接收过期的消息并处理
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Queue deadLetterQueue() {
|
||||||
|
return QueueBuilder.durable(MqConstant.DEAD_LETTER_QUEUE_NAME)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 绑定延迟队列到延迟交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding delayQueueBinding() {
|
||||||
|
return BindingBuilder.bind(delayQueue())
|
||||||
|
.to(delayExchange())
|
||||||
|
.with(MqConstant.DELAY_ROUTING_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 绑定死信队列到死信交换机
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Binding deadLetterQueueBinding() {
|
||||||
|
return BindingBuilder.bind(deadLetterQueue())
|
||||||
|
.to(deadLetterExchange())
|
||||||
|
.with(MqConstant.DEAD_LETTER_ROUTING_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -14,6 +14,7 @@ import com.suisung.mall.common.api.CommonResult;
|
|||||||
import com.suisung.mall.common.service.impl.BaseControllerImpl;
|
import com.suisung.mall.common.service.impl.BaseControllerImpl;
|
||||||
import com.suisung.mall.shop.lakala.service.LakalaApiService;
|
import com.suisung.mall.shop.lakala.service.LakalaApiService;
|
||||||
import com.suisung.mall.shop.library.service.LibraryProductService;
|
import com.suisung.mall.shop.library.service.LibraryProductService;
|
||||||
|
import com.suisung.mall.shop.message.service.MqMessageService;
|
||||||
import com.suisung.mall.shop.message.service.PushMessageService;
|
import com.suisung.mall.shop.message.service.PushMessageService;
|
||||||
import com.suisung.mall.shop.order.service.ShopOrderReturnService;
|
import com.suisung.mall.shop.order.service.ShopOrderReturnService;
|
||||||
import com.suisung.mall.shop.store.service.ShopStoreSameCityTransportBaseService;
|
import com.suisung.mall.shop.store.service.ShopStoreSameCityTransportBaseService;
|
||||||
@ -50,10 +51,13 @@ public class LakalaController extends BaseControllerImpl {
|
|||||||
@Resource
|
@Resource
|
||||||
private PushMessageService pushMessageService;
|
private PushMessageService pushMessageService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private MqMessageService mqMessageService;
|
||||||
|
|
||||||
@ApiOperation(value = "测试案例", notes = "测试案例")
|
@ApiOperation(value = "测试案例", notes = "测试案例")
|
||||||
@RequestMapping(value = "/testcase", method = RequestMethod.POST)
|
@RequestMapping(value = "/testcase", method = RequestMethod.POST)
|
||||||
public Object testcase(@RequestBody JSONObject paramsJSON) {
|
public Object testcase(@RequestBody JSONObject paramsJSON) {
|
||||||
return shopOrderReturnService.sfExpressExpiredForceRefund(paramsJSON.getStr("orderId"));
|
// return shopOrderReturnService.sfExpressExpiredForceRefund(paramsJSON.getStr("orderId"));
|
||||||
// return lakalaPayService.applyLedgerMerEc(paramsJSON.getStr("mchMobile"));
|
// return lakalaPayService.applyLedgerMerEc(paramsJSON.getStr("mchMobile"));
|
||||||
// return lakalaPayService.LedgerMerEcDownload(975790666910121984L);
|
// return lakalaPayService.LedgerMerEcDownload(975790666910121984L);
|
||||||
|
|
||||||
@ -72,6 +76,16 @@ public class LakalaController extends BaseControllerImpl {
|
|||||||
// List<String> clientIds = JSONUtil.toList(paramsJSON.getJSONArray("clientIds"), String.class);
|
// List<String> clientIds = JSONUtil.toList(paramsJSON.getJSONArray("clientIds"), String.class);
|
||||||
// return pushMessageService.sendMessage(clientIds, paramsJSON.getStr("title"), paramsJSON.getStr("content"), paramsJSON.getJSONObject("payload"));
|
// return pushMessageService.sendMessage(clientIds, paramsJSON.getStr("title"), paramsJSON.getStr("content"), paramsJSON.getJSONObject("payload"));
|
||||||
|
|
||||||
|
JSONObject jsonObject = new JSONObject();
|
||||||
|
String orderId = "DD-20250725-1";
|
||||||
|
jsonObject.put("category", 1);
|
||||||
|
jsonObject.put("orderId", "DD-20250725-1");
|
||||||
|
jsonObject.put("storeId", 12);
|
||||||
|
jsonObject.put("title", "有一笔已超时的订单!");
|
||||||
|
jsonObject.put("message", "您有一笔已超时的订单[" + orderId + "],请及时处理。");
|
||||||
|
mqMessageService.sendDelayMessage(jsonObject.toString(), 10000);
|
||||||
|
return jsonObject;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "批量发送推送消息 - 测试案例", notes = "批量发送推送消息 - 测试案例")
|
@ApiOperation(value = "批量发送推送消息 - 测试案例", notes = "批量发送推送消息 - 测试案例")
|
||||||
|
|||||||
@ -22,4 +22,32 @@ public interface MqMessageService extends IBaseService<MqMessage> {
|
|||||||
* 设置消息状态
|
* 设置消息状态
|
||||||
*/
|
*/
|
||||||
boolean setMessageStatus(String messageId, Integer messageStatus);
|
boolean setMessageStatus(String messageId, Integer messageStatus);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送延迟消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @param delayMillis 延迟时间(毫秒)
|
||||||
|
*/
|
||||||
|
void sendDelayMessage(String message, long delayMillis);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送延迟消息(重载方法,可指定交换机和路由键)
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @param delayMillis 延迟时间(毫秒)
|
||||||
|
* @param exchange 交换机名称
|
||||||
|
* @param routingKey 路由键
|
||||||
|
*/
|
||||||
|
void sendDelayMessage(String message, long delayMillis, String exchange, String routingKey);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送延迟消息(泛型方法,支持对象消息)
|
||||||
|
*
|
||||||
|
* @param obj 消息对象
|
||||||
|
* @param delayMillis 延迟时间(毫秒)
|
||||||
|
* @param <T> 消息类型
|
||||||
|
*/
|
||||||
|
<T> void sendDelayObjectMessage(T obj, long delayMillis);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,11 +9,14 @@ import com.suisung.mall.core.web.service.impl.BaseServiceImpl;
|
|||||||
import com.suisung.mall.shop.message.mapper.MqMessageMapper;
|
import com.suisung.mall.shop.message.mapper.MqMessageMapper;
|
||||||
import com.suisung.mall.shop.message.service.MqMessageService;
|
import com.suisung.mall.shop.message.service.MqMessageService;
|
||||||
import com.suisung.mall.shop.message.vo.MqMessageVo;
|
import com.suisung.mall.shop.message.vo.MqMessageVo;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.core.MessageBuilder;
|
||||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -125,4 +128,65 @@ public class MqMessageServiceImpl extends BaseServiceImpl<MqMessageMapper, MqMes
|
|||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送延迟消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @param delayMillis 延迟时间(毫秒)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void sendDelayMessage(String message, long delayMillis) {
|
||||||
|
sendDelayMessage(message, delayMillis, MqConstant.DELAY_EXCHANGE_NAME, MqConstant.DELAY_ROUTING_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送延迟消息(重载方法,可指定交换机和路由键)
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @param delayMillis 延迟时间(毫秒)
|
||||||
|
* @param exchange 交换机名称
|
||||||
|
* @param routingKey 路由键
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void sendDelayMessage(String message, long delayMillis, String exchange, String routingKey) {
|
||||||
|
// 验证延迟时间是否有效
|
||||||
|
if (delayMillis <= 0) {
|
||||||
|
throw new IllegalArgumentException("延迟时间必须大于0");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建消息并设置TTL
|
||||||
|
Message delayMessage = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
|
||||||
|
.setExpiration(String.valueOf(delayMillis)) // 设置消息过期时间(毫秒)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 发送消息到延迟队列
|
||||||
|
rabbitTemplate.send(exchange, routingKey, delayMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送延迟消息(泛型方法,支持对象消息)
|
||||||
|
*
|
||||||
|
* @param obj 消息对象
|
||||||
|
* @param delayMillis 延迟时间(毫秒)
|
||||||
|
* @param <T> 消息类型
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public <T> void sendDelayObjectMessage(T obj, long delayMillis) {
|
||||||
|
// 验证延迟时间是否有效
|
||||||
|
if (delayMillis <= 0) {
|
||||||
|
throw new IllegalArgumentException("延迟时间必须大于0");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送对象消息,设置消息属性
|
||||||
|
rabbitTemplate.convertAndSend(
|
||||||
|
MqConstant.DELAY_EXCHANGE_NAME,
|
||||||
|
MqConstant.DELAY_ROUTING_KEY,
|
||||||
|
obj,
|
||||||
|
message -> {
|
||||||
|
message.getMessageProperties().setExpiration(String.valueOf(delayMillis));
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,187 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2025. Lorem ipsum dolor sit amet, consectetur adipiscing elit.
|
||||||
|
* Morbi non lorem porttitor neque feugiat blandit. Ut vitae ipsum eget quam lacinia accumsan.
|
||||||
|
* Etiam sed turpis ac ipsum condimentum fringilla. Maecenas magna.
|
||||||
|
* Proin dapibus sapien vel ante. Aliquam erat volutpat. Pellentesque sagittis ligula eget metus.
|
||||||
|
* Vestibulum commodo. Ut rhoncus gravida arcu.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.suisung.mall.shop.order.listener;
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONObject;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import com.suisung.mall.common.constant.CommonConstant;
|
||||||
|
import com.suisung.mall.common.constant.MqConstant;
|
||||||
|
import com.suisung.mall.shop.message.service.PushMessageService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 延迟消息接收器,处理过期的消息
|
||||||
|
* <p>
|
||||||
|
* 该监听器处理RabbitMQ中的死信队列消息,主要处理以下类型的消息:
|
||||||
|
* 1. 订单超时消息(DEAD_EVENT_CATE_ORDER_EXPIRED)
|
||||||
|
* 2. 预订单消息(DEAD_EVENT_CATE_PRE_ORDER)
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class DelayMessageReceiver {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private PushMessageService pushMessageService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监听死信队列,处理过期的延迟消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容(JSON格式)
|
||||||
|
* @param channel RabbitMQ通道,用于手动确认消息
|
||||||
|
* @param msg 消息对象,包含消息属性等信息
|
||||||
|
*/
|
||||||
|
@RabbitListener(queues = "dead.letter.queue")
|
||||||
|
public void handleExpiredMessage(JSONObject message, Channel channel, Message msg) {
|
||||||
|
log.info("收到过期消息,开始执行触发方法: {}", message);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 处理死信消息
|
||||||
|
boolean result = processDeadMessage(message);
|
||||||
|
|
||||||
|
// 根据处理结果确认或拒绝消息
|
||||||
|
if (result) {
|
||||||
|
ackMessage(channel, msg);
|
||||||
|
} else {
|
||||||
|
log.warn("处理过期消息失败,消息将重新入队,消息内容: {}", message);
|
||||||
|
rejectMessage(channel, msg);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理过期消息时发生异常,消息将重新入队,消息内容: {}", message, e);
|
||||||
|
rejectMessage(channel, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理死信队列消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @return 处理结果 true-成功 false-失败
|
||||||
|
*/
|
||||||
|
private boolean processDeadMessage(JSONObject message) {
|
||||||
|
log.info("开始处理死信消息: {}", message);
|
||||||
|
|
||||||
|
// 检查消息是否为空
|
||||||
|
if (message == null) {
|
||||||
|
log.warn("收到空消息,无法处理");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 获取消息分类
|
||||||
|
Integer category = message.getInt("category");
|
||||||
|
if (category == null) {
|
||||||
|
log.warn("消息分类为空,无法处理消息: {}", message);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据消息分类处理不同类型的消息
|
||||||
|
if (category == MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED) {
|
||||||
|
// 处理订单超时消息
|
||||||
|
return handleOrderExpiredMessage(message);
|
||||||
|
} else if (category == MqConstant.DEAD_EVENT_CATE_PRE_ORDER) {
|
||||||
|
// 处理预订单消息
|
||||||
|
return handlePreOrderMessage(message);
|
||||||
|
} else {
|
||||||
|
log.warn("未知的消息分类: {},消息内容: {}", category, message);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理死信消息时发生异常,消息内容: {}", message, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理订单超时消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @return 处理结果
|
||||||
|
*/
|
||||||
|
private boolean handleOrderExpiredMessage(JSONObject message) {
|
||||||
|
try {
|
||||||
|
String orderId = message.getStr("orderId");
|
||||||
|
Integer storeId = message.getInt("storeId");
|
||||||
|
|
||||||
|
log.info("处理订单超时消息,订单号: {}, 店铺ID: {}", orderId, storeId);
|
||||||
|
|
||||||
|
// 构造推送消息内容
|
||||||
|
JSONObject payload = new JSONObject();
|
||||||
|
payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ABNORMAL_ORDER_LIST);
|
||||||
|
payload.put("orderId", orderId);
|
||||||
|
|
||||||
|
// 发送推送消息给商家员工
|
||||||
|
pushMessageService.noticeMerchantEmployeeOrderAction(
|
||||||
|
storeId, orderId, message.getStr("title"),
|
||||||
|
message.getStr("message"), payload);
|
||||||
|
|
||||||
|
log.info("订单超时消息处理完成,订单号: {}, 店铺ID: {}", orderId, storeId);
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理订单超时消息时发生异常,消息内容: {}", message, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理预订单消息
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @return 处理结果
|
||||||
|
*/
|
||||||
|
private boolean handlePreOrderMessage(JSONObject message) {
|
||||||
|
try {
|
||||||
|
log.info("处理预订单消息: {}", message);
|
||||||
|
// 预订单处理逻辑(待实现)
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理预订单消息时发生异常,消息内容: {}", message, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 确认消息处理成功
|
||||||
|
*
|
||||||
|
* @param channel RabbitMQ通道
|
||||||
|
* @param message 消息对象
|
||||||
|
*/
|
||||||
|
private void ackMessage(Channel channel, Message message) {
|
||||||
|
try {
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
log.debug("消息确认成功,消息ID: {}", message.getMessageProperties().getMessageId());
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("确认消息失败,消息ID: {},异常原因:", message.getMessageProperties().getMessageId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 拒绝消息处理(消息重新入队)
|
||||||
|
*
|
||||||
|
* @param channel RabbitMQ通道
|
||||||
|
* @param message 消息对象
|
||||||
|
*/
|
||||||
|
private void rejectMessage(Channel channel, Message message) {
|
||||||
|
try {
|
||||||
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
|
log.debug("消息拒绝成功,消息将重新入队,消息ID: {}", message.getMessageProperties().getMessageId());
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (IOException | InterruptedException e) {
|
||||||
|
log.error("拒绝消息失败,消息ID: {},异常原因:", message.getMessageProperties().getMessageId(), e);
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -23,7 +23,7 @@ public class MessageListener {
|
|||||||
private MessageService messageService;
|
private MessageService messageService;
|
||||||
|
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
public void listener(String data, Channel channel, Message message) throws IOException, InterruptedException {
|
public void listener(String data, Channel channel, Message message) {
|
||||||
MsgTO msgTO = JSONUtil.toBean(data, MsgTO.class);
|
MsgTO msgTO = JSONUtil.toBean(data, MsgTO.class);
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
|
||||||
@ -31,18 +31,47 @@ public class MessageListener {
|
|||||||
log.debug("消息监听到:{} ### {}", messageId, msgTO);
|
log.debug("消息监听到:{} ### {}", messageId, msgTO);
|
||||||
boolean flag = messageService.sendNoticeMsg(msgTO.getUser_id(), msgTO.getStore_id(), msgTO.getMessage_id(), msgTO.getArgs());
|
boolean flag = messageService.sendNoticeMsg(msgTO.getUser_id(), msgTO.getStore_id(), msgTO.getMessage_id(), msgTO.getArgs());
|
||||||
if (flag) {
|
if (flag) {
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
ackMessage(channel, message);
|
||||||
} else {
|
} else {
|
||||||
log.error("消息消费失败,执行sendNoticeMsg异常");
|
log.error("消息消费失败,执行sendNoticeMsg异常");
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
rejectMessage(channel, message);
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("消息消费失败,执行sendNoticeMsg异常, 异常原因:", e);
|
log.error("消息消费失败,执行sendNoticeMsg异常, 异常原因:", e);
|
||||||
|
rejectMessage(channel, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 确认消息处理成功
|
||||||
|
*
|
||||||
|
* @param channel RabbitMQ通道
|
||||||
|
* @param message 消息对象
|
||||||
|
*/
|
||||||
|
private void ackMessage(Channel channel, Message message) {
|
||||||
|
try {
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("确认消息失败,异常原因:", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 拒绝消息处理
|
||||||
|
*
|
||||||
|
* @param channel RabbitMQ通道
|
||||||
|
* @param message 消息对象
|
||||||
|
*/
|
||||||
|
private void rejectMessage(Channel channel, Message message) {
|
||||||
|
try {
|
||||||
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
} catch (IOException | InterruptedException e) {
|
||||||
|
log.error("处理消息失败,异常原因:", e);
|
||||||
|
if (e instanceof InterruptedException) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -132,6 +132,10 @@ public class OrderPayedListener {
|
|||||||
payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ONLINE_ORDER_LIST);
|
payload.put("category", CommonConstant.PUSH_MSG_CATE_MCH_ONLINE_ORDER_LIST);
|
||||||
payload.put("orderId", orderId);
|
payload.put("orderId", orderId);
|
||||||
pushMessageService.noticeMerchantEmployeeOrderAction(orderInfoOld.getStore_id(), orderId, title, content, payload);
|
pushMessageService.noticeMerchantEmployeeOrderAction(orderInfoOld.getStore_id(), orderId, title, content, payload);
|
||||||
|
|
||||||
|
// 发送 预过期 MQ 的推送消息
|
||||||
|
shopOrderBaseService.preSendExpiredSFOrderPushMessage(orderInfoOld.getStore_id(), orderId, 1500L); // 25分钟发出过期消息
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -596,4 +596,14 @@ public interface ShopOrderBaseService extends IBaseService<ShopOrderBase> {
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
WxOrderBaseInfoDTO getWxOrderBaseInfo(String orderId);
|
WxOrderBaseInfoDTO getWxOrderBaseInfo(String orderId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 预处理发货订单超时消息(发到 mq 里,触发超时事件,发出推送消息)
|
||||||
|
*
|
||||||
|
* @param storeId 店铺Id
|
||||||
|
* @param orderId 订单Id
|
||||||
|
* @param expireSeconds 配送超时的秒数,单位秒
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
Boolean preSendExpiredSFOrderPushMessage(Integer storeId, String orderId, Long expireSeconds);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -101,6 +101,7 @@ import io.seata.core.exception.TransactionException;
|
|||||||
import io.seata.spring.annotation.GlobalTransactional;
|
import io.seata.spring.annotation.GlobalTransactional;
|
||||||
import io.seata.tm.api.GlobalTransaction;
|
import io.seata.tm.api.GlobalTransaction;
|
||||||
import io.seata.tm.api.GlobalTransactionContext;
|
import io.seata.tm.api.GlobalTransactionContext;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.ibatis.annotations.Param;
|
import org.apache.ibatis.annotations.Param;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -110,6 +111,7 @@ import org.springframework.beans.factory.annotation.Value;
|
|||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.data.util.Pair;
|
import org.springframework.data.util.Pair;
|
||||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.transaction.TransactionStatus;
|
import org.springframework.transaction.TransactionStatus;
|
||||||
@ -138,6 +140,7 @@ import static com.suisung.mall.common.utils.ContextUtil.getCurrentUser;
|
|||||||
* @author Xinze
|
* @author Xinze
|
||||||
* @since 2021-04-30
|
* @since 2021-04-30
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class ShopOrderBaseServiceImpl extends BaseServiceImpl<ShopOrderBaseMapper, ShopOrderBase> implements ShopOrderBaseService {
|
public class ShopOrderBaseServiceImpl extends BaseServiceImpl<ShopOrderBaseMapper, ShopOrderBase> implements ShopOrderBaseService {
|
||||||
|
|
||||||
@ -8805,6 +8808,37 @@ public class ShopOrderBaseServiceImpl extends BaseServiceImpl<ShopOrderBaseMappe
|
|||||||
return shopOrderBaseMapper.getWxOrderBaseInfo(orderId);
|
return shopOrderBaseMapper.getWxOrderBaseInfo(orderId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 预处理发货订单超时消息(发到 mq 里,触发超时事件,发出推送消息)
|
||||||
|
*
|
||||||
|
* @param storeId 店铺ID
|
||||||
|
* @param orderId 订单ID
|
||||||
|
* @param expireSeconds 过期时间(秒)
|
||||||
|
* @return 是否发送成功
|
||||||
|
*/
|
||||||
|
@Async
|
||||||
|
@Override
|
||||||
|
public Boolean preSendExpiredSFOrderPushMessage(Integer storeId, String orderId, Long expireSeconds) {
|
||||||
|
try {
|
||||||
|
// 构建延迟消息内容
|
||||||
|
JSONObject jsonObject = new JSONObject();
|
||||||
|
jsonObject.put("category", MqConstant.DEAD_EVENT_CATE_ORDER_EXPIRED); // 消息分类:1-订单超时消息
|
||||||
|
jsonObject.put("orderId", orderId); // 订单ID
|
||||||
|
jsonObject.put("storeId", storeId); // 店铺ID
|
||||||
|
jsonObject.put("title", "有一笔已超时的订单!"); // 消息标题
|
||||||
|
jsonObject.put("message", "您有一笔已超时的订单[" + orderId + "],请及时处理。"); // 消息内容
|
||||||
|
|
||||||
|
// 发送延迟消息
|
||||||
|
mqMessageService.sendDelayMessage(jsonObject.toString(), expireSeconds * 1000); // 转换为毫秒
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("发送延迟订单超时消息失败,店铺ID:{},订单ID:{},过期时间:{}秒",
|
||||||
|
storeId, orderId, expireSeconds, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 取货单号格式化
|
* 取货单号格式化
|
||||||
*
|
*
|
||||||
|
|||||||
14
pom.xml
14
pom.xml
@ -330,14 +330,14 @@
|
|||||||
<!-- <mysql.pwd>J0XivNvAcR14}pA6Cysm.E17</mysql.pwd>-->
|
<!-- <mysql.pwd>J0XivNvAcR14}pA6Cysm.E17</mysql.pwd>-->
|
||||||
<!-- <mysql.driver>com.mysql.cj.jdbc.Driver</mysql.driver>-->
|
<!-- <mysql.driver>com.mysql.cj.jdbc.Driver</mysql.driver>-->
|
||||||
<!-- redis配置 -->
|
<!-- redis配置 -->
|
||||||
<!-- <redis.host>114.132.210.208</redis.host>-->
|
<redis.host>114.132.210.208</redis.host>
|
||||||
<!-- <redis.database>15</redis.database>-->
|
|
||||||
<!-- <redis.port>6379</redis.port>-->
|
|
||||||
<!-- <redis.password>Gpff654321</redis.password>-->
|
|
||||||
<redis.host>42.194.196.179</redis.host>
|
|
||||||
<redis.database>15</redis.database>
|
<redis.database>15</redis.database>
|
||||||
<redis.port>6480</redis.port>
|
<redis.port>6379</redis.port>
|
||||||
<redis.password>hwe9EgqgMAwY</redis.password>
|
<redis.password>Gpff654321</redis.password>
|
||||||
|
<!-- <redis.host>42.194.196.179</redis.host>-->
|
||||||
|
<!-- <redis.database>15</redis.database>-->
|
||||||
|
<!-- <redis.port>6480</redis.port>-->
|
||||||
|
<!-- <redis.password>hwe9EgqgMAwY</redis.password>-->
|
||||||
<!-- rabbitmq配置 -->
|
<!-- rabbitmq配置 -->
|
||||||
<rabbitmq.host>114.132.210.208</rabbitmq.host>
|
<rabbitmq.host>114.132.210.208</rabbitmq.host>
|
||||||
<rabbitmq.port>5672</rabbitmq.port>
|
<rabbitmq.port>5672</rabbitmq.port>
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user