新增库存扣减中间表

This commit is contained in:
liyj 2025-10-11 14:22:08 +08:00
parent 61228d9c35
commit 704b9df74d
10 changed files with 244 additions and 37 deletions

View File

@ -0,0 +1,52 @@
package com.suisung.mall.common.modules.sync;
import com.baomidou.mybatisplus.annotation.*;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
* 同步消费中间表同步数据时保存数据到这个表中消费完更新已消费证明数据已经同步给思迅客户端
*/
@Data
@TableName("product_quantity_consumption")
@ApiModel("商品数量消费表")
public class ProductQuantityConsumption {
@TableId(value = "consume_id", type = IdType.INPUT)
@ApiModelProperty("自定义主键ID")
private String consumeId;
@TableField(value = "order_id",updateStrategy = FieldStrategy.NOT_EMPTY)
@ApiModelProperty("订单编号")
private String orderId;
@TableField(value = "product_number", updateStrategy = FieldStrategy.NOT_EMPTY)
@ApiModelProperty("商品编号")
private String productNumber;
@ApiModelProperty("数量(正数表示入库/增加,负数表示出库/减少)")
@TableField(value = "quantity",updateStrategy = FieldStrategy.NOT_EMPTY)
private BigDecimal quantity;
@ApiModelProperty("消费状态0-未消费1-已消费")
@TableField(value = "status",updateStrategy = FieldStrategy.NOT_EMPTY)
private Integer status;
@TableField(value = "store_id",updateStrategy = FieldStrategy.NOT_EMPTY)
@ApiModelProperty("店铺ID")
private Integer storeId;
@TableField(value = "update_time",updateStrategy = FieldStrategy.NOT_EMPTY)
@ApiModelProperty("创建时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
@TableField(value = "update_time",updateStrategy = FieldStrategy.NOT_EMPTY)
@ApiModelProperty(value = "更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
}

View File

@ -3300,7 +3300,7 @@ public class ShopOrderBaseServiceImpl extends BaseServiceImpl<ShopOrderBaseMappe
// RMK 第三方数据同步相关redis 给这个商品减去对应的库存
Map<String, Integer> stockDeltaMap = new HashMap<>();
stockDeltaMap.put(item_src_id, -order_item_quantity);
stockDeltaMap.put(item_src_id+"_"+order_item_row.getOrder_id(), -order_item_quantity);
syncThirdDataService.incrProductStockToRedis(stockDeltaMap);
}
}
@ -4223,7 +4223,7 @@ public class ShopOrderBaseServiceImpl extends BaseServiceImpl<ShopOrderBaseMappe
log.debug("释放库存Item_src_id:{},数量:{}",shopProductItem.getItem_src_id(),order_item_quantity);
// RMK 第三方数据同步相关redis 给这个商品加上对应的库存
Map<String, Integer> stockDeltaMap = new HashMap<>();
stockDeltaMap.put(Convert.toStr(shopProductItem.getItem_src_id()), order_item_quantity);
stockDeltaMap.put(shopProductItem.getItem_src_id()+"_"+order_item_row.getOrder_id(), order_item_quantity);
syncThirdDataService.incrProductStockToRedis(stockDeltaMap);
}
}
@ -7236,7 +7236,7 @@ public class ShopOrderBaseServiceImpl extends BaseServiceImpl<ShopOrderBaseMappe
// RMK 第三方数据同步相关redis 给这个商品减去上对应的库存
log.debug("减库存Item_src_id:{},数量:{}",item_src_id,cart_quantity);
Map<String, Integer> stockDeltaMap = new HashMap<>();
stockDeltaMap.put(Convert.toStr(item_src_id), -cart_quantity);
stockDeltaMap.put(item_src_id+"_"+order_id, -cart_quantity);
syncThirdDataService.incrProductStockToRedis(stockDeltaMap);
}

View File

@ -137,6 +137,15 @@ public class SyncThirdDataController {
return syncThirdDataService.getStoreDataRelease(appKey,sign);
}
@ApiOperation(value = "扣完库存之后做确认", notes = "扣完库存之后做确认")
@RequestMapping(value = "/syncStoreDataReleaseResponse", method = RequestMethod.POST)
public ThirdApiRes syncStoreDataReleaseResponse(@RequestBody List<String> consumIds,
@RequestParam String appKey,
@RequestParam String sign) {
syncThirdDataService.getStoreDataReleaseResponse(appKey,sign,consumIds);
return new ThirdApiRes().success("请求成功!");
}
@ApiOperation(value = "通知上传商品文件到cos", notes = "通知上传商品文件到cos")
@RequestMapping(value = "/uploudToCos", method = RequestMethod.POST)

