diff --git a/mall-common/src/main/java/com/suisung/mall/common/annotation/DistributedLock.java b/mall-common/src/main/java/com/suisung/mall/common/annotation/DistributedLock.java new file mode 100644 index 00000000..062b80c3 --- /dev/null +++ b/mall-common/src/main/java/com/suisung/mall/common/annotation/DistributedLock.java @@ -0,0 +1,39 @@ +package com.suisung.mall.common.annotation; + +import java.lang.annotation.*; +import java.util.concurrent.TimeUnit; + +/** + * 分布式锁注解 + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface DistributedLock { + /** + * 锁的key,支持SpEL表达式,下面是示例 + * key = "'ACCOUNT_LOCK:' + #batchSize", // 使用SpEL表达式,锁key包含参数 + * public List getBatchUserAccountBaseId(int batchSize) + */ + String key(); + + /** + * 等待时间(秒),默认0-不等待 + */ + long waitTime() default 0; + + /** + * 锁持有时间(秒),默认30秒,-1表示使用看门狗机制 + */ + long leaseTime() default 30; + + /** + * 时间单位,默认秒 + */ + TimeUnit timeUnit() default TimeUnit.SECONDS; + + /** + * 获取锁失败时的错误消息 + */ + String errorMsg() default "系统繁忙,请稍后再试"; +} diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/Aspect/DistributedLockAspect.java b/mall-shop/src/main/java/com/suisung/mall/shop/Aspect/DistributedLockAspect.java new file mode 100644 index 00000000..114a4c5b --- /dev/null +++ b/mall-shop/src/main/java/com/suisung/mall/shop/Aspect/DistributedLockAspect.java @@ -0,0 +1,110 @@ +package com.suisung.mall.shop.Aspect; + +import com.suisung.mall.common.annotation.DistributedLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.DefaultParameterNameDiscoverer; +import org.springframework.core.ParameterNameDiscoverer; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.stereotype.Component; + +import java.lang.reflect.Method; + +/** + * 分布式锁切面 + */ +@Slf4j +@Aspect +@Component +public class DistributedLockAspect { + @Autowired + private RedissonClient redissonClient; + + private final ExpressionParser expressionParser = new SpelExpressionParser(); + private final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer(); + + @Around("@annotation(distributedLock)") + public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable { + // 解析锁的key,支持SpEL表达式 + String lockKey = parseLockKey(joinPoint, distributedLock); + + RLock lock = redissonClient.getLock(lockKey); + boolean isLocked = false; + + try { + // 尝试获取锁 + if (distributedLock.leaseTime() == -1) { + // 使用看门狗机制 + isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.timeUnit()); + } else { + isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit()); + } + + if (!isLocked) { + log.warn("获取分布式锁失败,lockKey: {}", lockKey); + throw new RuntimeException(distributedLock.errorMsg()); + } + + log.debug("成功获取分布式锁,lockKey: {}", lockKey); + // 执行原方法 + return joinPoint.proceed(); + + } finally { + // 释放锁 + if (isLocked && lock.isHeldByCurrentThread()) { + lock.unlock(); + log.debug("释放分布式锁,lockKey: {}", lockKey); + } + } + } + + /** + * 解析锁的key,支持SpEL表达式 + */ + private String parseLockKey(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) { + String keyExpression = distributedLock.key(); + + // 如果key不包含SpEL表达式,直接返回 + if (!keyExpression.contains("#")) { + return keyExpression; + } + + try { + MethodSignature signature = (MethodSignature) joinPoint.getSignature(); + Method method = signature.getMethod(); + Object[] args = joinPoint.getArgs(); + String[] parameterNames = parameterNameDiscoverer.getParameterNames(method); + + EvaluationContext context = new StandardEvaluationContext(); + if (parameterNames != null) { + for (int i = 0; i < parameterNames.length; i++) { + context.setVariable(parameterNames[i], args[i]); + } + } + + // 设置一些常用变量 + context.setVariable("methodName", method.getName()); + context.setVariable("className", method.getDeclaringClass().getSimpleName()); + + Expression expression = expressionParser.parseExpression(keyExpression); + String result = expression.getValue(context, String.class); + + return StringUtils.isNotBlank(result) ? result : keyExpression; + + } catch (Exception e) { + log.warn("解析锁key表达式失败,使用原表达式: {}, error: {}", keyExpression, e.getMessage()); + return keyExpression; + } + } +} diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/config/RedissionConfig.java b/mall-shop/src/main/java/com/suisung/mall/shop/config/RedissionConfig.java index 7664aa85..bb3a69f8 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/config/RedissionConfig.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/config/RedissionConfig.java @@ -37,7 +37,13 @@ public class RedissionConfig { config.useSingleServer().setAddress("redis://" + host + ":" + port); config.useSingleServer().setDatabase(database); config.useSingleServer().setPassword(password); - + config.useSingleServer().setTimeout(1000*120);// 命令超时时间(毫秒) + config.useSingleServer().setRetryAttempts(5);// 重试次数 + config.useSingleServer().setRetryInterval(2000); // 重试间隔(毫秒) + // 设置netty线程数 + config.setNettyThreads(32); + // 增加处理线程数 + config.setThreads(16); //2、根据Config创建出RedissonClient实例 return Redisson.create(config); } diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/number/service/impl/ShopNumberSeqServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/number/service/impl/ShopNumberSeqServiceImpl.java index ea6923ee..4e8c55d8 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/number/service/impl/ShopNumberSeqServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/number/service/impl/ShopNumberSeqServiceImpl.java @@ -2,6 +2,8 @@ package com.suisung.mall.shop.number.service.impl; import cn.hutool.core.date.DateUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.suisung.mall.common.annotation.DistributedLock; +import com.suisung.mall.common.exception.ApiException; import com.suisung.mall.common.feignService.AccountService; import com.suisung.mall.common.modules.base.ShopBaseProductSpec; import com.suisung.mall.common.modules.library.LibraryProduct; @@ -19,6 +21,8 @@ import com.suisung.mall.shop.product.service.ShopProductBaseService; import com.suisung.mall.shop.product.service.ShopProductItemService; import com.suisung.mall.shop.product.service.ShopProductSpecItemService; import com.suisung.mall.shop.sync.keymanage.RedisKey; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -29,6 +33,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -71,6 +76,9 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl 1; - while (!flag) { - try { - Thread.sleep(3600); - } catch (InterruptedException e) { - log.error("checkPrimaryKey枷锁失败--" + e.getMessage()); - break; - } - flag = shopNumberSeqMapper.findNumberFromShopBaseAndItem(new ShopNumberSeq()).size() > 1; - } - - } - /** * 批量获取id * @@ -175,11 +169,18 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl batchCreateNextNo(String seqName, int batchSize) { if (batchSize <= 0) { return Collections.emptyList(); } long number = 0; + if (null == redisService.get(String.format(CACHE_PREFIX, seqName))) { syncPrimaryKey(); // 1. 获取当前序列值 @@ -200,7 +201,6 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl getBatchSpecId(int batchSize) { + public List getBatchSpecId(int batchSize) { + // 定义锁的key,这个key在所有服务实例中必须一致 + String lockKey = "LOCK:" + RedisKey.STOREDATASPECID; + // 2. 获取分布式锁对象 + RLock lock = redissonClient.getLock(lockKey); + boolean isLocked = false; int start = 0; - if (null != redisService.get(RedisKey.STOREDATASPECID)) { - start = (Integer) redisService.get(RedisKey.STOREDATASPECID); - redisService.set(RedisKey.STOREDATASPECID, start + batchSize); + try { + isLocked = lock.tryLock(2, 30, TimeUnit.SECONDS); + if (!isLocked) { + // 获取锁失败,可以根据业务逻辑进行重试或抛出异常 + throw new RuntimeException("系统繁忙,请稍后再试"); + } + log.info("成功获得锁:{}",lockKey); + if (null != redisService.get(RedisKey.STOREDATASPECID)) { + start = (Integer) redisService.get(RedisKey.STOREDATASPECID); + redisService.set(RedisKey.STOREDATASPECID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.select("max(spec_id) as spec_id"); + ShopBaseProductSpec shopBaseProductSpec = shopBaseProductSpecService.getOne(queryWrapper); + if (null != shopBaseProductSpec) { + start = shopBaseProductSpec.getSpec_id(); + redisService.set(RedisKey.STOREDATASPECID, start + batchSize); + } + if (start == 0) { + redisService.set(RedisKey.STOREDATASPECID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } catch (Exception e) { + throw new ApiException(e); + }finally { + // 5. 最终检查并释放锁,确保锁一定被释放 + if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + log.info("成功释放锁:{}",lockKey); } - QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.select("max(spec_id) as spec_id"); - ShopBaseProductSpec shopBaseProductSpec = shopBaseProductSpecService.getOne(queryWrapper); - if (null != shopBaseProductSpec) { - start = shopBaseProductSpec.getSpec_id(); - redisService.set(RedisKey.STOREDATASPECID, start + batchSize); - } - if (start == 0) { - redisService.set(RedisKey.STOREDATASPECID, start + batchSize); - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); - } - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } /** @@ -297,25 +319,46 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl getBatchSpecItemId(int batchSize) { + public List getBatchSpecItemId(int batchSize) { + // 定义锁的key,这个key在所有服务实例中必须一致 + String lockKey = "LOCK:" + RedisKey.STOREDATASPECITEMID; + // 2. 获取分布式锁对象 + RLock lock = redissonClient.getLock(lockKey); + boolean isLocked = false; int start = 0; - if (null != redisService.get(RedisKey.STOREDATASPECITEMID)) { - start = (Integer) redisService.get(RedisKey.STOREDATASPECITEMID); - redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize); + try { + isLocked=lock.tryLock(2,30,TimeUnit.SECONDS); + if (!isLocked) { + // 获取锁失败,可以根据业务逻辑进行重试或抛出异常 + throw new ApiException("系统繁忙,请稍后再试"); + } + log.info("成功获得锁:{}",lockKey); + if (null != redisService.get(RedisKey.STOREDATASPECITEMID)) { + start = (Integer) redisService.get(RedisKey.STOREDATASPECITEMID); + redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.select("max(spec_item_id) as spec_item_id"); + ShopProductSpecItem shopProductSpecItem = shopProductSpecItemService.getOne(queryWrapper); + if (null != shopProductSpecItem) { + start = shopProductSpecItem.getSpec_item_id(); + redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize); + } + if (start == 0) { + redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } catch (InterruptedException e) { + throw new ApiException(e); + }finally { + // 5. 最终检查并释放锁,确保锁一定被释放 + if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + log.info("成功释放锁:{}",lockKey); } - QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.select("max(spec_item_id) as spec_item_id"); - ShopProductSpecItem shopProductSpecItem = shopProductSpecItemService.getOne(queryWrapper); - if (null != shopProductSpecItem) { - start = shopProductSpecItem.getSpec_item_id(); - redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize); - } - if (start == 0) { - redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize); - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); - } - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); } /** @@ -325,23 +368,46 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl getBatchUserAccountBaseId(int batchSize) { - int start = 0; - if (null != redisService.get(RedisKey.STOREDATACCOUNTBASEID)) { - start = (Integer) redisService.get(RedisKey.STOREDATACCOUNTBASEID); - redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize); + public List getBatchUserAccountBaseId(int batchSize) { + // 定义锁的key,这个key在所有服务实例中必须一致 + String lockKey = "LOCK:" + RedisKey.STOREDATACCOUNTBASEID; + // 2. 获取分布式锁对象 + RLock lock = redissonClient.getLock(lockKey); + boolean isLocked = false; + try { + isLocked = lock.tryLock(2, 10, TimeUnit.SECONDS); + if (!isLocked) { + // 获取锁失败,可以根据业务逻辑进行重试或抛出异常 + throw new RuntimeException("系统繁忙,请稍后再试"); + } + log.info("成功获得锁:{}",lockKey); + int start = 0; + if (null != redisService.get(RedisKey.STOREDATACCOUNTBASEID)) { + start = (Integer) redisService.get(RedisKey.STOREDATACCOUNTBASEID); + redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } + Integer maxId = accountService.getAccountMaxId(); + if (null != maxId) { + start = maxId; + redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize); + } + if (start == 0) { + redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("getBatchUserAccountBaseId获取锁时被中断", e); + }finally { + // 5. 最终检查并释放锁,确保锁一定被释放 + if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + log.info("成功释放锁:{}",lockKey); } - Integer maxId = accountService.getAccountMaxId(); - if (null != maxId) { - start = maxId; - redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize); - } - if (start == 0) { - redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize); - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); - } - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } @@ -352,25 +418,47 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl getBatchLibraryProductId(int batchSize) { - int start = 0; - if (null != redisService.get(RedisKey.STOREDATALIBRARYID)) { - start = (Integer) redisService.get(RedisKey.STOREDATALIBRARYID); - redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize); + public List getBatchLibraryProductId(int batchSize) { + // 定义锁的key,这个key在所有服务实例中必须一致 + String lockKey = "LOCK:" + RedisKey.STOREDATALIBRARYID; + // 2. 获取分布式锁对象 + RLock lock = redissonClient.getLock(lockKey); + boolean isLocked = false; + try { + isLocked = lock.tryLock(2, 10, TimeUnit.SECONDS); + int start = 0; + if (!isLocked) { + // 获取锁失败,可以根据业务逻辑进行重试或抛出异常 + throw new RuntimeException("系统繁忙,请稍后再试"); + } + log.info("成功获得锁:{}",lockKey); + if (null != redisService.get(RedisKey.STOREDATALIBRARYID)) { + start = (Integer) redisService.get(RedisKey.STOREDATALIBRARYID); + redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.select("max(id) as id"); + LibraryProduct libraryProduct = libraryProductService.getOne(queryWrapper); + if (null != libraryProduct) { + start = Math.toIntExact(libraryProduct.getId()); + redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize); + } + if (start == 0) { + redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize); + return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + }finally { + // 5. 最终检查并释放锁,确保锁一定被释放 + if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + log.info("成功释放锁:{}",lockKey); } - QueryWrapper queryWrapper = new QueryWrapper<>(); - queryWrapper.select("max(id) as id"); - LibraryProduct libraryProduct = libraryProductService.getOne(queryWrapper); - if (null != libraryProduct) { - start = Math.toIntExact(libraryProduct.getId()); - redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize); - } - if (start == 0) { - redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize); - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); - } - return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList()); + } @Override diff --git a/mall-shop/src/main/java/com/suisung/mall/shop/sync/service/impl/SyncThirdDataServiceImpl.java b/mall-shop/src/main/java/com/suisung/mall/shop/sync/service/impl/SyncThirdDataServiceImpl.java index 0a0ea35f..1144d7c8 100644 --- a/mall-shop/src/main/java/com/suisung/mall/shop/sync/service/impl/SyncThirdDataServiceImpl.java +++ b/mall-shop/src/main/java/com/suisung/mall/shop/sync/service/impl/SyncThirdDataServiceImpl.java @@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder; import com.qcloud.cos.model.COSObjectSummary; import com.suisung.mall.common.adapter.BigDecimalTypeAdapter; import com.suisung.mall.common.api.CommonResult; -import com.suisung.mall.common.api.ResultCode; import com.suisung.mall.common.api.StateCode; import com.suisung.mall.common.enums.DicEnum; import com.suisung.mall.common.exception.ApiException; @@ -72,10 +71,11 @@ import com.suisung.mall.shop.sync.dto.ActiveShopInfo; import com.suisung.mall.shop.sync.dto.BrandModel; import com.suisung.mall.shop.sync.keymanage.RedisKey; import com.suisung.mall.shop.sync.service.*; -import com.sun.corba.se.impl.orbutil.concurrent.Sync; import io.seata.spring.annotation.GlobalTransactional; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections4.ListUtils; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -189,8 +189,11 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements @Autowired private SyncStoreDataService syncStoreDataService; + @Autowired - private RedisService redisService; + private RedissonClient redissonClient; + + /** * 批量保存商品的分类 * @@ -579,153 +582,170 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements logger.info("没有商品数据"); return; } - String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId; + // redisService.del(key); - if(!checkeckIsLock(storeId)){ - logger.info("批量同步商品等待时间异常结束"); - return; - } - setLock(storeId,"true"); - List newFolders = new ArrayList<>(); - folders.forEach(page -> { - String newfolder = new FileUtils().getSyncTypeFlag(syncType, clientPath) + storeId + FileUtils.pathSeparator + page + FileUtils.pathSeparator; - newFolders.add(newfolder); - }); - - //upLoadZipToOss(newFolders.get(0));//上传文件到cos - // dowloadAndUnZip(newFolders.get(0));//读取cos文件回本地 - syncPrimaryKey(); - shopBaseProductCategoryService.clearCategoryCache(storeId); - // shopProductSpecItemService.clearExistItem(Integer.valueOf(storeId)); - ExecutorService executor = Executors.newFixedThreadPool(6); - List> futures = new ArrayList<>(); - // 提交任务 - AtomicInteger success = new AtomicInteger(0); - AtomicInteger fails = new AtomicInteger(0); - List failFolders = new ArrayList<>(); - List failMessage = new ArrayList<>(); - shopBaseProductCategoryService.getCategoryListByStoreId(storeId); - // getBrandMapByStoreId() - Map brandMaps = productBrandService.getBrandMapByStoreId(storeId); - QueryWrapper storeDbConfigQueryWrapper = new QueryWrapper<>(); - storeDbConfigQueryWrapper.eq("store_id", storeId); - StoreDbConfig storeDbConfig = storeDbConfigService.getOne(storeDbConfigQueryWrapper); -// Map shopProductSpecItemMap = shopProductSpecItemService.getExistItem(Integer.valueOf(storeId));//切割缓存 -// Map productMappingMap = productMappingService.getProductMapping(Integer.valueOf(storeId));//切割缓存 -// Map ShopBaseProductSpecMap = baseProductSpecService.getShopBaseProductSpecMap(Integer.valueOf(storeId));//切割商品缓存 -// long seconds=System.currentTimeMillis(); -// Date productSaleTime=Date.from(Instant.now().plusSeconds(seconds)); - - String fileIndex=folders.get(0); - String fileEndFix; - if (fileIndex.length()>1){ - fileEndFix=fileIndex.split("_")[1]; - } else { - fileEndFix = ""; + RLock lock =null; + String lockKey=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId; + try { + redissonClient.getKeys().count(); + }catch (Exception e){ + logger.info("锁异常"); + return; } - - List fileNames=new ArrayList<>(); - for(int i=0;i syncStoreDataQueryWrapper = new QueryWrapper<>(); - syncStoreDataQueryWrapper.eq("store_id", storeId); - syncStoreDataQueryWrapper.eq("status", 0); - syncStoreDataQueryWrapper.in("file_name",fileNames); - List syncStoreDataList= syncStoreDataService.list(syncStoreDataQueryWrapper); - Map syncDataMap= syncStoreDataList.stream().collect(Collectors.toMap(SyncStoreData::getSyncStoreDataId, SyncStoreData::getContent)); - for (int i = 0; i < newFolders.size(); i++) { - final int taskId = i; - final String isNegativeAllowed = storeDbConfig.getIsNegativeAllowed(); - final Integer automatic=storeDbConfig.getAutomatic(); - //String priorityMode = storeDbConfig.getPriorityMode(); - //boolean isUpdatePrice= ObjectUtil.isNotEmpty(storeDbConfig.getRefreshTime());//是否更新所有切割价格 - threadNum.incrementAndGet(); - Map finalSyncDataMap = syncDataMap; - futures.add(executor.submit(() -> { - int count = 0;//失败重试机制,当失败重试一次,再次失败则记录到数据库中 - while (true) { - count++; - // String taskName = newFolders.get(taskId); - String fileName = "goods_" + (taskId + 1) +"_"+fileEndFix+ ".txt"; - String sycnDataId=DigestUtils.md5Hex(newFolders.get(taskId) + fileName); - JSONArray jsonArray = getSyncDataContent(finalSyncDataMap,sycnDataId); - try { - baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps,automatic); - success.getAndIncrement(); - threadNum.decrementAndGet(); - return "成功" + taskId; - } catch (Exception e) { - if (count < 2) { - //Thread.sleep(100); - continue; + try { + List newFolders = new ArrayList<>(); + folders.forEach(page -> { + String newfolder = new FileUtils().getSyncTypeFlag(syncType, clientPath) + storeId + FileUtils.pathSeparator + page + FileUtils.pathSeparator; + newFolders.add(newfolder); + }); + + //upLoadZipToOss(newFolders.get(0));//上传文件到cos + // dowloadAndUnZip(newFolders.get(0));//读取cos文件回本地 + syncPrimaryKey(); + shopBaseProductCategoryService.clearCategoryCache(storeId); + // shopProductSpecItemService.clearExistItem(Integer.valueOf(storeId)); + ExecutorService executor = Executors.newFixedThreadPool(6); + List> futures = new ArrayList<>(); + // 提交任务 + AtomicInteger success = new AtomicInteger(0); + AtomicInteger fails = new AtomicInteger(0); + List failFolders = new ArrayList<>(); + List failMessage = new ArrayList<>(); + shopBaseProductCategoryService.getCategoryListByStoreId(storeId); + // getBrandMapByStoreId() + Map brandMaps = productBrandService.getBrandMapByStoreId(storeId); + QueryWrapper storeDbConfigQueryWrapper = new QueryWrapper<>(); + storeDbConfigQueryWrapper.eq("store_id", storeId); + StoreDbConfig storeDbConfig = storeDbConfigService.getOne(storeDbConfigQueryWrapper); + // Map shopProductSpecItemMap = shopProductSpecItemService.getExistItem(Integer.valueOf(storeId));//切割缓存 + // Map productMappingMap = productMappingService.getProductMapping(Integer.valueOf(storeId));//切割缓存 + // Map ShopBaseProductSpecMap = baseProductSpecService.getShopBaseProductSpecMap(Integer.valueOf(storeId));//切割商品缓存 + // long seconds=System.currentTimeMillis(); + // Date productSaleTime=Date.from(Instant.now().plusSeconds(seconds)); + + String fileIndex=folders.get(0); + String fileEndFix; + if (fileIndex.length()>1){ + fileEndFix=fileIndex.split("_")[1]; + } else { + fileEndFix = ""; + } + + List fileNames=new ArrayList<>(); + for(int i=0;i syncStoreDataQueryWrapper = new QueryWrapper<>(); + syncStoreDataQueryWrapper.eq("store_id", storeId); + syncStoreDataQueryWrapper.eq("status", 0); + syncStoreDataQueryWrapper.in("file_name",fileNames); + List syncStoreDataList= syncStoreDataService.list(syncStoreDataQueryWrapper); + Map syncDataMap= syncStoreDataList.stream().collect(Collectors.toMap(SyncStoreData::getSyncStoreDataId, SyncStoreData::getContent)); + for (int i = 0; i < newFolders.size(); i++) { + final int taskId = i; + final String isNegativeAllowed = storeDbConfig.getIsNegativeAllowed(); + final Integer automatic=storeDbConfig.getAutomatic(); + //String priorityMode = storeDbConfig.getPriorityMode(); + //boolean isUpdatePrice= ObjectUtil.isNotEmpty(storeDbConfig.getRefreshTime());//是否更新所有切割价格 + threadNum.incrementAndGet(); + Map finalSyncDataMap = syncDataMap; + futures.add(executor.submit(() -> { + int count = 0;//失败重试机制,当失败重试一次,再次失败则记录到数据库中 + while (true) { + count++; + // String taskName = newFolders.get(taskId); + String fileName = "goods_" + (taskId + 1) +"_"+fileEndFix+ ".txt"; + String sycnDataId=DigestUtils.md5Hex(newFolders.get(taskId) + fileName); + JSONArray jsonArray = getSyncDataContent(finalSyncDataMap,sycnDataId); + try { + baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps,automatic); + success.getAndIncrement(); + threadNum.decrementAndGet(); + return "成功" + taskId; + } catch (Exception e) { + if (count < 2) { + //Thread.sleep(100); + continue; + } + fails.getAndIncrement(); + failFolders.add(newFolders.get(taskId) + fileName); + failMessage.add(taskId + "_" + e.getMessage()); + return "失败" + newFolders.get(taskId); + } } - fails.getAndIncrement(); - failFolders.add(newFolders.get(taskId) + fileName); - failMessage.add(taskId + "_" + e.getMessage()); - return "失败" + newFolders.get(taskId); + })); + } + // 等待所有任务完成 + for (Future future : futures) { + try { + System.out.println("任务结果: " + future.get()); + } catch (Exception e) { + System.err.println("任务执行异常: " + e.getMessage()); } } - })); - } - // 等待所有任务完成 - for (Future future : futures) { - try { - System.out.println("任务结果: " + future.get()); - } catch (Exception e) { - System.err.println("任务执行异常: " + e.getMessage()); + executor.shutdown(); + //记录到数据库 + syncPrimaryKey(); + shopBaseProductCategoryService.clearCategoryCache(storeId); + productBrandService.clearBrandMapByStoreId(storeId); + List syncFileLogs = new ArrayList<>(); + for (int i = 0; i < failFolders.size(); i++) { + String path = failFolders.get(i); + String taskId = failMessage.get(i).split("_")[0]; + SyncFileLog syncFileLog = new SyncFileLog(); + syncFileLog.setSyncType(syncType); + syncFileLog.setFileName(path.substring(path.lastIndexOf(FileUtils.pathSeparator) + 1)); + syncFileLog.setSyncStatus(DicEnum.FAILED.getCode()); + syncFileLog.setSyncTaskId(taskId); + syncFileLog.setSyncStoreId(storeId); + syncFileLog.setFilePath(path); + syncFileLog.setErrorMessage(failMessage.get(i)); + syncFileLog.setSourceSystem(DicEnum.SOURCE_SYSTEM_TYPE_1005.getCode()); + syncFileLog.setTargetSystem(DicEnum.SOURCE_SYSTEM_TYPE_SELF.getCode()); + syncFileLogs.add(syncFileLog); + } + if (CollUtil.isNotEmpty(syncFileLogs)) { + syncFileLogService.saveBatch(syncFileLogs, syncFileLogs.size()); + } + syncStoreDataList=syncStoreDataList.stream().peek(syncStoreData -> { + syncStoreData.setStatus("1"); + }).collect(Collectors.toList()); + if(!syncStoreDataList.isEmpty()){ + syncStoreDataService.updateBatchById(syncStoreDataList,syncStoreDataList.size());//处理结束后更新已处理 + } + //todo 定时清理文件,建议用服务器脚本 + logger.info("执行成功{}个文件,失败{}个文件", success, fails); + logger.info("同步商品数据执行结束"); + //更新当前的获取时间,用户客户端获取 + // try { + // + // if (ObjectUtil.isNotEmpty(storeDbConfig)) { + // storeDbConfig.setRefreshTime(date); + // storeDbConfigService.saveOrUpdate(storeDbConfig); + // } + // } catch (RuntimeException e) { + // logger.error("同步时间失败" + e.getMessage()); + // } + productMappingService.syncAllProductMapping(Integer.valueOf(storeId),DicEnum.YESORNO_0.getCode()); + if(ObjectUtil.isNull(storeDbConfig.getRefreshTime())){ + syncShopImages(Integer.valueOf(storeId));//同时商品图库数据 + } + }catch (Exception e){ + logger.info("执行失败:{}",e.getMessage()); + }finally { + // 最终检查并释放锁,确保锁一定被释放 + if (lock.isLocked() && lock.isHeldByCurrentThread()) { + logger.info("商品同步锁释放"); + lock.unlock(); } } - executor.shutdown(); - //记录到数据库 - syncPrimaryKey(); - shopBaseProductCategoryService.clearCategoryCache(storeId); - productBrandService.clearBrandMapByStoreId(storeId); - List syncFileLogs = new ArrayList<>(); - for (int i = 0; i < failFolders.size(); i++) { - String path = failFolders.get(i); - String taskId = failMessage.get(i).split("_")[0]; - SyncFileLog syncFileLog = new SyncFileLog(); - syncFileLog.setSyncType(syncType); - syncFileLog.setFileName(path.substring(path.lastIndexOf(FileUtils.pathSeparator) + 1)); - syncFileLog.setSyncStatus(DicEnum.FAILED.getCode()); - syncFileLog.setSyncTaskId(taskId); - syncFileLog.setSyncStoreId(storeId); - syncFileLog.setFilePath(path); - syncFileLog.setErrorMessage(failMessage.get(i)); - syncFileLog.setSourceSystem(DicEnum.SOURCE_SYSTEM_TYPE_1005.getCode()); - syncFileLog.setTargetSystem(DicEnum.SOURCE_SYSTEM_TYPE_SELF.getCode()); - syncFileLogs.add(syncFileLog); - } - if (CollUtil.isNotEmpty(syncFileLogs)) { - syncFileLogService.saveBatch(syncFileLogs, syncFileLogs.size()); - } - syncStoreDataList=syncStoreDataList.stream().peek(syncStoreData -> { - syncStoreData.setStatus("1"); - }).collect(Collectors.toList()); - if(!syncStoreDataList.isEmpty()){ - syncStoreDataService.updateBatchById(syncStoreDataList,syncStoreDataList.size());//处理结束后更新已处理 - } - //todo 定时清理文件,建议用服务器脚本 - logger.info("执行成功{}个文件,失败{}个文件", success, fails); - logger.info("同步商品数据执行结束"); - //更新当前的获取时间,用户客户端获取 -// try { -// -// if (ObjectUtil.isNotEmpty(storeDbConfig)) { -// storeDbConfig.setRefreshTime(date); -// storeDbConfigService.saveOrUpdate(storeDbConfig); -// } -// } catch (RuntimeException e) { -// logger.error("同步时间失败" + e.getMessage()); -// } - productMappingService.syncAllProductMapping(Integer.valueOf(storeId),DicEnum.YESORNO_0.getCode()); - if(ObjectUtil.isNull(storeDbConfig.getRefreshTime())){ - syncShopImages(Integer.valueOf(storeId));//同时商品图库数据 - } - - redisService.del(key); } @Override @@ -736,38 +756,19 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements /** * 保证单线程执行同步数据 - * @param storeId + * @param lock * @return */ - public synchronized boolean checkeckIsLock(String storeId){ - logger.info("等待时间开始"); - String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId; - Map isLockMap= (Map) redisService.get(key); - if(isLockMap!=null){ - String result= (String) isLockMap.get("batchGoodIsLock"); - if("true".equals(result)){ - try { - logger.info("进入等待时间"); - Thread.sleep(1000 * 60 * 1);//五分钟 - return checkeckIsLock(storeId); - } catch (InterruptedException e) { - logger.info("等待异常:{}",e.getMessage()); - return false; - } - } + public synchronized boolean checkeckIsLock(RLock lock){ + try { + boolean resul=lock.tryLock(60, 120, TimeUnit.SECONDS); + logger.info("等待获取锁成功"); + return resul; + }catch (InterruptedException e){ + return false; } - logger.info("等待时间结束"); - return true; } - public synchronized void setLock(String storeId,String value){ - String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId; - Map map= new HashMap<>(); - map.put("batchGoodIsLock",value); - redisService.set(key,map,600000); - } - - @Override public ResponseEntity downloadToClient(String primaryKey, String clienVersionName) { logger.info("primaryKey:{}", primaryKey);