新增redisson锁和redisson锁切面,保证分布式锁,分布式id

This commit is contained in:
liyj 2025-11-12 18:25:45 +08:00
parent b04f2d095d
commit ad7f89cb42
5 changed files with 499 additions and 255 deletions

View File

@ -0,0 +1,39 @@
package com.suisung.mall.common.annotation;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
/**
* 锁的key支持SpEL表达式,下面是示例
* key = "'ACCOUNT_LOCK:' + #batchSize", // 使用SpEL表达式锁key包含参数
* public List<Integer> getBatchUserAccountBaseId(int batchSize)
*/
String key();
/**
* 等待时间默认0-不等待
*/
long waitTime() default 0;
/**
* 锁持有时间默认30秒-1表示使用看门狗机制
*/
long leaseTime() default 30;
/**
* 时间单位默认秒
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/**
* 获取锁失败时的错误消息
*/
String errorMsg() default "系统繁忙,请稍后再试";
}

View File

@ -0,0 +1,110 @@
package com.suisung.mall.shop.Aspect;
import com.suisung.mall.common.annotation.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* 分布式锁切面
*/
@Slf4j
@Aspect
@Component
public class DistributedLockAspect {
@Autowired
private RedissonClient redissonClient;
private final ExpressionParser expressionParser = new SpelExpressionParser();
private final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
@Around("@annotation(distributedLock)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
// 解析锁的key支持SpEL表达式
String lockKey = parseLockKey(joinPoint, distributedLock);
RLock lock = redissonClient.getLock(lockKey);
boolean isLocked = false;
try {
// 尝试获取锁
if (distributedLock.leaseTime() == -1) {
// 使用看门狗机制
isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.timeUnit());
} else {
isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
}
if (!isLocked) {
log.warn("获取分布式锁失败lockKey: {}", lockKey);
throw new RuntimeException(distributedLock.errorMsg());
}
log.debug("成功获取分布式锁lockKey: {}", lockKey);
// 执行原方法
return joinPoint.proceed();
} finally {
// 释放锁
if (isLocked && lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug("释放分布式锁lockKey: {}", lockKey);
}
}
}
/**
* 解析锁的key支持SpEL表达式
*/
private String parseLockKey(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) {
String keyExpression = distributedLock.key();
// 如果key不包含SpEL表达式直接返回
if (!keyExpression.contains("#")) {
return keyExpression;
}
try {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Object[] args = joinPoint.getArgs();
String[] parameterNames = parameterNameDiscoverer.getParameterNames(method);
EvaluationContext context = new StandardEvaluationContext();
if (parameterNames != null) {
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
}
// 设置一些常用变量
context.setVariable("methodName", method.getName());
context.setVariable("className", method.getDeclaringClass().getSimpleName());
Expression expression = expressionParser.parseExpression(keyExpression);
String result = expression.getValue(context, String.class);
return StringUtils.isNotBlank(result) ? result : keyExpression;
} catch (Exception e) {
log.warn("解析锁key表达式失败使用原表达式: {}, error: {}", keyExpression, e.getMessage());
return keyExpression;
}
}
}

View File

@ -37,7 +37,13 @@ public class RedissionConfig {
config.useSingleServer().setAddress("redis://" + host + ":" + port);
config.useSingleServer().setDatabase(database);
config.useSingleServer().setPassword(password);
config.useSingleServer().setTimeout(1000*120);// 命令超时时间毫秒
config.useSingleServer().setRetryAttempts(5);// 重试次数
config.useSingleServer().setRetryInterval(2000); // 重试间隔毫秒
// 设置netty线程数
config.setNettyThreads(32);
// 增加处理线程数
config.setThreads(16);
//2根据Config创建出RedissonClient实例
return Redisson.create(config);
}

View File

