java-mall/client/src/main/java/com/small/client/Schedule/DynamicTaskScheduler.java

112 lines
4.3 KiB
Java

package com.small.client.Schedule;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.small.client.dto.CommentModel;
import com.small.client.dto.DataBaseInfo;
import com.small.client.service.SxDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@Service
@Slf4j
public class DynamicTaskScheduler {
private final TaskScheduler taskScheduler;
private final SxDataService sxDataService;
private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
private boolean isRuning =false;
@Autowired
public DynamicTaskScheduler(TaskScheduler taskScheduler, SxDataService sxDataService) {
this.taskScheduler = taskScheduler;
this.sxDataService = sxDataService;
}
@PostConstruct
public void initTasks() {
refreshTasks();
// 每5分钟检查一次数据库更新
taskScheduler.scheduleAtFixedRate(this::refreshTasks, Duration.ofHours(1));
}
public void refreshTasks() {
if(!isRuning){
sxDataService.checkForUpdates();//检查app更新
}
CommentModel commentModel =sxDataService.getCommentModel();
DataBaseInfo enabledTask = sxDataService.getDataBaseInfo(commentModel);
if(enabledTask == null){
return;
}
if(ObjectUtil.isNotEmpty(enabledTask.getRefreshTime())){
commentModel.setSyncTime(DateUtil.formatDateTime(enabledTask.getRefreshTime()));
}
List<DataBaseInfo> enabledTasks=new ArrayList<>();
enabledTasks.add(enabledTask);
// 移除已禁用或删除的任务
scheduledTasks.keySet().removeIf(taskKey ->
enabledTasks.stream().noneMatch(task -> task.getDataBaseName().equals(taskKey)));
// 新增或更新任务
enabledTasks.forEach(task -> {
if (!scheduledTasks.containsKey(task.getDataBaseName()) ||
isCronModified(task)) {
cancelExistingTask(task.getDataBaseName());
scheduleTask(task,commentModel);
}
});
}
private void scheduleTask(DataBaseInfo task, CommentModel commentModel) {
ScheduledFuture<?> future = taskScheduler.schedule(
() -> executeTask(task.getDataBaseName(),commentModel),
new CronTrigger(task.getCronExpression())
);
scheduledTasks.put(task.getDataBaseName(), future);
}
/**
* 业务逻辑执行
* @param taskKey
*/
private void executeTask(String taskKey, CommentModel commentModel) {
isRuning=true;
log.info("execute task key:{}, commentModel:{}", taskKey, commentModel);
if(commentModel==null){
commentModel =sxDataService.getCommentModel();
}
if(StringUtils.isEmpty(commentModel.getSyncTime())){
commentModel =sxDataService.getCommentModel();
}
DataBaseInfo dataBaseInfo=sxDataService.getDataBaseInfo(commentModel);
sxDataService.syncStoreData(dataBaseInfo,commentModel);
sxDataService.SyncBranchList(dataBaseInfo,commentModel);
sxDataService.SyncCategory(dataBaseInfo,commentModel);
sxDataService.SyncGoods(dataBaseInfo,commentModel);//todo 暂时同步全部的商品如果后期修改,需要增加服务器的字段
sxDataService.SyncVipList(dataBaseInfo,commentModel);
isRuning=false;
}
private void cancelExistingTask(String taskKey) {
Optional.ofNullable(scheduledTasks.remove(taskKey))
.ifPresent(future -> future.cancel(false));
}
private boolean isCronModified(DataBaseInfo newTask) {
DataBaseInfo dataBaseInfo= sxDataService.getDataBaseInfo(sxDataService.getCommentModel());
return scheduledTasks.containsKey(newTask.getDataBaseName()) &&
!dataBaseInfo.getCronExpression().equals(newTask.getCronExpression());
}
}