View File

@ -4,7 +4,7 @@ public class RedisKey {
//public static final String SXCLIENTKEYVERSION="sxclientKey:version";//客户端版本
public static final String STOREDATARELEASE="storedata:release";
public static final String STOREDATARELEASE="shopQuality:release";
public static final String STOREDATAPRODUCTMAPING="storedata:productMaping";

View File

@ -0,0 +1,11 @@
package com.suisung.mall.shop.sync.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.suisung.mall.common.modules.sync.ProductQuantityConsumption;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductQuantityConsumptionMapper extends BaseMapper<ProductQuantityConsumption> {
}

View File

@ -0,0 +1,8 @@
package com.suisung.mall.shop.sync.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.suisung.mall.common.modules.sync.ProductQuantityConsumption;
public interface ProductQuantityConsumptionService extends IService<ProductQuantityConsumption> {
}

View File

@ -12,6 +12,7 @@ import cn.hutool.json.JSONArray;
import com.suisung.mall.common.api.CommonResult;
import com.suisung.mall.common.modules.base.ShopBaseProductBrand;
import com.suisung.mall.common.modules.base.ShopBaseProductCategory;
import com.suisung.mall.common.modules.sync.ProductQuantityConsumption;
import com.suisung.mall.common.pojo.req.SyncThirdMemberReq;
import com.suisung.mall.common.pojo.res.ThirdApiRes;
import org.springframework.core.io.Resource;
@ -110,6 +111,14 @@ public interface SyncThirdDataService {
*/
ThirdApiRes getStoreDataRelease(String appKey, String sign);
/**
* 扣减完库存之后做确认确认完成更新队列表为已消费
* @param appKey
* @param sign
* @return
*/
ThirdApiRes getStoreDataReleaseResponse(String appKey, String sign,List<String> consumIds);
/**
* 保存一个或多个商品刚刚变化的库存到 redis hash 缓存
*
@ -122,7 +131,7 @@ public interface SyncThirdDataService {
*
* @return
*/
Map<String, Double> getProductStockFromRedis(String storeId);
List<ProductQuantityConsumption> getProductStockFromRedis(String storeId,Map<String, Double> redisHash);
/**
* 下单或支付后批量累加减商品库存使用 Redis Hash 的原子自增操作保证并发安全

View File

@ -0,0 +1,18 @@
package com.suisung.mall.shop.sync.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.suisung.mall.common.modules.sync.ProductQuantityConsumption;
import com.suisung.mall.shop.sync.mapper.ProductQuantityConsumptionMapper;
import com.suisung.mall.shop.sync.service.ProductQuantityConsumptionService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@lombok.extern.slf4j.Slf4j
@Transactional
public class ProductQuantityConsumptionServiceImpl extends ServiceImpl<ProductQuantityConsumptionMapper, ProductQuantityConsumption> implements ProductQuantityConsumptionService {
}

View File

@ -13,10 +13,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.ZipUtil;
import cn.hutool.core.util.*;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
@ -27,6 +24,7 @@ import com.google.gson.GsonBuilder;
import com.qcloud.cos.model.COSObjectSummary;
import com.suisung.mall.common.adapter.BigDecimalTypeAdapter;
import com.suisung.mall.common.api.CommonResult;
import com.suisung.mall.common.api.ResultCode;
import com.suisung.mall.common.api.StateCode;
import com.suisung.mall.common.enums.DicEnum;
import com.suisung.mall.common.exception.ApiException;
@ -42,10 +40,7 @@ import com.suisung.mall.common.modules.sixun.SxSyncGoods;
import com.suisung.mall.common.modules.sixun.SxSyncVip;
import com.suisung.mall.common.modules.store.ShopStoreActivityBase;
import com.suisung.mall.common.modules.store.ShopStoreActivityItem;
import com.suisung.mall.common.modules.sync.StoreDbConfig;
import com.suisung.mall.common.modules.sync.SyncApp;
import com.suisung.mall.common.modules.sync.SyncConfig;
import com.suisung.mall.common.modules.sync.SyncFileLog;
import com.suisung.mall.common.modules.sync.*;
import com.suisung.mall.common.pojo.req.SyncThirdMemberReq;
import com.suisung.mall.common.pojo.res.ThirdApiRes;
import com.suisung.mall.common.utils.I18nUtil;
@ -77,6 +72,7 @@ import com.suisung.mall.shop.sync.dto.BrandModel;
import com.suisung.mall.shop.sync.keymanage.RedisKey;
import com.suisung.mall.shop.sync.service.*;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -180,10 +176,12 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
@Autowired
private ProductMappingService productMappingService;
@Autowired
private ShopProductIndexService shopProductIndexService;
@Autowired
private ProductQuantityConsumptionService productQuantityConsumptionService;
/**
* 批量保存商品的分类
*
@ -780,18 +778,91 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
if (syncAppO == null) {
return new ThirdApiRes().fail(1001, I18nUtil._("签名有误!"));
}
//持久化保存数据到数据库队列
String key=RedisKey.STOREDATARELEASE+":"+syncAppO.getStore_id();
Map<String, Double> redisHash = redisTemplate.opsForHash().entries(key);
List<ProductQuantityConsumption> productQuantityConsumptionList=getProductStockFromRedis(syncAppO.getStore_id(),redisHash);
List<ProductQuantityConsumption> productQuantityConsumptions=new ArrayList<>();
if (!productQuantityConsumptionList.isEmpty()) {
boolean result= productQuantityConsumptionService.saveBatch(productQuantityConsumptionList,productQuantityConsumptionList.size());
if(result){
Set<String> fields = redisHash.keySet();
redisTemplate.opsForHash().delete(key, fields.toArray());//清除持久化数据
}
}
//查询未消费数据
QueryWrapper<ProductQuantityConsumption> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("store_id", syncAppO.getStore_id());
queryWrapper.eq("status",0);//未消费数据
productQuantityConsumptions=productQuantityConsumptionService.list(queryWrapper);
return new ThirdApiRes().success("success", productQuantityConsumptions);
}
// Object obRst = redisService.get(RedisKey.STOREDATARELEASE);//商品库存扣减
// Map storeDataResultMap = new HashMap();
// if (obRst != null) {
// Map resultMap = (Map) obRst;
// Set<Map.Entry> sme = resultMap.entrySet();
// storeDataResultMap = sme.stream().filter(m -> !(m.getValue().equals((double) 0))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// }
@Override
public ThirdApiRes getStoreDataReleaseResponse(String appKey, String sign,List<String> consumIds) {
if (StrUtil.isBlank(appKey) || StrUtil.isBlank(sign)) {
logger.info("缺少必要参数!");
}
Gson gson = new Gson();
String jsonStr = gson.toJson(consumIds);
SyncApp syncApp = syncAppService.checkAppSign(appKey, sign, jsonStr); // 验签appid必要参数判断
if (syncApp == null) {
logger.info("签名有误!");
return new ThirdApiRes().fail(250,"签名有误");
}
if(consumIds!=null){
ExecutorService executor = Executors.newFixedThreadPool(6);
List<Future<?>> futures = new ArrayList<>();
Integer pages=CommonUtil.getPagesCount(consumIds.size(),limitCnt);
List<List<String>> pagesPatition = ListUtils.partition(consumIds, limitCnt);
AtomicInteger success = new AtomicInteger();
AtomicInteger fails = new AtomicInteger();
for (int i = 1; i <=pages ; i++) {
int finalI = i;
int finalI1 = i;
futures.add(executor.submit(() -> {
try {
List<String> pageData = pagesPatition.get(finalI - 1);
productQuantityConsumptionPageSave(pageData);
success.getAndIncrement();
return "成功执行"+ finalI1;
}catch (Exception e){
fails.getAndIncrement();
return "执行失败:"+finalI+":"+e.getMessage();
}
}));
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
logger.info("任务结果: {}", future.get());
} catch (Exception e) {
logger.error("任务执行异常: {}", e.getMessage());
}
}
executor.shutdown();
logger.info("总任务数据:{}", pagesPatition.size());
logger.info("成功数据:{}", success.get());
logger.info("失败数据:{}", fails.get());
}
return new ThirdApiRes().success("success");
}
Map<String, Double> storeDataResultMap = getProductStockFromRedis(syncAppO.getStore_id());
return new ThirdApiRes().success("success", storeDataResultMap);
/**
* 更新消费状态
* @param consumIds
*/
private void productQuantityConsumptionPageSave(List<String> consumIds){
List<ProductQuantityConsumption> productQuantityConsumptionList=new ArrayList<>();
for (String consumeId : consumIds) {
ProductQuantityConsumption productQuantityConsumption=new ProductQuantityConsumption();
productQuantityConsumption.setConsumeId(consumeId);
productQuantityConsumption.setStatus(1);
productQuantityConsumptionList.add(productQuantityConsumption);
}
if(!productQuantityConsumptionList.isEmpty()){
productQuantityConsumptionService.updateBatchById(productQuantityConsumptionList,productQuantityConsumptionList.size());
}
}
// @Override
@ -813,23 +884,34 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
@Override
public Map<String, Double> getProductStockFromRedis(String storeId) {
public List<ProductQuantityConsumption> getProductStockFromRedis(String storeId,Map<String, Double> redisHash) {
try {
// Redis 获取 hash 结构的所有键值对
String key=RedisKey.STOREDATARELEASE+":"+storeId;
Map<Object, Object> redisHash = redisTemplate.opsForHash().entries(key);
if (redisHash.isEmpty()) {
return Collections.emptyMap();
return Collections.emptyList();
}
// 转换为 Map<String, Integer>
return redisHash.entrySet().stream()
.collect(Collectors.toMap(
entry -> String.valueOf(entry.getKey()),
entry -> Convert.toDouble(entry.getValue(), 0.00) // 转换失败时默认为 0
));
// 转换为 list
List<ProductQuantityConsumption> productQuantityConsumptionList=new ArrayList<>();
redisHash.forEach((k, v)->{
ProductQuantityConsumption productQuantityConsumption=new ProductQuantityConsumption();
String[] productKeyArrys=k.split("_");
if(productKeyArrys.length!=2){
return;
}
String productNumber=productKeyArrys[0];
String orderId=productKeyArrys[1];
productQuantityConsumption.setConsumeId(IdUtil.getSnowflakeNextIdStr());
productQuantityConsumption.setOrderId(orderId);
productQuantityConsumption.setProductNumber(productNumber);
productQuantityConsumption.setQuantity(Convert.toBigDecimal(v));
productQuantityConsumption.setStoreId(Integer.valueOf(storeId));
productQuantityConsumption.setStatus(0);
productQuantityConsumptionList.add(productQuantityConsumption);
});
return productQuantityConsumptionList;
} catch (Exception e) {
logger.error("从 Redis 获取商品库存失败: {}", e.getMessage(), e);
return Collections.emptyMap();
return Collections.emptyList();
}
}
@ -847,9 +929,15 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
if (StrUtil.isBlank(productKey) || delta == null) {
continue;
}
String[] productKeyArrys=productKey.split("_");
if(productKeyArrys.length!=2){
continue;
}
try {
String item_src_id=productKeyArrys[0];
String orderId=productKeyArrys[1];
QueryWrapper<ShopProductItem> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("item_src_id", productKey);
queryWrapper.eq("item_src_id", item_src_id);
List<ShopProductItem> shopProductItems=shopProductItemService.list(queryWrapper);
if(shopProductItems==null || shopProductItems.isEmpty()){
continue;
@ -871,7 +959,7 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
String name = Convert.toStr(item.get("name"));
BigDecimal itemQuaryty = getBigDecimal(delta, name);
// 使用 Redis HINCRBY 保证原子性和高性能
redisTemplate.opsForHash().increment(key, itemId, itemQuaryty.doubleValue());
redisTemplate.opsForHash().increment(key, itemId+"_"+orderId, itemQuaryty.doubleValue());
} catch (Exception e) {
logger.error("库存累计失败productKey={}, delta={}, error={}", productKey, delta, e.getMessage(), e);
}

View File

@ -0,0 +1,12 @@
CREATE TABLE `product_quantity_consumption` (
`consume_id` varchar(32) NOT NULL COMMENT '自定义主键ID',
`product_number` varchar(50) NOT NULL COMMENT '商品编号',
`order_id` varchar(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT '' COMMENT '订单编号',
`quantity` decimal(15,3) NOT NULL COMMENT '数量(正数表示入库/增加,负数表示出库/减少)',
`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '消费状态0-未消费1-已消费',
`store_id` int(20) NOT NULL COMMENT '店铺ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`consume_id`),
KEY `idx_shop_product` (`store_id`, `product_number`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品数量消费表';