@ -2,6 +2,8 @@ package com.suisung.mall.shop.number.service.impl;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.suisung.mall.common.annotation.DistributedLock;
import com.suisung.mall.common.exception.ApiException;
import com.suisung.mall.common.feignService.AccountService;
import com.suisung.mall.common.modules.base.ShopBaseProductSpec;
import com.suisung.mall.common.modules.library.LibraryProduct;
@ -19,6 +21,8 @@ import com.suisung.mall.shop.product.service.ShopProductBaseService;
import com.suisung.mall.shop.product.service.ShopProductItemService;
import com.suisung.mall.shop.product.service.ShopProductSpecItemService;
import com.suisung.mall.shop.sync.keymanage.RedisKey;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -29,6 +33,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@ -71,6 +76,9 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
@Lazy
private ShopProductItemService shopProductItemService;
@Autowired
private RedissonClient redissonClient;
public static void main(String[] args) {
System.out.printf(IntStream.rangeClosed(1, 1).boxed().collect(Collectors.toList()).toString());
@ -85,8 +93,13 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
*/
@Override
@Transactional(propagation = Propagation.NOT_SUPPORTED)
@DistributedLock(
key = "CREATENEXTSEQ_LOCK", // 锁的key
waitTime = 3, // 等待3秒
leaseTime = 10, // 锁持有10秒
errorMsg = "生成ID繁忙请稍后重试" // 自定义错误消息
)
public synchronized String createNextSeq(String prefix) {
String ymd = DateUtil.format(new Date(), "yyyyMMdd");
String id = String.format("%s_%s_", prefix, ymd);
ShopNumberSeq shopNumberSeq = this.baseMapper.selectById(id);
@ -148,25 +161,6 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
return null;
}
/**
* 校验同步数据是否同步完成就是两个库的主键是否一致
*
* @return
*/
private void checkPrimaryKey() {
boolean flag = shopNumberSeqMapper.findNumberFromShopBaseAndItem(new ShopNumberSeq()).size() > 1;
while (!flag) {
try {
Thread.sleep(3600);
} catch (InterruptedException e) {
log.error("checkPrimaryKey枷锁失败--" + e.getMessage());
break;
}
flag = shopNumberSeqMapper.findNumberFromShopBaseAndItem(new ShopNumberSeq()).size() > 1;
}
}
/**
* 批量获取id
*
@ -175,11 +169,18 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
* @return
*/
@Transactional(propagation = Propagation.NOT_SUPPORTED)
@DistributedLock(
key = "'CREATENEXTSEQ_LOCK:'+#seqName", // 锁的key
waitTime = 3, // 等待3秒
leaseTime = 10, // 锁持有10秒
errorMsg = "生成ID繁忙请稍后重试" // 自定义错误消息
)
public synchronized List<Long> batchCreateNextNo(String seqName, int batchSize) {
if (batchSize <= 0) {
return Collections.emptyList();
}
long number = 0;
if (null == redisService.get(String.format(CACHE_PREFIX, seqName))) {
syncPrimaryKey();
// 1. 获取当前序列值
@ -200,7 +201,6 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
number = numberCache;
}
// 2. 计算新值范围
long start = number;
long end = start + batchSize - 1;
@ -269,25 +269,47 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
* @return
*/
@Override
public synchronized List<Integer> getBatchSpecId(int batchSize) {
public List<Integer> getBatchSpecId(int batchSize) {
// 定义锁的key这个key在所有服务实例中必须一致
String lockKey = "LOCK:" + RedisKey.STOREDATASPECID;
// 2. 获取分布式锁对象
RLock lock = redissonClient.getLock(lockKey);
boolean isLocked = false;
int start = 0;
if (null != redisService.get(RedisKey.STOREDATASPECID)) {
start = (Integer) redisService.get(RedisKey.STOREDATASPECID);
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
try {
isLocked = lock.tryLock(2, 30, TimeUnit.SECONDS);
if (!isLocked) {
// 获取锁失败可以根据业务逻辑进行重试或抛出异常
throw new RuntimeException("系统繁忙,请稍后再试");
}
log.info("成功获得锁:{}",lockKey);
if (null != redisService.get(RedisKey.STOREDATASPECID)) {
start = (Integer) redisService.get(RedisKey.STOREDATASPECID);
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
QueryWrapper<ShopBaseProductSpec> queryWrapper = new QueryWrapper<>();
queryWrapper.select("max(spec_id) as spec_id");
ShopBaseProductSpec shopBaseProductSpec = shopBaseProductSpecService.getOne(queryWrapper);
if (null != shopBaseProductSpec) {
start = shopBaseProductSpec.getSpec_id();
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
} catch (Exception e) {
throw new ApiException(e);
}finally {
// 5. 最终检查并释放锁确保锁一定被释放
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
log.info("成功释放锁:{}",lockKey);
}
QueryWrapper<ShopBaseProductSpec> queryWrapper = new QueryWrapper<>();
queryWrapper.select("max(spec_id) as spec_id");
ShopBaseProductSpec shopBaseProductSpec = shopBaseProductSpecService.getOne(queryWrapper);
if (null != shopBaseProductSpec) {
start = shopBaseProductSpec.getSpec_id();
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
/**
@ -297,25 +319,46 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
* @return
*/
@Override
public synchronized List<Integer> getBatchSpecItemId(int batchSize) {
public List<Integer> getBatchSpecItemId(int batchSize) {
// 定义锁的key这个key在所有服务实例中必须一致
String lockKey = "LOCK:" + RedisKey.STOREDATASPECITEMID;
// 2. 获取分布式锁对象
RLock lock = redissonClient.getLock(lockKey);
boolean isLocked = false;
int start = 0;
if (null != redisService.get(RedisKey.STOREDATASPECITEMID)) {
start = (Integer) redisService.get(RedisKey.STOREDATASPECITEMID);
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
try {
isLocked=lock.tryLock(2,30,TimeUnit.SECONDS);
if (!isLocked) {
// 获取锁失败可以根据业务逻辑进行重试或抛出异常
throw new ApiException("系统繁忙,请稍后再试");
}
log.info("成功获得锁:{}",lockKey);
if (null != redisService.get(RedisKey.STOREDATASPECITEMID)) {
start = (Integer) redisService.get(RedisKey.STOREDATASPECITEMID);
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
QueryWrapper<ShopProductSpecItem> queryWrapper = new QueryWrapper<>();
queryWrapper.select("max(spec_item_id) as spec_item_id");
ShopProductSpecItem shopProductSpecItem = shopProductSpecItemService.getOne(queryWrapper);
if (null != shopProductSpecItem) {
start = shopProductSpecItem.getSpec_item_id();
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
} catch (InterruptedException e) {
throw new ApiException(e);
}finally {
// 5. 最终检查并释放锁确保锁一定被释放
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
log.info("成功释放锁:{}",lockKey);
}
QueryWrapper<ShopProductSpecItem> queryWrapper = new QueryWrapper<>();
queryWrapper.select("max(spec_item_id) as spec_item_id");
ShopProductSpecItem shopProductSpecItem = shopProductSpecItemService.getOne(queryWrapper);
if (null != shopProductSpecItem) {
start = shopProductSpecItem.getSpec_item_id();
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
/**
@ -325,23 +368,46 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
* @return
*/
@Override
public synchronized List<Integer> getBatchUserAccountBaseId(int batchSize) {
int start = 0;
if (null != redisService.get(RedisKey.STOREDATACCOUNTBASEID)) {
start = (Integer) redisService.get(RedisKey.STOREDATACCOUNTBASEID);
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
public List<Integer> getBatchUserAccountBaseId(int batchSize) {
// 定义锁的key这个key在所有服务实例中必须一致
String lockKey = "LOCK:" + RedisKey.STOREDATACCOUNTBASEID;
// 2. 获取分布式锁对象
RLock lock = redissonClient.getLock(lockKey);
boolean isLocked = false;
try {
isLocked = lock.tryLock(2, 10, TimeUnit.SECONDS);
if (!isLocked) {
// 获取锁失败可以根据业务逻辑进行重试或抛出异常
throw new RuntimeException("系统繁忙,请稍后再试");
}
log.info("成功获得锁:{}",lockKey);
int start = 0;
if (null != redisService.get(RedisKey.STOREDATACCOUNTBASEID)) {
start = (Integer) redisService.get(RedisKey.STOREDATACCOUNTBASEID);
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
Integer maxId = accountService.getAccountMaxId();
if (null != maxId) {
start = maxId;
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("getBatchUserAccountBaseId获取锁时被中断", e);
}finally {
// 5. 最终检查并释放锁确保锁一定被释放
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
log.info("成功释放锁:{}",lockKey);
}
Integer maxId = accountService.getAccountMaxId();
if (null != maxId) {
start = maxId;
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
@ -352,25 +418,47 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
* @return
*/
@Override
public synchronized List<Integer> getBatchLibraryProductId(int batchSize) {
int start = 0;
if (null != redisService.get(RedisKey.STOREDATALIBRARYID)) {
start = (Integer) redisService.get(RedisKey.STOREDATALIBRARYID);
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
public List<Integer> getBatchLibraryProductId(int batchSize) {
// 定义锁的key这个key在所有服务实例中必须一致
String lockKey = "LOCK:" + RedisKey.STOREDATALIBRARYID;
// 2. 获取分布式锁对象
RLock lock = redissonClient.getLock(lockKey);
boolean isLocked = false;
try {
isLocked = lock.tryLock(2, 10, TimeUnit.SECONDS);
int start = 0;
if (!isLocked) {
// 获取锁失败可以根据业务逻辑进行重试或抛出异常
throw new RuntimeException("系统繁忙,请稍后再试");
}
log.info("成功获得锁:{}",lockKey);
if (null != redisService.get(RedisKey.STOREDATALIBRARYID)) {
start = (Integer) redisService.get(RedisKey.STOREDATALIBRARYID);
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
QueryWrapper<LibraryProduct> queryWrapper = new QueryWrapper<>();
queryWrapper.select("max(id) as id");
LibraryProduct libraryProduct = libraryProductService.getOne(queryWrapper);
if (null != libraryProduct) {
start = Math.toIntExact(libraryProduct.getId());
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
// 5. 最终检查并释放锁确保锁一定被释放
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
log.info("成功释放锁:{}",lockKey);
}
QueryWrapper<LibraryProduct> queryWrapper = new QueryWrapper<>();
queryWrapper.select("max(id) as id");
LibraryProduct libraryProduct = libraryProductService.getOne(queryWrapper);
if (null != libraryProduct) {
start = Math.toIntExact(libraryProduct.getId());
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
}
if (start == 0) {
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
}
@Override

View File

@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder;
import com.qcloud.cos.model.COSObjectSummary;
import com.suisung.mall.common.adapter.BigDecimalTypeAdapter;
import com.suisung.mall.common.api.CommonResult;
import com.suisung.mall.common.api.ResultCode;
import com.suisung.mall.common.api.StateCode;
import com.suisung.mall.common.enums.DicEnum;
import com.suisung.mall.common.exception.ApiException;
@ -72,10 +71,11 @@ import com.suisung.mall.shop.sync.dto.ActiveShopInfo;
import com.suisung.mall.shop.sync.dto.BrandModel;
import com.suisung.mall.shop.sync.keymanage.RedisKey;
import com.suisung.mall.shop.sync.service.*;
import com.sun.corba.se.impl.orbutil.concurrent.Sync;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.ListUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -189,8 +189,11 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
@Autowired
private SyncStoreDataService syncStoreDataService;
@Autowired
private RedisService redisService;
private RedissonClient redissonClient;
/**
* 批量保存商品的分类
*
@ -579,153 +582,170 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
logger.info("没有商品数据");
return;
}
String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
// redisService.del(key);
if(!checkeckIsLock(storeId)){
logger.info("批量同步商品等待时间异常结束");
return;
}
setLock(storeId,"true");
List<String> newFolders = new ArrayList<>();
folders.forEach(page -> {
String newfolder = new FileUtils().getSyncTypeFlag(syncType, clientPath) + storeId + FileUtils.pathSeparator + page + FileUtils.pathSeparator;
newFolders.add(newfolder);
});
//upLoadZipToOss(newFolders.get(0));//上传文件到cos
// dowloadAndUnZip(newFolders.get(0));//读取cos文件回本地
syncPrimaryKey();
shopBaseProductCategoryService.clearCategoryCache(storeId);
// shopProductSpecItemService.clearExistItem(Integer.valueOf(storeId));
ExecutorService executor = Executors.newFixedThreadPool(6);
List<Future<?>> futures = new ArrayList<>();
// 提交任务
AtomicInteger success = new AtomicInteger(0);
AtomicInteger fails = new AtomicInteger(0);
List<String> failFolders = new ArrayList<>();
List<String> failMessage = new ArrayList<>();
shopBaseProductCategoryService.getCategoryListByStoreId(storeId);
// getBrandMapByStoreId()
Map<String, Integer> brandMaps = productBrandService.getBrandMapByStoreId(storeId);
QueryWrapper<StoreDbConfig> storeDbConfigQueryWrapper = new QueryWrapper<>();
storeDbConfigQueryWrapper.eq("store_id", storeId);
StoreDbConfig storeDbConfig = storeDbConfigService.getOne(storeDbConfigQueryWrapper);
// Map shopProductSpecItemMap = shopProductSpecItemService.getExistItem(Integer.valueOf(storeId));//切割缓存
// Map productMappingMap = productMappingService.getProductMapping(Integer.valueOf(storeId));//切割缓存
// Map ShopBaseProductSpecMap = baseProductSpecService.getShopBaseProductSpecMap(Integer.valueOf(storeId));//切割商品缓存
// long seconds=System.currentTimeMillis();
// Date productSaleTime=Date.from(Instant.now().plusSeconds(seconds));
String fileIndex=folders.get(0);
String fileEndFix;
if (fileIndex.length()>1){
fileEndFix=fileIndex.split("_")[1];
} else {
fileEndFix = "";
RLock lock =null;
String lockKey=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
try {
redissonClient.getKeys().count();
}catch (Exception e){
logger.info("锁异常");
return;
}
List<String> fileNames=new ArrayList<>();
for(int i=0;i<newFolders.size();i++){
String fileName = "goods_" + (i + 1)+"_"+fileEndFix+ ".txt";
fileNames.add(fileName);
lock = redissonClient.getLock(lockKey); // 锁名称通常与业务关联"ORDER_LOCK_123"
//尝试三次刚好2分分钟
if(!checkeckIsLock(lock)){
logger.info("系统繁忙,无法获取锁");
return;
}
QueryWrapper<SyncStoreData> syncStoreDataQueryWrapper = new QueryWrapper<>();
syncStoreDataQueryWrapper.eq("store_id", storeId);
syncStoreDataQueryWrapper.eq("status", 0);
syncStoreDataQueryWrapper.in("file_name",fileNames);
List<SyncStoreData> syncStoreDataList= syncStoreDataService.list(syncStoreDataQueryWrapper);
Map<String,String> syncDataMap= syncStoreDataList.stream().collect(Collectors.toMap(SyncStoreData::getSyncStoreDataId, SyncStoreData::getContent));
for (int i = 0; i < newFolders.size(); i++) {
final int taskId = i;
final String isNegativeAllowed = storeDbConfig.getIsNegativeAllowed();
final Integer automatic=storeDbConfig.getAutomatic();
//String priorityMode = storeDbConfig.getPriorityMode();
//boolean isUpdatePrice= ObjectUtil.isNotEmpty(storeDbConfig.getRefreshTime());//是否更新所有切割价格
threadNum.incrementAndGet();
Map<String, String> finalSyncDataMap = syncDataMap;
futures.add(executor.submit(() -> {
int count = 0;//失败重试机制当失败重试一次再次失败则记录到数据库中
while (true) {
count++;
// String taskName = newFolders.get(taskId);
String fileName = "goods_" + (taskId + 1) +"_"+fileEndFix+ ".txt";
String sycnDataId=DigestUtils.md5Hex(newFolders.get(taskId) + fileName);
JSONArray jsonArray = getSyncDataContent(finalSyncDataMap,sycnDataId);
try {
baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps,automatic);
success.getAndIncrement();
threadNum.decrementAndGet();
return "成功" + taskId;
} catch (Exception e) {
if (count < 2) {
//Thread.sleep(100);
continue;
try {
List<String> newFolders = new ArrayList<>();
folders.forEach(page -> {
String newfolder = new FileUtils().getSyncTypeFlag(syncType, clientPath) + storeId + FileUtils.pathSeparator + page + FileUtils.pathSeparator;
newFolders.add(newfolder);
});
//upLoadZipToOss(newFolders.get(0));//上传文件到cos
// dowloadAndUnZip(newFolders.get(0));//读取cos文件回本地
syncPrimaryKey();
shopBaseProductCategoryService.clearCategoryCache(storeId);
// shopProductSpecItemService.clearExistItem(Integer.valueOf(storeId));
ExecutorService executor = Executors.newFixedThreadPool(6);
List<Future<?>> futures = new ArrayList<>();
// 提交任务
AtomicInteger success = new AtomicInteger(0);
AtomicInteger fails = new AtomicInteger(0);
List<String> failFolders = new ArrayList<>();
List<String> failMessage = new ArrayList<>();
shopBaseProductCategoryService.getCategoryListByStoreId(storeId);
// getBrandMapByStoreId()
Map<String, Integer> brandMaps = productBrandService.getBrandMapByStoreId(storeId);
QueryWrapper<StoreDbConfig> storeDbConfigQueryWrapper = new QueryWrapper<>();
storeDbConfigQueryWrapper.eq("store_id", storeId);
StoreDbConfig storeDbConfig = storeDbConfigService.getOne(storeDbConfigQueryWrapper);
// Map shopProductSpecItemMap = shopProductSpecItemService.getExistItem(Integer.valueOf(storeId));//切割缓存
// Map productMappingMap = productMappingService.getProductMapping(Integer.valueOf(storeId));//切割缓存
// Map ShopBaseProductSpecMap = baseProductSpecService.getShopBaseProductSpecMap(Integer.valueOf(storeId));//切割商品缓存
// long seconds=System.currentTimeMillis();
// Date productSaleTime=Date.from(Instant.now().plusSeconds(seconds));
String fileIndex=folders.get(0);
String fileEndFix;
if (fileIndex.length()>1){
fileEndFix=fileIndex.split("_")[1];
} else {
fileEndFix = "";
}
List<String> fileNames=new ArrayList<>();
for(int i=0;i<newFolders.size();i++){
String fileName = "goods_" + (i + 1)+"_"+fileEndFix+ ".txt";
fileNames.add(fileName);
}
QueryWrapper<SyncStoreData> syncStoreDataQueryWrapper = new QueryWrapper<>();
syncStoreDataQueryWrapper.eq("store_id", storeId);
syncStoreDataQueryWrapper.eq("status", 0);
syncStoreDataQueryWrapper.in("file_name",fileNames);
List<SyncStoreData> syncStoreDataList= syncStoreDataService.list(syncStoreDataQueryWrapper);
Map<String,String> syncDataMap= syncStoreDataList.stream().collect(Collectors.toMap(SyncStoreData::getSyncStoreDataId, SyncStoreData::getContent));
for (int i = 0; i < newFolders.size(); i++) {
final int taskId = i;
final String isNegativeAllowed = storeDbConfig.getIsNegativeAllowed();
final Integer automatic=storeDbConfig.getAutomatic();
//String priorityMode = storeDbConfig.getPriorityMode();
//boolean isUpdatePrice= ObjectUtil.isNotEmpty(storeDbConfig.getRefreshTime());//是否更新所有切割价格
threadNum.incrementAndGet();
Map<String, String> finalSyncDataMap = syncDataMap;
futures.add(executor.submit(() -> {
int count = 0;//失败重试机制当失败重试一次再次失败则记录到数据库中
while (true) {
count++;
// String taskName = newFolders.get(taskId);
String fileName = "goods_" + (taskId + 1) +"_"+fileEndFix+ ".txt";
String sycnDataId=DigestUtils.md5Hex(newFolders.get(taskId) + fileName);
JSONArray jsonArray = getSyncDataContent(finalSyncDataMap,sycnDataId);
try {
baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps,automatic);
success.getAndIncrement();
threadNum.decrementAndGet();
return "成功" + taskId;
} catch (Exception e) {
if (count < 2) {
//Thread.sleep(100);
continue;
}
fails.getAndIncrement();
failFolders.add(newFolders.get(taskId) + fileName);
failMessage.add(taskId + "_" + e.getMessage());
return "失败" + newFolders.get(taskId);
}
}
fails.getAndIncrement();
failFolders.add(newFolders.get(taskId) + fileName);
failMessage.add(taskId + "_" + e.getMessage());
return "失败" + newFolders.get(taskId);
}));
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
System.out.println("任务结果: " + future.get());
} catch (Exception e) {
System.err.println("任务执行异常: " + e.getMessage());
}
}
}));
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
System.out.println("任务结果: " + future.get());
} catch (Exception e) {
System.err.println("任务执行异常: " + e.getMessage());
executor.shutdown();
//记录到数据库
syncPrimaryKey();
shopBaseProductCategoryService.clearCategoryCache(storeId);
productBrandService.clearBrandMapByStoreId(storeId);
List<SyncFileLog> syncFileLogs = new ArrayList<>();
for (int i = 0; i < failFolders.size(); i++) {
String path = failFolders.get(i);
String taskId = failMessage.get(i).split("_")[0];
SyncFileLog syncFileLog = new SyncFileLog();
syncFileLog.setSyncType(syncType);
syncFileLog.setFileName(path.substring(path.lastIndexOf(FileUtils.pathSeparator) + 1));
syncFileLog.setSyncStatus(DicEnum.FAILED.getCode());
syncFileLog.setSyncTaskId(taskId);
syncFileLog.setSyncStoreId(storeId);
syncFileLog.setFilePath(path);
syncFileLog.setErrorMessage(failMessage.get(i));
syncFileLog.setSourceSystem(DicEnum.SOURCE_SYSTEM_TYPE_1005.getCode());
syncFileLog.setTargetSystem(DicEnum.SOURCE_SYSTEM_TYPE_SELF.getCode());
syncFileLogs.add(syncFileLog);
}
if (CollUtil.isNotEmpty(syncFileLogs)) {
syncFileLogService.saveBatch(syncFileLogs, syncFileLogs.size());
}
syncStoreDataList=syncStoreDataList.stream().peek(syncStoreData -> {
syncStoreData.setStatus("1");
}).collect(Collectors.toList());
if(!syncStoreDataList.isEmpty()){
syncStoreDataService.updateBatchById(syncStoreDataList,syncStoreDataList.size());//处理结束后更新已处理
}
//todo 定时清理文件建议用服务器脚本
logger.info("执行成功{}个文件,失败{}个文件", success, fails);
logger.info("同步商品数据执行结束");
//更新当前的获取时间用户客户端获取
// try {
//
// if (ObjectUtil.isNotEmpty(storeDbConfig)) {
// storeDbConfig.setRefreshTime(date);
// storeDbConfigService.saveOrUpdate(storeDbConfig);
// }
// } catch (RuntimeException e) {
// logger.error("同步时间失败" + e.getMessage());
// }
productMappingService.syncAllProductMapping(Integer.valueOf(storeId),DicEnum.YESORNO_0.getCode());
if(ObjectUtil.isNull(storeDbConfig.getRefreshTime())){
syncShopImages(Integer.valueOf(storeId));//同时商品图库数据
}
}catch (Exception e){
logger.info("执行失败:{}",e.getMessage());
}finally {
// 最终检查并释放锁确保锁一定被释放
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
logger.info("商品同步锁释放");
lock.unlock();
}
}
executor.shutdown();
//记录到数据库
syncPrimaryKey();
shopBaseProductCategoryService.clearCategoryCache(storeId);
productBrandService.clearBrandMapByStoreId(storeId);
List<SyncFileLog> syncFileLogs = new ArrayList<>();
for (int i = 0; i < failFolders.size(); i++) {
String path = failFolders.get(i);
String taskId = failMessage.get(i).split("_")[0];
SyncFileLog syncFileLog = new SyncFileLog();
syncFileLog.setSyncType(syncType);
syncFileLog.setFileName(path.substring(path.lastIndexOf(FileUtils.pathSeparator) + 1));
syncFileLog.setSyncStatus(DicEnum.FAILED.getCode());
syncFileLog.setSyncTaskId(taskId);
syncFileLog.setSyncStoreId(storeId);
syncFileLog.setFilePath(path);
syncFileLog.setErrorMessage(failMessage.get(i));
syncFileLog.setSourceSystem(DicEnum.SOURCE_SYSTEM_TYPE_1005.getCode());
syncFileLog.setTargetSystem(DicEnum.SOURCE_SYSTEM_TYPE_SELF.getCode());
syncFileLogs.add(syncFileLog);
}
if (CollUtil.isNotEmpty(syncFileLogs)) {
syncFileLogService.saveBatch(syncFileLogs, syncFileLogs.size());
}
syncStoreDataList=syncStoreDataList.stream().peek(syncStoreData -> {
syncStoreData.setStatus("1");
}).collect(Collectors.toList());
if(!syncStoreDataList.isEmpty()){
syncStoreDataService.updateBatchById(syncStoreDataList,syncStoreDataList.size());//处理结束后更新已处理
}
//todo 定时清理文件建议用服务器脚本
logger.info("执行成功{}个文件,失败{}个文件", success, fails);
logger.info("同步商品数据执行结束");
//更新当前的获取时间用户客户端获取
// try {
//
// if (ObjectUtil.isNotEmpty(storeDbConfig)) {
// storeDbConfig.setRefreshTime(date);
// storeDbConfigService.saveOrUpdate(storeDbConfig);
// }
// } catch (RuntimeException e) {
// logger.error("同步时间失败" + e.getMessage());
// }
productMappingService.syncAllProductMapping(Integer.valueOf(storeId),DicEnum.YESORNO_0.getCode());
if(ObjectUtil.isNull(storeDbConfig.getRefreshTime())){
syncShopImages(Integer.valueOf(storeId));//同时商品图库数据
}
redisService.del(key);
}
@Override
@ -736,38 +756,19 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
/**
* 保证单线程执行同步数据
* @param storeId
* @param lock
* @return
*/
public synchronized boolean checkeckIsLock(String storeId){
logger.info("等待时间开始");
String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
Map isLockMap= (Map) redisService.get(key);
if(isLockMap!=null){
String result= (String) isLockMap.get("batchGoodIsLock");
if("true".equals(result)){
try {
logger.info("进入等待时间");
Thread.sleep(1000 * 60 * 1);//五分钟
return checkeckIsLock(storeId);
} catch (InterruptedException e) {
logger.info("等待异常:{}",e.getMessage());
return false;
}
}
public synchronized boolean checkeckIsLock(RLock lock){
try {
boolean resul=lock.tryLock(60, 120, TimeUnit.SECONDS);
logger.info("等待获取锁成功");
return resul;
}catch (InterruptedException e){
return false;
}
logger.info("等待时间结束");
return true;
}
public synchronized void setLock(String storeId,String value){
String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
Map<String,String> map= new HashMap<>();
map.put("batchGoodIsLock",value);
redisService.set(key,map,600000);
}
@Override
public ResponseEntity<Resource> downloadToClient(String primaryKey, String clienVersionName) {
logger.info("primaryKey:{}", primaryKey);