优化 seata 的死锁释放配置
This commit is contained in:
parent
425aebf601
commit
6c29c0d6cc
@ -86,6 +86,12 @@ seata:
|
||||
service:
|
||||
vgroup-mapping:
|
||||
my_test_tx_group: default
|
||||
client:
|
||||
rm:
|
||||
lock:
|
||||
lock-timeout: 60000
|
||||
retry-times: 5
|
||||
retry-interval-milliseconds: 2000
|
||||
# 控制台日志信息
|
||||
logging:
|
||||
level:
|
||||
|
||||
@ -27,6 +27,7 @@ spring:
|
||||
aop-patterns: com.suisung.mall.common.service.impl.*,com.suisung.mall.core.web.service.impl.*,com.suisung.mall.pay.*
|
||||
remove-abandoned: true
|
||||
remove-abandoned-timeout: 1800
|
||||
transaction-query-timeout: 3000
|
||||
redis:
|
||||
host: @redis.host@ # Redis服务器地址
|
||||
database: @redis.database@ # Redis数据库索引(默认为0)
|
||||
@ -86,6 +87,12 @@ seata:
|
||||
service:
|
||||
vgroup-mapping:
|
||||
my_test_tx_group: default
|
||||
client:
|
||||
rm:
|
||||
lock:
|
||||
lock-timeout: 60000
|
||||
retry-times: 5
|
||||
retry-interval-milliseconds: 2000
|
||||
# 控制台日志信息
|
||||
logging:
|
||||
level:
|
||||
|
||||
@ -21,7 +21,9 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -29,215 +31,44 @@ import java.util.stream.Collectors;
|
||||
@Slf4j
|
||||
public class ShopBatchSubmitListener extends AnalysisEventListener<SxGoosModelExcel> {
|
||||
// 批处理阈值
|
||||
private static final int BATCH_SIZE = 500;
|
||||
// 数据缓存
|
||||
private List<SxGoosModelExcel> cachedDataList = new ArrayList<>(BATCH_SIZE);
|
||||
|
||||
private SyncThirdDataService syncThirdDataService;
|
||||
|
||||
private SyncStoreSpecsService syncStoreSpecsService;
|
||||
|
||||
private static final int BATCH_SIZE = 490;
|
||||
// 线程池配置
|
||||
private final ExecutorService executorService;
|
||||
|
||||
private List<Future<?>> futures ;
|
||||
private AtomicInteger success;
|
||||
private AtomicInteger fails;
|
||||
private AtomicInteger batchSize;
|
||||
// 数据缓存
|
||||
private final List<SxGoosModelExcel> cachedDataList = new ArrayList<>(BATCH_SIZE);
|
||||
private final SyncThirdDataService syncThirdDataService;
|
||||
private final SyncStoreSpecsService syncStoreSpecsService;
|
||||
private final List<Future<?>> futures;
|
||||
private final AtomicInteger success;
|
||||
private final AtomicInteger fails;
|
||||
private final AtomicInteger batchSize;
|
||||
private final ProductSpecManager productSpecManager;
|
||||
@Setter
|
||||
@Getter
|
||||
private String storeId;
|
||||
|
||||
private String storeId;
|
||||
@Setter
|
||||
@Getter
|
||||
private String isNegativeAllowed;
|
||||
|
||||
@Setter
|
||||
@Getter
|
||||
private Map<String,Integer> brandMaps;
|
||||
private Map<String, Integer> brandMaps;
|
||||
|
||||
private ProductSpecManager productSpecManager;
|
||||
|
||||
public ShopBatchSubmitListener(SyncThirdDataService syncThirdDataService,SyncStoreSpecsService syncStoreSpecsService) {
|
||||
public ShopBatchSubmitListener(SyncThirdDataService syncThirdDataService, SyncStoreSpecsService syncStoreSpecsService) {
|
||||
this.syncThirdDataService = syncThirdDataService;
|
||||
this.syncStoreSpecsService = syncStoreSpecsService;
|
||||
// 创建线程池(根据CPU核心数优化)
|
||||
int corePoolSize = Runtime.getRuntime().availableProcessors();
|
||||
log.info("核心线程数量{}" , corePoolSize);
|
||||
this.executorService = Executors.newFixedThreadPool(6);
|
||||
// log.info("核心线程数量{}", corePoolSize);
|
||||
this.executorService = Executors.newFixedThreadPool(corePoolSize);
|
||||
this.futures = new ArrayList<>();
|
||||
this.success = new AtomicInteger();
|
||||
this.fails = new AtomicInteger();
|
||||
this.batchSize= new AtomicInteger();
|
||||
this.productSpecManager=new ProductSpecManager();
|
||||
Map<String, List<String>> specsMap=new HashMap<>();
|
||||
this.batchSize = new AtomicInteger();
|
||||
this.productSpecManager = new ProductSpecManager();
|
||||
Map<String, List<String>> specsMap = new HashMap<>();
|
||||
productSpecManager.setSpecsMap(specsMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invoke(SxGoosModelExcel sxGoosModelExcel, AnalysisContext analysisContext) {
|
||||
synchronized (cachedDataList) {
|
||||
cachedDataList.add(sxGoosModelExcel);
|
||||
// 达到批处理阈值时提交
|
||||
if (cachedDataList.size() >= BATCH_SIZE) {
|
||||
batchSize.incrementAndGet();
|
||||
submitBatch();
|
||||
// 提交后清空缓存
|
||||
cachedDataList.clear();
|
||||
productSpecManager.getSpecsMap().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doAfterAllAnalysed(AnalysisContext context) {
|
||||
synchronized (cachedDataList) {
|
||||
// 处理最后一批不足BATCH_SIZE的数据
|
||||
if (!cachedDataList.isEmpty()) {
|
||||
batchSize.incrementAndGet();
|
||||
submitBatch();
|
||||
cachedDataList.clear();
|
||||
productSpecManager.getSpecsMap().clear();
|
||||
}
|
||||
}
|
||||
// 等待所有任务完成
|
||||
for (Future<?> future : futures) {
|
||||
try {
|
||||
log.info("任务结果:{}" ,future.get());
|
||||
} catch (Exception e) {
|
||||
log.info("任务执行异常: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
log.info("Excel解析完成,总处理条数: {}" , context.readSheetHolder().getTotal());
|
||||
log.info("成功数量:{};失败数量:{}",success.get(),fails.get());
|
||||
// 关闭线程池
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
private void submitBatch() {
|
||||
// 复制当前批次数据(避免异步修改)
|
||||
List<SxGoosModelExcel> batchCopy = new ArrayList<>(deduplicateById(cachedDataList));
|
||||
Map<String,List<String>> sepcsMap = new HashMap<>(productSpecManager.getSpecsMap());
|
||||
log.info("去重前:{};去重后:{}" , cachedDataList.size(), batchCopy.size());
|
||||
final int index = batchSize.get();
|
||||
futures.add(executorService.submit(()->{
|
||||
int i=0;
|
||||
while (true){
|
||||
i++;
|
||||
try {
|
||||
Gson gson=new Gson();
|
||||
String jsonShops=gson.toJson(batchCopy);
|
||||
JSONArray jsonArray=new JSONArray(jsonShops);
|
||||
syncThirdDataService.baseSaveOrUpdateGoodsBatch(jsonArray,storeId,isNegativeAllowed,brandMaps);
|
||||
log.info("已提交批次: {} 条", batchCopy.size());
|
||||
success.getAndIncrement();
|
||||
saveSnycStoreSpec(sepcsMap);
|
||||
return "完成批次:"+index;
|
||||
} catch (Exception e) {
|
||||
if(i<2){
|
||||
continue;
|
||||
}
|
||||
fails.getAndIncrement();
|
||||
return "失败批次:"+index+";失败原因:"+e.getMessage();
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据处理
|
||||
* @param list
|
||||
* @return
|
||||
*/
|
||||
private List<SxGoosModelExcel> deduplicateById(List<SxGoosModelExcel> list) {
|
||||
return list.stream()
|
||||
.peek(sxGoosModelExcel -> {
|
||||
if(StringUtils.isNotEmpty(sxGoosModelExcel.getShop_spec())){
|
||||
sxGoosModelExcel.setProduct_spec(Collections.singletonList(sxGoosModelExcel.getShop_spec()));
|
||||
}
|
||||
if(null==sxGoosModelExcel.getUnit()){
|
||||
sxGoosModelExcel.setUnit("");
|
||||
}
|
||||
if(ObjectUtil.isEmpty(sxGoosModelExcel.getBuy_limit())){
|
||||
sxGoosModelExcel.setBuy_limit(0);
|
||||
}
|
||||
if(StringUtils.isEmpty(sxGoosModelExcel.getBrand_name())){
|
||||
sxGoosModelExcel.setBrand_name("其它品牌");
|
||||
}
|
||||
if(ObjectUtil.isEmpty(sxGoosModelExcel.getStock())){
|
||||
sxGoosModelExcel.setStock(BigDecimal.ZERO);
|
||||
}
|
||||
sxGoosModelExcel.setProduct_number(sxGoosModelExcel.getProduct_barcode());
|
||||
sxGoosModelExcel.setOriginal_price(sxGoosModelExcel.getRetail_price());
|
||||
|
||||
if(StringUtils.isNotEmpty(sxGoosModelExcel.getShop_specs())){
|
||||
List<ProductSpecManager.SpecItem> specItems= productSpecManager.parseSpecString(sxGoosModelExcel.getShop_specs());
|
||||
sxGoosModelExcel.setJsonSpecs(specItems.toString());
|
||||
productSpecManager.addProductSpec(sxGoosModelExcel.getProduct_barcode(),specItems,sxGoosModelExcel.getRetail_price(),sxGoosModelExcel.getStock());
|
||||
}
|
||||
})
|
||||
.filter(CommonUtil.distinctByKey(SxGoosModelExcel::getProduct_number))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存多规格数据
|
||||
*/
|
||||
private void saveSnycStoreSpec(Map<String,List<String>> listMap){
|
||||
if(listMap==null){
|
||||
return;
|
||||
}
|
||||
//List<SyncStoreSpecs> syncStoreSpecs=new ArrayList<>();
|
||||
List<SyncStoreSpecs> addSyncStoreSpecs=new ArrayList<>();
|
||||
List<SyncStoreSpecs> updateSyncStoreSpecs=new ArrayList<>();
|
||||
Map<String,Long> existId= getExistProductId(listMap);
|
||||
if(existId==null){
|
||||
return;
|
||||
}
|
||||
listMap.forEach((k,v)->{
|
||||
SyncStoreSpecs syncStoreSpec=new SyncStoreSpecs();
|
||||
syncStoreSpec.setProductNumber(k);
|
||||
syncStoreSpec.setJsonSpecs(v.toString());
|
||||
syncStoreSpec.setIsDeal(DicEnum.YESORNO_0.getCode());
|
||||
syncStoreSpec.setStoreId(Integer.valueOf(storeId));
|
||||
if(existId.containsKey(k)){
|
||||
syncStoreSpec.setSycnSpecsId(existId.get(k));
|
||||
updateSyncStoreSpecs.add(syncStoreSpec);
|
||||
}else {
|
||||
addSyncStoreSpecs.add(syncStoreSpec);
|
||||
}
|
||||
});
|
||||
if(!addSyncStoreSpecs.isEmpty()){
|
||||
syncStoreSpecsService.saveBatch(addSyncStoreSpecs,addSyncStoreSpecs.size());
|
||||
}
|
||||
if(!updateSyncStoreSpecs.isEmpty()){
|
||||
syncStoreSpecsService.updateBatchById(updateSyncStoreSpecs,updateSyncStoreSpecs.size());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找存量数据
|
||||
* @param listMap
|
||||
* @return
|
||||
*/
|
||||
private Map<String,Long> getExistProductId(Map<String,List<String>> listMap){
|
||||
QueryWrapper<SyncStoreSpecs> queryWrapper = new QueryWrapper<>();
|
||||
listMap.forEach((k,v)->{
|
||||
queryWrapper.or(q->q.eq("store_id",storeId).eq("product_number",k));
|
||||
});
|
||||
|
||||
List<SyncStoreSpecs> syncStoreSpecs = syncStoreSpecsService.list(queryWrapper);
|
||||
if(syncStoreSpecs==null){
|
||||
return null;
|
||||
}
|
||||
Map<String,Long> resultMap=new HashMap<>();
|
||||
syncStoreSpecs.forEach(m->{
|
||||
resultMap.put(String.valueOf(m.getProductNumber()),m.getSycnSpecsId());
|
||||
});
|
||||
return resultMap;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
SxGoosModelExcel sxGoosModelExcel = new SxGoosModelExcel();
|
||||
sxGoosModelExcel.setProduct_barcode("1");
|
||||
@ -269,21 +100,187 @@ public class ShopBatchSubmitListener extends AnalysisEventListener<SxGoosModelEx
|
||||
sxGoosModelExcelList.add(sxGoosModelExcel);
|
||||
sxGoosModelExcelList.add(sxGoosModelExcel1);
|
||||
sxGoosModelExcelList.add(sxGoosModelExcel3);
|
||||
ProductSpecManager productSpecManager=new ProductSpecManager();
|
||||
Map<String, List<String>> specsMap=new HashMap<>();
|
||||
ProductSpecManager productSpecManager = new ProductSpecManager();
|
||||
Map<String, List<String>> specsMap = new HashMap<>();
|
||||
productSpecManager.setSpecsMap(specsMap);
|
||||
|
||||
sxGoosModelExcelList=sxGoosModelExcelList.stream().peek(gm -> {
|
||||
if(StringUtils.isNotEmpty(gm.getJsonSpecs())){
|
||||
List<ProductSpecManager.SpecItem> specItems= productSpecManager.parseSpecString(gm.getJsonSpecs());
|
||||
productSpecManager.addProductSpec(gm.getProduct_barcode(),specItems,gm.getRetail_price(),gm.getStock());
|
||||
sxGoosModelExcelList = sxGoosModelExcelList.stream().peek(gm -> {
|
||||
if (StringUtils.isNotEmpty(gm.getJsonSpecs())) {
|
||||
List<ProductSpecManager.SpecItem> specItems = productSpecManager.parseSpecString(gm.getJsonSpecs());
|
||||
productSpecManager.addProductSpec(gm.getProduct_barcode(), specItems, gm.getRetail_price(), gm.getStock());
|
||||
}
|
||||
}).filter(CommonUtil.distinctByKey(SxGoosModelExcel::getProduct_barcode))
|
||||
}).filter(CommonUtil.distinctByKey(SxGoosModelExcel::getProduct_barcode))
|
||||
.collect(Collectors.toList());
|
||||
Gson goson=new Gson();
|
||||
log.info("规格:{}",productSpecManager.getSepcMapValue("1"));
|
||||
log.info("规格数量:{}",productSpecManager.getSepcMapValue("1").size());
|
||||
Gson goson = new Gson();
|
||||
log.info("规格:{}", productSpecManager.getSepcMapValue("1"));
|
||||
log.info("规格数量:{}", productSpecManager.getSepcMapValue("1").size());
|
||||
log.info(String.valueOf(sxGoosModelExcelList.size()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invoke(SxGoosModelExcel sxGoosModelExcel, AnalysisContext analysisContext) {
|
||||
synchronized (cachedDataList) {
|
||||
cachedDataList.add(sxGoosModelExcel);
|
||||
// 达到批处理阈值时提交
|
||||
if (cachedDataList.size() >= BATCH_SIZE) {
|
||||
batchSize.incrementAndGet();
|
||||
submitBatch();
|
||||
// 提交后清空缓存
|
||||
cachedDataList.clear();
|
||||
productSpecManager.getSpecsMap().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doAfterAllAnalysed(AnalysisContext context) {
|
||||
synchronized (cachedDataList) {
|
||||
// 处理最后一批不足BATCH_SIZE的数据
|
||||
if (!cachedDataList.isEmpty()) {
|
||||
batchSize.incrementAndGet();
|
||||
submitBatch();
|
||||
cachedDataList.clear();
|
||||
productSpecManager.getSpecsMap().clear();
|
||||
}
|
||||
}
|
||||
// 等待所有任务完成
|
||||
for (Future<?> future : futures) {
|
||||
try {
|
||||
log.info("任务结果:{}", future.get());
|
||||
} catch (Exception e) {
|
||||
log.info("任务执行异常: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
log.info("Excel解析完成,总处理条数: {}", context.readSheetHolder().getTotal());
|
||||
log.info("成功数量:{};失败数量:{}", success.get(), fails.get());
|
||||
// 关闭线程池
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
private void submitBatch() {
|
||||
// 复制当前批次数据(避免异步修改)
|
||||
List<SxGoosModelExcel> batchCopy = new ArrayList<>(deduplicateById(cachedDataList));
|
||||
Map<String, List<String>> sepcsMap = new HashMap<>(productSpecManager.getSpecsMap());
|
||||
log.info("去重前:{};去重后:{}", cachedDataList.size(), batchCopy.size());
|
||||
final int index = batchSize.get();
|
||||
futures.add(executorService.submit(() -> {
|
||||
int i = 0;
|
||||
while (true) {
|
||||
i++;
|
||||
try {
|
||||
Gson gson = new Gson();
|
||||
String jsonShops = gson.toJson(batchCopy);
|
||||
JSONArray jsonArray = new JSONArray(jsonShops);
|
||||
syncThirdDataService.baseSaveOrUpdateGoodsBatch(jsonArray, storeId, isNegativeAllowed, brandMaps);
|
||||
log.info("已提交批次: {} 条", batchCopy.size());
|
||||
success.getAndIncrement();
|
||||
saveSnycStoreSpec(sepcsMap);
|
||||
return "完成批次:" + index;
|
||||
} catch (Exception e) {
|
||||
if (i < 2) {
|
||||
continue;
|
||||
}
|
||||
fails.getAndIncrement();
|
||||
return "失败批次:" + index + ";失败原因:" + e.getMessage();
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* 数据处理
|
||||
*
|
||||
* @param list
|
||||
* @return
|
||||
*/
|
||||
private List<SxGoosModelExcel> deduplicateById(List<SxGoosModelExcel> list) {
|
||||
return list.stream()
|
||||
.peek(sxGoosModelExcel -> {
|
||||
if (StringUtils.isNotEmpty(sxGoosModelExcel.getShop_spec())) {
|
||||
sxGoosModelExcel.setProduct_spec(Collections.singletonList(sxGoosModelExcel.getShop_spec()));
|
||||
}
|
||||
if (null == sxGoosModelExcel.getUnit()) {
|
||||
sxGoosModelExcel.setUnit("");
|
||||
}
|
||||
if (ObjectUtil.isEmpty(sxGoosModelExcel.getBuy_limit())) {
|
||||
sxGoosModelExcel.setBuy_limit(0);
|
||||
}
|
||||
if (StringUtils.isEmpty(sxGoosModelExcel.getBrand_name())) {
|
||||
sxGoosModelExcel.setBrand_name("其它品牌");
|
||||
}
|
||||
if (ObjectUtil.isEmpty(sxGoosModelExcel.getStock())) {
|
||||
sxGoosModelExcel.setStock(BigDecimal.ZERO);
|
||||
}
|
||||
sxGoosModelExcel.setProduct_number(sxGoosModelExcel.getProduct_barcode());
|
||||
sxGoosModelExcel.setOriginal_price(sxGoosModelExcel.getRetail_price());
|
||||
|
||||
if (StringUtils.isNotEmpty(sxGoosModelExcel.getShop_specs())) {
|
||||
List<ProductSpecManager.SpecItem> specItems = productSpecManager.parseSpecString(sxGoosModelExcel.getShop_specs());
|
||||
sxGoosModelExcel.setJsonSpecs(specItems.toString());
|
||||
productSpecManager.addProductSpec(sxGoosModelExcel.getProduct_barcode(), specItems, sxGoosModelExcel.getRetail_price(), sxGoosModelExcel.getStock());
|
||||
}
|
||||
})
|
||||
.filter(CommonUtil.distinctByKey(SxGoosModelExcel::getProduct_number))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存多规格数据
|
||||
*/
|
||||
private void saveSnycStoreSpec(Map<String, List<String>> listMap) {
|
||||
if (listMap == null) {
|
||||
return;
|
||||
}
|
||||
//List<SyncStoreSpecs> syncStoreSpecs=new ArrayList<>();
|
||||
List<SyncStoreSpecs> addSyncStoreSpecs = new ArrayList<>();
|
||||
List<SyncStoreSpecs> updateSyncStoreSpecs = new ArrayList<>();
|
||||
Map<String, Long> existId = getExistProductId(listMap);
|
||||
if (existId == null) {
|
||||
return;
|
||||
}
|
||||
listMap.forEach((k, v) -> {
|
||||
SyncStoreSpecs syncStoreSpec = new SyncStoreSpecs();
|
||||
syncStoreSpec.setProductNumber(k);
|
||||
syncStoreSpec.setJsonSpecs(v.toString());
|
||||
syncStoreSpec.setIsDeal(DicEnum.YESORNO_0.getCode());
|
||||
syncStoreSpec.setStoreId(Integer.valueOf(storeId));
|
||||
if (existId.containsKey(k)) {
|
||||
syncStoreSpec.setSycnSpecsId(existId.get(k));
|
||||
updateSyncStoreSpecs.add(syncStoreSpec);
|
||||
} else {
|
||||
addSyncStoreSpecs.add(syncStoreSpec);
|
||||
}
|
||||
});
|
||||
if (!addSyncStoreSpecs.isEmpty()) {
|
||||
syncStoreSpecsService.saveBatch(addSyncStoreSpecs, addSyncStoreSpecs.size());
|
||||
}
|
||||
if (!updateSyncStoreSpecs.isEmpty()) {
|
||||
syncStoreSpecsService.updateBatchById(updateSyncStoreSpecs, updateSyncStoreSpecs.size());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 查找存量数据
|
||||
*
|
||||
* @param listMap
|
||||
* @return
|
||||
*/
|
||||
private Map<String, Long> getExistProductId(Map<String, List<String>> listMap) {
|
||||
QueryWrapper<SyncStoreSpecs> queryWrapper = new QueryWrapper<>();
|
||||
listMap.forEach((k, v) -> {
|
||||
queryWrapper.or(q -> q.eq("store_id", storeId).eq("product_number", k));
|
||||
});
|
||||
|
||||
List<SyncStoreSpecs> syncStoreSpecs = syncStoreSpecsService.list(queryWrapper);
|
||||
if (syncStoreSpecs == null) {
|
||||
return null;
|
||||
}
|
||||
Map<String, Long> resultMap = new HashMap<>();
|
||||
syncStoreSpecs.forEach(m -> {
|
||||
resultMap.put(String.valueOf(m.getProductNumber()), m.getSycnSpecsId());
|
||||
});
|
||||
return resultMap;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -31,11 +31,14 @@ public class ThreadPoolManager {
|
||||
private volatile boolean isShutdown = false;
|
||||
|
||||
|
||||
|
||||
public ThreadPoolManager(int corePoolSize, int maxPoolSize, String successFile, String failureFile) {
|
||||
this.successFile = successFile;
|
||||
this.failureFile = failureFile;
|
||||
|
||||
// 动态分配
|
||||
corePoolSize = Math.max(4, Runtime.getRuntime().availableProcessors() * 2);
|
||||
maxPoolSize = corePoolSize * 2;
|
||||
|
||||
this.executor = new ThreadPoolExecutor(
|
||||
corePoolSize,
|
||||
maxPoolSize,
|
||||
@ -54,41 +57,6 @@ public class ThreadPoolManager {
|
||||
Runtime.getRuntime().availableProcessors() * 2,
|
||||
successFile, failureFile);
|
||||
}
|
||||
// 自定义线程工厂
|
||||
private static class CustomThreadFactory implements ThreadFactory {
|
||||
private final AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = new Thread(r, "pool-thread-" + counter.incrementAndGet());
|
||||
thread.setPriority(Thread.NORM_PRIORITY);
|
||||
thread.setDaemon(false);
|
||||
return thread;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 自定义拒绝策略
|
||||
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
if (!isShutdown) {
|
||||
// 1. 尝试直接执行(调用者线程)
|
||||
try {
|
||||
System.out.println("线程池饱和,由调用线程直接执行");
|
||||
r.run();
|
||||
} catch (Exception e) {
|
||||
System.err.println("调用者执行任务失败: " + e.getMessage());
|
||||
}
|
||||
|
||||
// 2. 记录指标
|
||||
failureCount.incrementAndGet();
|
||||
} else {
|
||||
System.err.println("线程池已关闭,拒绝新任务");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 消费待处理任务的线程
|
||||
private void startPendingTaskConsumer() {
|
||||
@ -110,7 +78,6 @@ public class ThreadPoolManager {
|
||||
consumerThread.start();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 提交文件夹处理任务(带重试机制)
|
||||
*/
|
||||
@ -136,7 +103,7 @@ public class ThreadPoolManager {
|
||||
System.out.printf("[%s] 任务处理失败,第%d次重试...%n",
|
||||
taskName, retryCount);
|
||||
try {
|
||||
Thread.sleep(1000 * retryCount); // 指数退避
|
||||
Thread.sleep(1000L * retryCount); // 指数退避
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
@ -155,9 +122,10 @@ public class ThreadPoolManager {
|
||||
|
||||
/**
|
||||
* 批量提交文件夹处理任务
|
||||
* @param taskName 任务名称(用于日志记录)
|
||||
*
|
||||
* @param taskName 任务名称(用于日志记录)
|
||||
* @param folderPaths 要处理的文件夹路径列表
|
||||
* @param callback 处理完成的回调接口
|
||||
* @param callback 处理完成的回调接口
|
||||
* @return CountDownLatch 用于等待所有任务完成
|
||||
*/
|
||||
// public void submitBatchFolderTasks(String taskName, List<String> folderPaths,
|
||||
@ -287,6 +255,7 @@ public class ThreadPoolManager {
|
||||
monitorThread.setDaemon(true);
|
||||
monitorThread.start();
|
||||
}
|
||||
|
||||
private void printPoolStatus() {
|
||||
System.out.println("\n=== 线程池状态 ===");
|
||||
System.out.println("活跃线程: " + executor.getActiveCount());
|
||||
@ -326,7 +295,7 @@ public class ThreadPoolManager {
|
||||
|
||||
// 3. 尝试恢复 - 中断死锁线程
|
||||
if (executor instanceof ThreadPoolExecutor) {
|
||||
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
|
||||
ThreadPoolExecutor tpe = executor;
|
||||
BlockingQueue<Runnable> queue = tpe.getQueue();
|
||||
|
||||
System.err.println("尝试中断死锁线程并清空队列(" + queue.size() + "个任务)");
|
||||
@ -374,6 +343,7 @@ public class ThreadPoolManager {
|
||||
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 5);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭线程池(优雅关闭)
|
||||
*/
|
||||
@ -411,4 +381,38 @@ public class ThreadPoolManager {
|
||||
public interface JsonProcessCallback {
|
||||
void process(JSONArray jsonArray, String taskName, String fileName);
|
||||
}
|
||||
|
||||
// 自定义线程工厂
|
||||
private static class CustomThreadFactory implements ThreadFactory {
|
||||
private final AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread thread = new Thread(r, "pool-thread-" + counter.incrementAndGet());
|
||||
thread.setPriority(Thread.NORM_PRIORITY);
|
||||
thread.setDaemon(false);
|
||||
return thread;
|
||||
}
|
||||
}
|
||||
|
||||
// 自定义拒绝策略
|
||||
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
if (!isShutdown) {
|
||||
// 1. 尝试直接执行(调用者线程)
|
||||
try {
|
||||
System.out.println("线程池饱和,由调用线程直接执行");
|
||||
r.run();
|
||||
} catch (Exception e) {
|
||||
System.err.println("调用者执行任务失败: " + e.getMessage());
|
||||
}
|
||||
|
||||
// 2. 记录指标
|
||||
failureCount.incrementAndGet();
|
||||
} else {
|
||||
System.err.println("线程池已关闭,拒绝新任务");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ spring:
|
||||
aop-patterns: com.suisung.mall.common.service.impl.*,com.suisung.mall.core.web.service.impl.*,com.suisung.mall.shop.*
|
||||
remove-abandoned: true
|
||||
remove-abandoned-timeout: 1800
|
||||
transaction-query-timeout: 3000
|
||||
redis:
|
||||
host: @redis.host@ # Redis服务器地址
|
||||
database: @redis.database@ # Redis数据库索引(默认为0)
|
||||
@ -100,6 +101,12 @@ seata:
|
||||
service:
|
||||
vgroup-mapping:
|
||||
my_test_tx_group: default
|
||||
client:
|
||||
rm:
|
||||
lock:
|
||||
lock-timeout: 60000
|
||||
retry-times: 5
|
||||
retry-interval-milliseconds: 2000
|
||||
# 控制台日志信息
|
||||
logging:
|
||||
level:
|
||||
|
||||
6
pom.xml
6
pom.xml
@ -317,6 +317,7 @@
|
||||
<!-- seata配置 -->
|
||||
<seata.group>SEATA_GROUP</seata.group>
|
||||
<seata.tx-service-group>my_test_tx_group</seata.tx-service-group>
|
||||
<seata.server.address>114.132.210.208:8091</seata.server.address>
|
||||
<!-- sentinel配置 -->
|
||||
<sentinel.transport.dashboard>114.132.210.208:8718</sentinel.transport.dashboard>
|
||||
<!-- mysql配置 -->
|
||||
@ -367,6 +368,7 @@
|
||||
<!-- seata配置 -->
|
||||
<seata.group>SEATA_GROUP</seata.group>
|
||||
<seata.tx-service-group>my_test_tx_group</seata.tx-service-group>
|
||||
<seata.server.address>114.132.210.208:8091</seata.server.address>
|
||||
<!-- sentinel配置 -->
|
||||
<sentinel.transport.dashboard>114.132.210.208:8718</sentinel.transport.dashboard>
|
||||
<!-- mysql配置 -->
|
||||
@ -417,6 +419,7 @@
|
||||
<!-- seata配置 -->
|
||||
<seata.group>SEATA_GROUP</seata.group>
|
||||
<seata.tx-service-group>my_test_tx_group</seata.tx-service-group>
|
||||
<seata.server.address>114.132.210.208:8091</seata.server.address>
|
||||
<!-- sentinel配置 -->
|
||||
<sentinel.transport.dashboard>10.1.8.3:8718</sentinel.transport.dashboard>
|
||||
<!-- mysql配置 -->
|
||||
@ -467,6 +470,7 @@
|
||||
<!-- seata配置 -->
|
||||
<seata.group>SEATA_GROUP</seata.group>
|
||||
<seata.tx-service-group>my_test_tx_group</seata.tx-service-group>
|
||||
<!-- <seata.server.address>172.16.0.11:8091</seata.server.address>-->
|
||||
<!-- sentinel配置 -->
|
||||
<sentinel.transport.dashboard>172.16.0.11:8718</sentinel.transport.dashboard>
|
||||
<!-- mysql配置 -->
|
||||
@ -550,7 +554,7 @@
|
||||
<!-- <baseImage>openjdk:8-jre-alpine</baseImage>-->
|
||||
<baseImage>openjdk:8-jre</baseImage>
|
||||
<!--定义容器启动命令,注意不能换行-->
|
||||
<entryPoint>["java", "-jar", "-Xms256m", "-Xmx512m", "-XX:MetaspaceSize=256m", "-XX:MaxMetaspaceSize=256m", "-XX:+UseContainerSupport", "-XX:MaxRAMPercentage=60.0", "-XX:+UseSerialGC", "-XX:MinHeapFreeRatio=40", "-XX:MaxHeapFreeRatio=60", "-Dspring.profiles.active=${spring.profile}", "-Duser.timezone=Asia/Shanghai", "/${project.build.finalName}.jar"]
|
||||
<entryPoint>["java", "-jar", "-Xms256m", "-Xmx512m", "-XX:MetaspaceSize=256m", "-XX:MaxMetaspaceSize=256m", "-XX:+UseContainerSupport", "-XX:MaxRAMPercentage=60.0", "-XX:+UseSerialGC", "-XX:MinHeapFreeRatio=40", "-XX:MaxHeapFreeRatio=60", "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-Xloggc:./gc.log", "-XX:+UseGCLogFileRotation", "-XX:NumberOfGCLogFiles=5", "-XX:GCLogFileSize=10M", "-Dspring.profiles.active=${spring.profile}", "-Duser.timezone=Asia/Shanghai", "/${project.build.finalName}.jar"]
|
||||
</entryPoint>
|
||||
|
||||
<!--推送镜像仓库校验安全证书,无安全证书无法推送-->
|
||||
|
||||
Loading…
Reference in New Issue
Block a user