新增redisson锁和redisson锁切面,保证分布式锁,分布式id
This commit is contained in:
parent
ff7be4b8ae
commit
31a28b418d
@ -0,0 +1,39 @@
|
|||||||
|
package com.suisung.mall.common.annotation;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 分布式锁注解
|
||||||
|
*/
|
||||||
|
@Target(ElementType.METHOD)
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
public @interface DistributedLock {
|
||||||
|
/**
|
||||||
|
* 锁的key,支持SpEL表达式,下面是示例
|
||||||
|
* key = "'ACCOUNT_LOCK:' + #batchSize", // 使用SpEL表达式,锁key包含参数
|
||||||
|
* public List<Integer> getBatchUserAccountBaseId(int batchSize)
|
||||||
|
*/
|
||||||
|
String key();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 等待时间(秒),默认0-不等待
|
||||||
|
*/
|
||||||
|
long waitTime() default 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 锁持有时间(秒),默认30秒,-1表示使用看门狗机制
|
||||||
|
*/
|
||||||
|
long leaseTime() default 30;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 时间单位,默认秒
|
||||||
|
*/
|
||||||
|
TimeUnit timeUnit() default TimeUnit.SECONDS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取锁失败时的错误消息
|
||||||
|
*/
|
||||||
|
String errorMsg() default "系统繁忙,请稍后再试";
|
||||||
|
}
|
||||||
@ -0,0 +1,110 @@
|
|||||||
|
package com.suisung.mall.shop.Aspect;
|
||||||
|
|
||||||
|
import com.suisung.mall.common.annotation.DistributedLock;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.aspectj.lang.ProceedingJoinPoint;
|
||||||
|
import org.aspectj.lang.annotation.Around;
|
||||||
|
import org.aspectj.lang.annotation.Aspect;
|
||||||
|
import org.aspectj.lang.reflect.MethodSignature;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.core.DefaultParameterNameDiscoverer;
|
||||||
|
import org.springframework.core.ParameterNameDiscoverer;
|
||||||
|
import org.springframework.expression.EvaluationContext;
|
||||||
|
import org.springframework.expression.Expression;
|
||||||
|
import org.springframework.expression.ExpressionParser;
|
||||||
|
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||||
|
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 分布式锁切面
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Aspect
|
||||||
|
@Component
|
||||||
|
public class DistributedLockAspect {
|
||||||
|
@Autowired
|
||||||
|
private RedissonClient redissonClient;
|
||||||
|
|
||||||
|
private final ExpressionParser expressionParser = new SpelExpressionParser();
|
||||||
|
private final ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
|
||||||
|
|
||||||
|
@Around("@annotation(distributedLock)")
|
||||||
|
public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
|
||||||
|
// 解析锁的key,支持SpEL表达式
|
||||||
|
String lockKey = parseLockKey(joinPoint, distributedLock);
|
||||||
|
|
||||||
|
RLock lock = redissonClient.getLock(lockKey);
|
||||||
|
boolean isLocked = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 尝试获取锁
|
||||||
|
if (distributedLock.leaseTime() == -1) {
|
||||||
|
// 使用看门狗机制
|
||||||
|
isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.timeUnit());
|
||||||
|
} else {
|
||||||
|
isLocked = lock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isLocked) {
|
||||||
|
log.warn("获取分布式锁失败,lockKey: {}", lockKey);
|
||||||
|
throw new RuntimeException(distributedLock.errorMsg());
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("成功获取分布式锁,lockKey: {}", lockKey);
|
||||||
|
// 执行原方法
|
||||||
|
return joinPoint.proceed();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
// 释放锁
|
||||||
|
if (isLocked && lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
log.debug("释放分布式锁,lockKey: {}", lockKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解析锁的key,支持SpEL表达式
|
||||||
|
*/
|
||||||
|
private String parseLockKey(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) {
|
||||||
|
String keyExpression = distributedLock.key();
|
||||||
|
|
||||||
|
// 如果key不包含SpEL表达式,直接返回
|
||||||
|
if (!keyExpression.contains("#")) {
|
||||||
|
return keyExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
|
||||||
|
Method method = signature.getMethod();
|
||||||
|
Object[] args = joinPoint.getArgs();
|
||||||
|
String[] parameterNames = parameterNameDiscoverer.getParameterNames(method);
|
||||||
|
|
||||||
|
EvaluationContext context = new StandardEvaluationContext();
|
||||||
|
if (parameterNames != null) {
|
||||||
|
for (int i = 0; i < parameterNames.length; i++) {
|
||||||
|
context.setVariable(parameterNames[i], args[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置一些常用变量
|
||||||
|
context.setVariable("methodName", method.getName());
|
||||||
|
context.setVariable("className", method.getDeclaringClass().getSimpleName());
|
||||||
|
|
||||||
|
Expression expression = expressionParser.parseExpression(keyExpression);
|
||||||
|
String result = expression.getValue(context, String.class);
|
||||||
|
|
||||||
|
return StringUtils.isNotBlank(result) ? result : keyExpression;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("解析锁key表达式失败,使用原表达式: {}, error: {}", keyExpression, e.getMessage());
|
||||||
|
return keyExpression;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -37,7 +37,13 @@ public class RedissionConfig {
|
|||||||
config.useSingleServer().setAddress("redis://" + host + ":" + port);
|
config.useSingleServer().setAddress("redis://" + host + ":" + port);
|
||||||
config.useSingleServer().setDatabase(database);
|
config.useSingleServer().setDatabase(database);
|
||||||
config.useSingleServer().setPassword(password);
|
config.useSingleServer().setPassword(password);
|
||||||
|
config.useSingleServer().setTimeout(1000*120);// 命令超时时间(毫秒)
|
||||||
|
config.useSingleServer().setRetryAttempts(5);// 重试次数
|
||||||
|
config.useSingleServer().setRetryInterval(2000); // 重试间隔(毫秒)
|
||||||
|
// 设置netty线程数
|
||||||
|
config.setNettyThreads(32);
|
||||||
|
// 增加处理线程数
|
||||||
|
config.setThreads(16);
|
||||||
//2、根据Config创建出RedissonClient实例
|
//2、根据Config创建出RedissonClient实例
|
||||||
return Redisson.create(config);
|
return Redisson.create(config);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,8 @@ package com.suisung.mall.shop.number.service.impl;
|
|||||||
|
|
||||||
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.DateUtil;
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
|
import com.suisung.mall.common.annotation.DistributedLock;
|
||||||
|
import com.suisung.mall.common.exception.ApiException;
|
||||||
import com.suisung.mall.common.feignService.AccountService;
|
import com.suisung.mall.common.feignService.AccountService;
|
||||||
import com.suisung.mall.common.modules.base.ShopBaseProductSpec;
|
import com.suisung.mall.common.modules.base.ShopBaseProductSpec;
|
||||||
import com.suisung.mall.common.modules.library.LibraryProduct;
|
import com.suisung.mall.common.modules.library.LibraryProduct;
|
||||||
@ -19,6 +21,8 @@ import com.suisung.mall.shop.product.service.ShopProductBaseService;
|
|||||||
import com.suisung.mall.shop.product.service.ShopProductItemService;
|
import com.suisung.mall.shop.product.service.ShopProductItemService;
|
||||||
import com.suisung.mall.shop.product.service.ShopProductSpecItemService;
|
import com.suisung.mall.shop.product.service.ShopProductSpecItemService;
|
||||||
import com.suisung.mall.shop.sync.keymanage.RedisKey;
|
import com.suisung.mall.shop.sync.keymanage.RedisKey;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@ -29,6 +33,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.LongStream;
|
import java.util.stream.LongStream;
|
||||||
@ -71,6 +76,9 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
@Lazy
|
@Lazy
|
||||||
private ShopProductItemService shopProductItemService;
|
private ShopProductItemService shopProductItemService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RedissonClient redissonClient;
|
||||||
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
System.out.printf(IntStream.rangeClosed(1, 1).boxed().collect(Collectors.toList()).toString());
|
System.out.printf(IntStream.rangeClosed(1, 1).boxed().collect(Collectors.toList()).toString());
|
||||||
@ -85,8 +93,13 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
||||||
|
@DistributedLock(
|
||||||
|
key = "CREATENEXTSEQ_LOCK", // 锁的key
|
||||||
|
waitTime = 3, // 等待3秒
|
||||||
|
leaseTime = 10, // 锁持有10秒
|
||||||
|
errorMsg = "生成ID繁忙,请稍后重试" // 自定义错误消息
|
||||||
|
)
|
||||||
public synchronized String createNextSeq(String prefix) {
|
public synchronized String createNextSeq(String prefix) {
|
||||||
|
|
||||||
String ymd = DateUtil.format(new Date(), "yyyyMMdd");
|
String ymd = DateUtil.format(new Date(), "yyyyMMdd");
|
||||||
String id = String.format("%s_%s_", prefix, ymd);
|
String id = String.format("%s_%s_", prefix, ymd);
|
||||||
ShopNumberSeq shopNumberSeq = this.baseMapper.selectById(id);
|
ShopNumberSeq shopNumberSeq = this.baseMapper.selectById(id);
|
||||||
@ -148,25 +161,6 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 校验同步数据是否同步完成,就是两个库的主键是否一致
|
|
||||||
*
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private void checkPrimaryKey() {
|
|
||||||
boolean flag = shopNumberSeqMapper.findNumberFromShopBaseAndItem(new ShopNumberSeq()).size() > 1;
|
|
||||||
while (!flag) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(3600);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("checkPrimaryKey枷锁失败--" + e.getMessage());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
flag = shopNumberSeqMapper.findNumberFromShopBaseAndItem(new ShopNumberSeq()).size() > 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 批量获取id
|
* 批量获取id
|
||||||
*
|
*
|
||||||
@ -175,11 +169,18 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
||||||
|
@DistributedLock(
|
||||||
|
key = "'CREATENEXTSEQ_LOCK:'+#seqName", // 锁的key
|
||||||
|
waitTime = 3, // 等待3秒
|
||||||
|
leaseTime = 10, // 锁持有10秒
|
||||||
|
errorMsg = "生成ID繁忙,请稍后重试" // 自定义错误消息
|
||||||
|
)
|
||||||
public synchronized List<Long> batchCreateNextNo(String seqName, int batchSize) {
|
public synchronized List<Long> batchCreateNextNo(String seqName, int batchSize) {
|
||||||
if (batchSize <= 0) {
|
if (batchSize <= 0) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
long number = 0;
|
long number = 0;
|
||||||
|
|
||||||
if (null == redisService.get(String.format(CACHE_PREFIX, seqName))) {
|
if (null == redisService.get(String.format(CACHE_PREFIX, seqName))) {
|
||||||
syncPrimaryKey();
|
syncPrimaryKey();
|
||||||
// 1. 获取当前序列值
|
// 1. 获取当前序列值
|
||||||
@ -200,7 +201,6 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
number = numberCache;
|
number = numberCache;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 计算新值范围
|
// 2. 计算新值范围
|
||||||
long start = number;
|
long start = number;
|
||||||
long end = start + batchSize - 1;
|
long end = start + batchSize - 1;
|
||||||
@ -269,25 +269,47 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<Integer> getBatchSpecId(int batchSize) {
|
public List<Integer> getBatchSpecId(int batchSize) {
|
||||||
|
// 定义锁的key,这个key在所有服务实例中必须一致
|
||||||
|
String lockKey = "LOCK:" + RedisKey.STOREDATASPECID;
|
||||||
|
// 2. 获取分布式锁对象
|
||||||
|
RLock lock = redissonClient.getLock(lockKey);
|
||||||
|
boolean isLocked = false;
|
||||||
int start = 0;
|
int start = 0;
|
||||||
if (null != redisService.get(RedisKey.STOREDATASPECID)) {
|
try {
|
||||||
start = (Integer) redisService.get(RedisKey.STOREDATASPECID);
|
isLocked = lock.tryLock(2, 30, TimeUnit.SECONDS);
|
||||||
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
|
if (!isLocked) {
|
||||||
|
// 获取锁失败,可以根据业务逻辑进行重试或抛出异常
|
||||||
|
throw new RuntimeException("系统繁忙,请稍后再试");
|
||||||
|
}
|
||||||
|
log.info("成功获得锁:{}",lockKey);
|
||||||
|
if (null != redisService.get(RedisKey.STOREDATASPECID)) {
|
||||||
|
start = (Integer) redisService.get(RedisKey.STOREDATASPECID);
|
||||||
|
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
QueryWrapper<ShopBaseProductSpec> queryWrapper = new QueryWrapper<>();
|
||||||
|
queryWrapper.select("max(spec_id) as spec_id");
|
||||||
|
ShopBaseProductSpec shopBaseProductSpec = shopBaseProductSpecService.getOne(queryWrapper);
|
||||||
|
if (null != shopBaseProductSpec) {
|
||||||
|
start = shopBaseProductSpec.getSpec_id();
|
||||||
|
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
|
||||||
|
}
|
||||||
|
if (start == 0) {
|
||||||
|
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ApiException(e);
|
||||||
|
}finally {
|
||||||
|
// 5. 最终检查并释放锁,确保锁一定被释放
|
||||||
|
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
log.info("成功释放锁:{}",lockKey);
|
||||||
}
|
}
|
||||||
QueryWrapper<ShopBaseProductSpec> queryWrapper = new QueryWrapper<>();
|
|
||||||
queryWrapper.select("max(spec_id) as spec_id");
|
|
||||||
ShopBaseProductSpec shopBaseProductSpec = shopBaseProductSpecService.getOne(queryWrapper);
|
|
||||||
if (null != shopBaseProductSpec) {
|
|
||||||
start = shopBaseProductSpec.getSpec_id();
|
|
||||||
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
|
|
||||||
}
|
|
||||||
if (start == 0) {
|
|
||||||
redisService.set(RedisKey.STOREDATASPECID, start + batchSize);
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -297,25 +319,46 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<Integer> getBatchSpecItemId(int batchSize) {
|
public List<Integer> getBatchSpecItemId(int batchSize) {
|
||||||
|
// 定义锁的key,这个key在所有服务实例中必须一致
|
||||||
|
String lockKey = "LOCK:" + RedisKey.STOREDATASPECITEMID;
|
||||||
|
// 2. 获取分布式锁对象
|
||||||
|
RLock lock = redissonClient.getLock(lockKey);
|
||||||
|
boolean isLocked = false;
|
||||||
int start = 0;
|
int start = 0;
|
||||||
if (null != redisService.get(RedisKey.STOREDATASPECITEMID)) {
|
try {
|
||||||
start = (Integer) redisService.get(RedisKey.STOREDATASPECITEMID);
|
isLocked=lock.tryLock(2,30,TimeUnit.SECONDS);
|
||||||
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
|
if (!isLocked) {
|
||||||
|
// 获取锁失败,可以根据业务逻辑进行重试或抛出异常
|
||||||
|
throw new ApiException("系统繁忙,请稍后再试");
|
||||||
|
}
|
||||||
|
log.info("成功获得锁:{}",lockKey);
|
||||||
|
if (null != redisService.get(RedisKey.STOREDATASPECITEMID)) {
|
||||||
|
start = (Integer) redisService.get(RedisKey.STOREDATASPECITEMID);
|
||||||
|
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
QueryWrapper<ShopProductSpecItem> queryWrapper = new QueryWrapper<>();
|
||||||
|
queryWrapper.select("max(spec_item_id) as spec_item_id");
|
||||||
|
ShopProductSpecItem shopProductSpecItem = shopProductSpecItemService.getOne(queryWrapper);
|
||||||
|
if (null != shopProductSpecItem) {
|
||||||
|
start = shopProductSpecItem.getSpec_item_id();
|
||||||
|
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
|
||||||
|
}
|
||||||
|
if (start == 0) {
|
||||||
|
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new ApiException(e);
|
||||||
|
}finally {
|
||||||
|
// 5. 最终检查并释放锁,确保锁一定被释放
|
||||||
|
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
log.info("成功释放锁:{}",lockKey);
|
||||||
}
|
}
|
||||||
QueryWrapper<ShopProductSpecItem> queryWrapper = new QueryWrapper<>();
|
|
||||||
queryWrapper.select("max(spec_item_id) as spec_item_id");
|
|
||||||
ShopProductSpecItem shopProductSpecItem = shopProductSpecItemService.getOne(queryWrapper);
|
|
||||||
if (null != shopProductSpecItem) {
|
|
||||||
start = shopProductSpecItem.getSpec_item_id();
|
|
||||||
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
|
|
||||||
}
|
|
||||||
if (start == 0) {
|
|
||||||
redisService.set(RedisKey.STOREDATASPECITEMID, start + batchSize);
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -325,23 +368,46 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<Integer> getBatchUserAccountBaseId(int batchSize) {
|
public List<Integer> getBatchUserAccountBaseId(int batchSize) {
|
||||||
int start = 0;
|
// 定义锁的key,这个key在所有服务实例中必须一致
|
||||||
if (null != redisService.get(RedisKey.STOREDATACCOUNTBASEID)) {
|
String lockKey = "LOCK:" + RedisKey.STOREDATACCOUNTBASEID;
|
||||||
start = (Integer) redisService.get(RedisKey.STOREDATACCOUNTBASEID);
|
// 2. 获取分布式锁对象
|
||||||
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
|
RLock lock = redissonClient.getLock(lockKey);
|
||||||
|
boolean isLocked = false;
|
||||||
|
try {
|
||||||
|
isLocked = lock.tryLock(2, 10, TimeUnit.SECONDS);
|
||||||
|
if (!isLocked) {
|
||||||
|
// 获取锁失败,可以根据业务逻辑进行重试或抛出异常
|
||||||
|
throw new RuntimeException("系统繁忙,请稍后再试");
|
||||||
|
}
|
||||||
|
log.info("成功获得锁:{}",lockKey);
|
||||||
|
int start = 0;
|
||||||
|
if (null != redisService.get(RedisKey.STOREDATACCOUNTBASEID)) {
|
||||||
|
start = (Integer) redisService.get(RedisKey.STOREDATACCOUNTBASEID);
|
||||||
|
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
Integer maxId = accountService.getAccountMaxId();
|
||||||
|
if (null != maxId) {
|
||||||
|
start = maxId;
|
||||||
|
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
|
||||||
|
}
|
||||||
|
if (start == 0) {
|
||||||
|
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException("getBatchUserAccountBaseId获取锁时被中断", e);
|
||||||
|
}finally {
|
||||||
|
// 5. 最终检查并释放锁,确保锁一定被释放
|
||||||
|
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
log.info("成功释放锁:{}",lockKey);
|
||||||
}
|
}
|
||||||
Integer maxId = accountService.getAccountMaxId();
|
|
||||||
if (null != maxId) {
|
|
||||||
start = maxId;
|
|
||||||
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
|
|
||||||
}
|
|
||||||
if (start == 0) {
|
|
||||||
redisService.set(RedisKey.STOREDATACCOUNTBASEID, start + batchSize);
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -352,25 +418,47 @@ public class ShopNumberSeqServiceImpl extends BaseServiceImpl<ShopNumberSeqMappe
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<Integer> getBatchLibraryProductId(int batchSize) {
|
public List<Integer> getBatchLibraryProductId(int batchSize) {
|
||||||
int start = 0;
|
// 定义锁的key,这个key在所有服务实例中必须一致
|
||||||
if (null != redisService.get(RedisKey.STOREDATALIBRARYID)) {
|
String lockKey = "LOCK:" + RedisKey.STOREDATALIBRARYID;
|
||||||
start = (Integer) redisService.get(RedisKey.STOREDATALIBRARYID);
|
// 2. 获取分布式锁对象
|
||||||
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
|
RLock lock = redissonClient.getLock(lockKey);
|
||||||
|
boolean isLocked = false;
|
||||||
|
try {
|
||||||
|
isLocked = lock.tryLock(2, 10, TimeUnit.SECONDS);
|
||||||
|
int start = 0;
|
||||||
|
if (!isLocked) {
|
||||||
|
// 获取锁失败,可以根据业务逻辑进行重试或抛出异常
|
||||||
|
throw new RuntimeException("系统繁忙,请稍后再试");
|
||||||
|
}
|
||||||
|
log.info("成功获得锁:{}",lockKey);
|
||||||
|
if (null != redisService.get(RedisKey.STOREDATALIBRARYID)) {
|
||||||
|
start = (Integer) redisService.get(RedisKey.STOREDATALIBRARYID);
|
||||||
|
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
QueryWrapper<LibraryProduct> queryWrapper = new QueryWrapper<>();
|
||||||
|
queryWrapper.select("max(id) as id");
|
||||||
|
LibraryProduct libraryProduct = libraryProductService.getOne(queryWrapper);
|
||||||
|
if (null != libraryProduct) {
|
||||||
|
start = Math.toIntExact(libraryProduct.getId());
|
||||||
|
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
|
||||||
|
}
|
||||||
|
if (start == 0) {
|
||||||
|
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
|
||||||
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
}
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}finally {
|
||||||
|
// 5. 最终检查并释放锁,确保锁一定被释放
|
||||||
|
if (lock != null && lock.isLocked() && lock.isHeldByCurrentThread()) {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
log.info("成功释放锁:{}",lockKey);
|
||||||
}
|
}
|
||||||
QueryWrapper<LibraryProduct> queryWrapper = new QueryWrapper<>();
|
|
||||||
queryWrapper.select("max(id) as id");
|
|
||||||
LibraryProduct libraryProduct = libraryProductService.getOne(queryWrapper);
|
|
||||||
if (null != libraryProduct) {
|
|
||||||
start = Math.toIntExact(libraryProduct.getId());
|
|
||||||
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
|
|
||||||
}
|
|
||||||
if (start == 0) {
|
|
||||||
redisService.set(RedisKey.STOREDATALIBRARYID, start + batchSize);
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
return IntStream.rangeClosed(start + 1, start + batchSize).boxed().collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -24,7 +24,6 @@ import com.google.gson.GsonBuilder;
|
|||||||
import com.qcloud.cos.model.COSObjectSummary;
|
import com.qcloud.cos.model.COSObjectSummary;
|
||||||
import com.suisung.mall.common.adapter.BigDecimalTypeAdapter;
|
import com.suisung.mall.common.adapter.BigDecimalTypeAdapter;
|
||||||
import com.suisung.mall.common.api.CommonResult;
|
import com.suisung.mall.common.api.CommonResult;
|
||||||
import com.suisung.mall.common.api.ResultCode;
|
|
||||||
import com.suisung.mall.common.api.StateCode;
|
import com.suisung.mall.common.api.StateCode;
|
||||||
import com.suisung.mall.common.enums.DicEnum;
|
import com.suisung.mall.common.enums.DicEnum;
|
||||||
import com.suisung.mall.common.exception.ApiException;
|
import com.suisung.mall.common.exception.ApiException;
|
||||||
@ -72,10 +71,11 @@ import com.suisung.mall.shop.sync.dto.ActiveShopInfo;
|
|||||||
import com.suisung.mall.shop.sync.dto.BrandModel;
|
import com.suisung.mall.shop.sync.dto.BrandModel;
|
||||||
import com.suisung.mall.shop.sync.keymanage.RedisKey;
|
import com.suisung.mall.shop.sync.keymanage.RedisKey;
|
||||||
import com.suisung.mall.shop.sync.service.*;
|
import com.suisung.mall.shop.sync.service.*;
|
||||||
import com.sun.corba.se.impl.orbutil.concurrent.Sync;
|
|
||||||
import io.seata.spring.annotation.GlobalTransactional;
|
import io.seata.spring.annotation.GlobalTransactional;
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.commons.collections4.ListUtils;
|
import org.apache.commons.collections4.ListUtils;
|
||||||
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RedissonClient;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -189,8 +189,11 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
|
|||||||
@Autowired
|
@Autowired
|
||||||
private SyncStoreDataService syncStoreDataService;
|
private SyncStoreDataService syncStoreDataService;
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisService redisService;
|
private RedissonClient redissonClient;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 批量保存商品的分类
|
* 批量保存商品的分类
|
||||||
*
|
*
|
||||||
@ -579,153 +582,170 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
|
|||||||
logger.info("没有商品数据");
|
logger.info("没有商品数据");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
|
|
||||||
// redisService.del(key);
|
// redisService.del(key);
|
||||||
if(!checkeckIsLock(storeId)){
|
RLock lock =null;
|
||||||
logger.info("批量同步商品等待时间异常结束");
|
String lockKey=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
|
||||||
return;
|
try {
|
||||||
}
|
redissonClient.getKeys().count();
|
||||||
setLock(storeId,"true");
|
}catch (Exception e){
|
||||||
List<String> newFolders = new ArrayList<>();
|
logger.info("锁异常");
|
||||||
folders.forEach(page -> {
|
return;
|
||||||
String newfolder = new FileUtils().getSyncTypeFlag(syncType, clientPath) + storeId + FileUtils.pathSeparator + page + FileUtils.pathSeparator;
|
|
||||||
newFolders.add(newfolder);
|
|
||||||
});
|
|
||||||
|
|
||||||
//upLoadZipToOss(newFolders.get(0));//上传文件到cos
|
|
||||||
// dowloadAndUnZip(newFolders.get(0));//读取cos文件回本地
|
|
||||||
syncPrimaryKey();
|
|
||||||
shopBaseProductCategoryService.clearCategoryCache(storeId);
|
|
||||||
// shopProductSpecItemService.clearExistItem(Integer.valueOf(storeId));
|
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(6);
|
|
||||||
List<Future<?>> futures = new ArrayList<>();
|
|
||||||
// 提交任务
|
|
||||||
AtomicInteger success = new AtomicInteger(0);
|
|
||||||
AtomicInteger fails = new AtomicInteger(0);
|
|
||||||
List<String> failFolders = new ArrayList<>();
|
|
||||||
List<String> failMessage = new ArrayList<>();
|
|
||||||
shopBaseProductCategoryService.getCategoryListByStoreId(storeId);
|
|
||||||
// getBrandMapByStoreId()
|
|
||||||
Map<String, Integer> brandMaps = productBrandService.getBrandMapByStoreId(storeId);
|
|
||||||
QueryWrapper<StoreDbConfig> storeDbConfigQueryWrapper = new QueryWrapper<>();
|
|
||||||
storeDbConfigQueryWrapper.eq("store_id", storeId);
|
|
||||||
StoreDbConfig storeDbConfig = storeDbConfigService.getOne(storeDbConfigQueryWrapper);
|
|
||||||
// Map shopProductSpecItemMap = shopProductSpecItemService.getExistItem(Integer.valueOf(storeId));//切割缓存
|
|
||||||
// Map productMappingMap = productMappingService.getProductMapping(Integer.valueOf(storeId));//切割缓存
|
|
||||||
// Map ShopBaseProductSpecMap = baseProductSpecService.getShopBaseProductSpecMap(Integer.valueOf(storeId));//切割商品缓存
|
|
||||||
// long seconds=System.currentTimeMillis();
|
|
||||||
// Date productSaleTime=Date.from(Instant.now().plusSeconds(seconds));
|
|
||||||
|
|
||||||
String fileIndex=folders.get(0);
|
|
||||||
String fileEndFix;
|
|
||||||
if (fileIndex.length()>1){
|
|
||||||
fileEndFix=fileIndex.split("_")[1];
|
|
||||||
} else {
|
|
||||||
fileEndFix = "";
|
|
||||||
}
|
}
|
||||||
|
lock = redissonClient.getLock(lockKey); // 锁名称通常与业务关联,如"ORDER_LOCK_123"
|
||||||
List<String> fileNames=new ArrayList<>();
|
//尝试三次刚好2分分钟
|
||||||
for(int i=0;i<newFolders.size();i++){
|
if(!checkeckIsLock(lock)){
|
||||||
String fileName = "goods_" + (i + 1)+"_"+fileEndFix+ ".txt";
|
logger.info("系统繁忙,无法获取锁");
|
||||||
fileNames.add(fileName);
|
return;
|
||||||
}
|
}
|
||||||
QueryWrapper<SyncStoreData> syncStoreDataQueryWrapper = new QueryWrapper<>();
|
try {
|
||||||
syncStoreDataQueryWrapper.eq("store_id", storeId);
|
List<String> newFolders = new ArrayList<>();
|
||||||
syncStoreDataQueryWrapper.eq("status", 0);
|
folders.forEach(page -> {
|
||||||
syncStoreDataQueryWrapper.in("file_name",fileNames);
|
String newfolder = new FileUtils().getSyncTypeFlag(syncType, clientPath) + storeId + FileUtils.pathSeparator + page + FileUtils.pathSeparator;
|
||||||
List<SyncStoreData> syncStoreDataList= syncStoreDataService.list(syncStoreDataQueryWrapper);
|
newFolders.add(newfolder);
|
||||||
Map<String,String> syncDataMap= syncStoreDataList.stream().collect(Collectors.toMap(SyncStoreData::getSyncStoreDataId, SyncStoreData::getContent));
|
});
|
||||||
for (int i = 0; i < newFolders.size(); i++) {
|
|
||||||
final int taskId = i;
|
//upLoadZipToOss(newFolders.get(0));//上传文件到cos
|
||||||
final String isNegativeAllowed = storeDbConfig.getIsNegativeAllowed();
|
// dowloadAndUnZip(newFolders.get(0));//读取cos文件回本地
|
||||||
final Integer automatic=storeDbConfig.getAutomatic();
|
syncPrimaryKey();
|
||||||
//String priorityMode = storeDbConfig.getPriorityMode();
|
shopBaseProductCategoryService.clearCategoryCache(storeId);
|
||||||
//boolean isUpdatePrice= ObjectUtil.isNotEmpty(storeDbConfig.getRefreshTime());//是否更新所有切割价格
|
// shopProductSpecItemService.clearExistItem(Integer.valueOf(storeId));
|
||||||
threadNum.incrementAndGet();
|
ExecutorService executor = Executors.newFixedThreadPool(6);
|
||||||
Map<String, String> finalSyncDataMap = syncDataMap;
|
List<Future<?>> futures = new ArrayList<>();
|
||||||
futures.add(executor.submit(() -> {
|
// 提交任务
|
||||||
int count = 0;//失败重试机制,当失败重试一次,再次失败则记录到数据库中
|
AtomicInteger success = new AtomicInteger(0);
|
||||||
while (true) {
|
AtomicInteger fails = new AtomicInteger(0);
|
||||||
count++;
|
List<String> failFolders = new ArrayList<>();
|
||||||
// String taskName = newFolders.get(taskId);
|
List<String> failMessage = new ArrayList<>();
|
||||||
String fileName = "goods_" + (taskId + 1) +"_"+fileEndFix+ ".txt";
|
shopBaseProductCategoryService.getCategoryListByStoreId(storeId);
|
||||||
String sycnDataId=DigestUtils.md5Hex(newFolders.get(taskId) + fileName);
|
// getBrandMapByStoreId()
|
||||||
JSONArray jsonArray = getSyncDataContent(finalSyncDataMap,sycnDataId);
|
Map<String, Integer> brandMaps = productBrandService.getBrandMapByStoreId(storeId);
|
||||||
try {
|
QueryWrapper<StoreDbConfig> storeDbConfigQueryWrapper = new QueryWrapper<>();
|
||||||
baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps,automatic);
|
storeDbConfigQueryWrapper.eq("store_id", storeId);
|
||||||
success.getAndIncrement();
|
StoreDbConfig storeDbConfig = storeDbConfigService.getOne(storeDbConfigQueryWrapper);
|
||||||
threadNum.decrementAndGet();
|
// Map shopProductSpecItemMap = shopProductSpecItemService.getExistItem(Integer.valueOf(storeId));//切割缓存
|
||||||
return "成功" + taskId;
|
// Map productMappingMap = productMappingService.getProductMapping(Integer.valueOf(storeId));//切割缓存
|
||||||
} catch (Exception e) {
|
// Map ShopBaseProductSpecMap = baseProductSpecService.getShopBaseProductSpecMap(Integer.valueOf(storeId));//切割商品缓存
|
||||||
if (count < 2) {
|
// long seconds=System.currentTimeMillis();
|
||||||
//Thread.sleep(100);
|
// Date productSaleTime=Date.from(Instant.now().plusSeconds(seconds));
|
||||||
continue;
|
|
||||||
|
String fileIndex=folders.get(0);
|
||||||
|
String fileEndFix;
|
||||||
|
if (fileIndex.length()>1){
|
||||||
|
fileEndFix=fileIndex.split("_")[1];
|
||||||
|
} else {
|
||||||
|
fileEndFix = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
List<String> fileNames=new ArrayList<>();
|
||||||
|
for(int i=0;i<newFolders.size();i++){
|
||||||
|
String fileName = "goods_" + (i + 1)+"_"+fileEndFix+ ".txt";
|
||||||
|
fileNames.add(fileName);
|
||||||
|
}
|
||||||
|
QueryWrapper<SyncStoreData> syncStoreDataQueryWrapper = new QueryWrapper<>();
|
||||||
|
syncStoreDataQueryWrapper.eq("store_id", storeId);
|
||||||
|
syncStoreDataQueryWrapper.eq("status", 0);
|
||||||
|
syncStoreDataQueryWrapper.in("file_name",fileNames);
|
||||||
|
List<SyncStoreData> syncStoreDataList= syncStoreDataService.list(syncStoreDataQueryWrapper);
|
||||||
|
Map<String,String> syncDataMap= syncStoreDataList.stream().collect(Collectors.toMap(SyncStoreData::getSyncStoreDataId, SyncStoreData::getContent));
|
||||||
|
for (int i = 0; i < newFolders.size(); i++) {
|
||||||
|
final int taskId = i;
|
||||||
|
final String isNegativeAllowed = storeDbConfig.getIsNegativeAllowed();
|
||||||
|
final Integer automatic=storeDbConfig.getAutomatic();
|
||||||
|
//String priorityMode = storeDbConfig.getPriorityMode();
|
||||||
|
//boolean isUpdatePrice= ObjectUtil.isNotEmpty(storeDbConfig.getRefreshTime());//是否更新所有切割价格
|
||||||
|
threadNum.incrementAndGet();
|
||||||
|
Map<String, String> finalSyncDataMap = syncDataMap;
|
||||||
|
futures.add(executor.submit(() -> {
|
||||||
|
int count = 0;//失败重试机制,当失败重试一次,再次失败则记录到数据库中
|
||||||
|
while (true) {
|
||||||
|
count++;
|
||||||
|
// String taskName = newFolders.get(taskId);
|
||||||
|
String fileName = "goods_" + (taskId + 1) +"_"+fileEndFix+ ".txt";
|
||||||
|
String sycnDataId=DigestUtils.md5Hex(newFolders.get(taskId) + fileName);
|
||||||
|
JSONArray jsonArray = getSyncDataContent(finalSyncDataMap,sycnDataId);
|
||||||
|
try {
|
||||||
|
baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps,automatic);
|
||||||
|
success.getAndIncrement();
|
||||||
|
threadNum.decrementAndGet();
|
||||||
|
return "成功" + taskId;
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (count < 2) {
|
||||||
|
//Thread.sleep(100);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
fails.getAndIncrement();
|
||||||
|
failFolders.add(newFolders.get(taskId) + fileName);
|
||||||
|
failMessage.add(taskId + "_" + e.getMessage());
|
||||||
|
return "失败" + newFolders.get(taskId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
fails.getAndIncrement();
|
}));
|
||||||
failFolders.add(newFolders.get(taskId) + fileName);
|
}
|
||||||
failMessage.add(taskId + "_" + e.getMessage());
|
// 等待所有任务完成
|
||||||
return "失败" + newFolders.get(taskId);
|
for (Future<?> future : futures) {
|
||||||
|
try {
|
||||||
|
System.out.println("任务结果: " + future.get());
|
||||||
|
} catch (Exception e) {
|
||||||
|
System.err.println("任务执行异常: " + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}));
|
executor.shutdown();
|
||||||
}
|
//记录到数据库
|
||||||
// 等待所有任务完成
|
syncPrimaryKey();
|
||||||
for (Future<?> future : futures) {
|
shopBaseProductCategoryService.clearCategoryCache(storeId);
|
||||||
try {
|
productBrandService.clearBrandMapByStoreId(storeId);
|
||||||
System.out.println("任务结果: " + future.get());
|
List<SyncFileLog> syncFileLogs = new ArrayList<>();
|
||||||
} catch (Exception e) {
|
for (int i = 0; i < failFolders.size(); i++) {
|
||||||
System.err.println("任务执行异常: " + e.getMessage());
|
String path = failFolders.get(i);
|
||||||
|
String taskId = failMessage.get(i).split("_")[0];
|
||||||
|
SyncFileLog syncFileLog = new SyncFileLog();
|
||||||
|
syncFileLog.setSyncType(syncType);
|
||||||
|
syncFileLog.setFileName(path.substring(path.lastIndexOf(FileUtils.pathSeparator) + 1));
|
||||||
|
syncFileLog.setSyncStatus(DicEnum.FAILED.getCode());
|
||||||
|
syncFileLog.setSyncTaskId(taskId);
|
||||||
|
syncFileLog.setSyncStoreId(storeId);
|
||||||
|
syncFileLog.setFilePath(path);
|
||||||
|
syncFileLog.setErrorMessage(failMessage.get(i));
|
||||||
|
syncFileLog.setSourceSystem(DicEnum.SOURCE_SYSTEM_TYPE_1005.getCode());
|
||||||
|
syncFileLog.setTargetSystem(DicEnum.SOURCE_SYSTEM_TYPE_SELF.getCode());
|
||||||
|
syncFileLogs.add(syncFileLog);
|
||||||
|
}
|
||||||
|
if (CollUtil.isNotEmpty(syncFileLogs)) {
|
||||||
|
syncFileLogService.saveBatch(syncFileLogs, syncFileLogs.size());
|
||||||
|
}
|
||||||
|
syncStoreDataList=syncStoreDataList.stream().peek(syncStoreData -> {
|
||||||
|
syncStoreData.setStatus("1");
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
if(!syncStoreDataList.isEmpty()){
|
||||||
|
syncStoreDataService.updateBatchById(syncStoreDataList,syncStoreDataList.size());//处理结束后更新已处理
|
||||||
|
}
|
||||||
|
//todo 定时清理文件,建议用服务器脚本
|
||||||
|
logger.info("执行成功{}个文件,失败{}个文件", success, fails);
|
||||||
|
logger.info("同步商品数据执行结束");
|
||||||
|
//更新当前的获取时间,用户客户端获取
|
||||||
|
// try {
|
||||||
|
//
|
||||||
|
// if (ObjectUtil.isNotEmpty(storeDbConfig)) {
|
||||||
|
// storeDbConfig.setRefreshTime(date);
|
||||||
|
// storeDbConfigService.saveOrUpdate(storeDbConfig);
|
||||||
|
// }
|
||||||
|
// } catch (RuntimeException e) {
|
||||||
|
// logger.error("同步时间失败" + e.getMessage());
|
||||||
|
// }
|
||||||
|
productMappingService.syncAllProductMapping(Integer.valueOf(storeId),DicEnum.YESORNO_0.getCode());
|
||||||
|
if(ObjectUtil.isNull(storeDbConfig.getRefreshTime())){
|
||||||
|
syncShopImages(Integer.valueOf(storeId));//同时商品图库数据
|
||||||
|
}
|
||||||
|
}catch (Exception e){
|
||||||
|
logger.info("执行失败:{}",e.getMessage());
|
||||||
|
}finally {
|
||||||
|
// 最终检查并释放锁,确保锁一定被释放
|
||||||
|
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
|
||||||
|
logger.info("商品同步锁释放");
|
||||||
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
executor.shutdown();
|
|
||||||
//记录到数据库
|
|
||||||
syncPrimaryKey();
|
|
||||||
shopBaseProductCategoryService.clearCategoryCache(storeId);
|
|
||||||
productBrandService.clearBrandMapByStoreId(storeId);
|
|
||||||
List<SyncFileLog> syncFileLogs = new ArrayList<>();
|
|
||||||
for (int i = 0; i < failFolders.size(); i++) {
|
|
||||||
String path = failFolders.get(i);
|
|
||||||
String taskId = failMessage.get(i).split("_")[0];
|
|
||||||
SyncFileLog syncFileLog = new SyncFileLog();
|
|
||||||
syncFileLog.setSyncType(syncType);
|
|
||||||
syncFileLog.setFileName(path.substring(path.lastIndexOf(FileUtils.pathSeparator) + 1));
|
|
||||||
syncFileLog.setSyncStatus(DicEnum.FAILED.getCode());
|
|
||||||
syncFileLog.setSyncTaskId(taskId);
|
|
||||||
syncFileLog.setSyncStoreId(storeId);
|
|
||||||
syncFileLog.setFilePath(path);
|
|
||||||
syncFileLog.setErrorMessage(failMessage.get(i));
|
|
||||||
syncFileLog.setSourceSystem(DicEnum.SOURCE_SYSTEM_TYPE_1005.getCode());
|
|
||||||
syncFileLog.setTargetSystem(DicEnum.SOURCE_SYSTEM_TYPE_SELF.getCode());
|
|
||||||
syncFileLogs.add(syncFileLog);
|
|
||||||
}
|
|
||||||
if (CollUtil.isNotEmpty(syncFileLogs)) {
|
|
||||||
syncFileLogService.saveBatch(syncFileLogs, syncFileLogs.size());
|
|
||||||
}
|
|
||||||
syncStoreDataList=syncStoreDataList.stream().peek(syncStoreData -> {
|
|
||||||
syncStoreData.setStatus("1");
|
|
||||||
}).collect(Collectors.toList());
|
|
||||||
if(!syncStoreDataList.isEmpty()){
|
|
||||||
syncStoreDataService.updateBatchById(syncStoreDataList,syncStoreDataList.size());//处理结束后更新已处理
|
|
||||||
}
|
|
||||||
//todo 定时清理文件,建议用服务器脚本
|
|
||||||
logger.info("执行成功{}个文件,失败{}个文件", success, fails);
|
|
||||||
logger.info("同步商品数据执行结束");
|
|
||||||
//更新当前的获取时间,用户客户端获取
|
|
||||||
// try {
|
|
||||||
//
|
|
||||||
// if (ObjectUtil.isNotEmpty(storeDbConfig)) {
|
|
||||||
// storeDbConfig.setRefreshTime(date);
|
|
||||||
// storeDbConfigService.saveOrUpdate(storeDbConfig);
|
|
||||||
// }
|
|
||||||
// } catch (RuntimeException e) {
|
|
||||||
// logger.error("同步时间失败" + e.getMessage());
|
|
||||||
// }
|
|
||||||
productMappingService.syncAllProductMapping(Integer.valueOf(storeId),DicEnum.YESORNO_0.getCode());
|
|
||||||
if(ObjectUtil.isNull(storeDbConfig.getRefreshTime())){
|
|
||||||
syncShopImages(Integer.valueOf(storeId));//同时商品图库数据
|
|
||||||
}
|
|
||||||
|
|
||||||
redisService.del(key);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -736,38 +756,19 @@ public class SyncThirdDataServiceImpl extends SyncBaseThirdSxAbstract implements
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 保证单线程执行同步数据
|
* 保证单线程执行同步数据
|
||||||
* @param storeId
|
* @param lock
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public synchronized boolean checkeckIsLock(String storeId){
|
public synchronized boolean checkeckIsLock(RLock lock){
|
||||||
logger.info("等待时间开始");
|
try {
|
||||||
String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
|
boolean resul=lock.tryLock(60, 120, TimeUnit.SECONDS);
|
||||||
Map isLockMap= (Map) redisService.get(key);
|
logger.info("等待获取锁成功");
|
||||||
if(isLockMap!=null){
|
return resul;
|
||||||
String result= (String) isLockMap.get("batchGoodIsLock");
|
}catch (InterruptedException e){
|
||||||
if("true".equals(result)){
|
return false;
|
||||||
try {
|
|
||||||
logger.info("进入等待时间");
|
|
||||||
Thread.sleep(1000 * 60 * 1);//五分钟
|
|
||||||
return checkeckIsLock(storeId);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
logger.info("等待异常:{}",e.getMessage());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
logger.info("等待时间结束");
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void setLock(String storeId,String value){
|
|
||||||
String key=RedisKey.STOREDATAGOODBATCHLOCK+":"+storeId;
|
|
||||||
Map<String,String> map= new HashMap<>();
|
|
||||||
map.put("batchGoodIsLock",value);
|
|
||||||
redisService.set(key,map,600000);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResponseEntity<Resource> downloadToClient(String primaryKey, String clienVersionName) {
|
public ResponseEntity<Resource> downloadToClient(String primaryKey, String clienVersionName) {
|
||||||
logger.info("primaryKey:{}", primaryKey);
|
logger.info("primaryKey:{}", primaryKey);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user