fix:执行计划

问题:在某些特殊时间节点下,定时任务更新批次信息与批量重试会同时操作同一批次,导致队列被删除
解决:调整获取锁的先后顺序。批次信息和状态的处理逻辑都必须先获取锁再执行,以降低因时间差导致数据异常的风险。
hz_1122
darkmanlee 2025-01-17 11:06:27 +08:00
parent 5d95eda546
commit 8beaa4b87a
5 changed files with 469 additions and 421 deletions

View File

@ -5,20 +5,26 @@
package net.northking.cctp.executePlan.api;
import cn.gjing.tools.auth.annotation.RequiredPermissions;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import net.northking.cctp.common.exception.PlatformRuntimeException;
import net.northking.cctp.common.http.ResultWrapper;
import net.northking.cctp.common.security.authentication.NKSecurityContext;
import net.northking.cctp.executePlan.api.service.AtuPlanBatchApiService;
import net.northking.cctp.executePlan.api.service.AtuPlanInfoApiService;
import net.northking.cctp.executePlan.api.service.AtuPlanTaskApiService;
import net.northking.cctp.executePlan.constants.PlanConstant;
import net.northking.cctp.executePlan.constants.RedisConstant;
import net.northking.cctp.executePlan.db.entity.AtuPlanBatch;
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
import net.northking.cctp.executePlan.dto.planBatch.BatchRetryDto;
import net.northking.cctp.executePlan.dto.planInfo.AtuPlanInfoDetailDto;
import net.northking.cctp.executePlan.dto.planInfo.AtuPlanRunDto;
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskDetailDto;
import net.northking.cctp.executePlan.dto.planTask.AtuTaskSendBugDto;
import net.northking.cctp.executePlan.exception.ExecPlanError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -27,7 +33,9 @@ import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
/**
* => Restful HTTP/HTTPS
@ -205,7 +213,28 @@ public class AtuPlanInfoRestfulCtrl {
logger.info("start taskRetry: retryDto =======> " + JSONUtil.toJsonStr(retryDto));
ResultWrapper<Boolean> wrapper = new ResultWrapper<>();
Boolean result = taskApiService.taskRetry(retryDto);
Boolean result = false;
if (retryDto.getTaskIdList() != null && !retryDto.getTaskIdList().isEmpty()) {
logger.debug("查询需要重试的任务信息");
List<AtuPlanTask> planTaskList = taskApiService.findByPrimaryKeys(retryDto.getTaskIdList());
logger.debug("查询需要重试的任务信息数量:{}", planTaskList.size());
if (CollUtil.isEmpty(planTaskList)) {
throw new PlatformRuntimeException(ExecPlanError.TASK_INFO_IS_NULL);
}
AtuPlanTask firstFailTask = planTaskList.get(0);
AtuPlanBatch planBatch = batchApiService.findByPrimaryKey(firstFailTask.getBatchId());
// 加个锁,当选择的任务量比较多的时候,需要时间进行处理
Lock lock = redisLockRegistry.obtain(String.format("%s%s", RedisConstant.PLAN_BATCH_RETRY_LOCK_PRE, planBatch.getId()));
if (!lock.tryLock()) {
// 获取不到锁,正在处理重试
throw new PlatformRuntimeException(ExecPlanError.BATCH_IS_DEALING_WITH_RETRY);
}
try {
result = taskApiService.taskRetry(retryDto);
} finally {
lock.unlock();
}
}
logger.info("end taskRetry");

View File

@ -128,6 +128,14 @@ public interface AtuPlanTaskApiService extends ExcelService
*/
AtuPlanTask findByPrimaryKey(String id);
/**
 * Looks up plan tasks by a list of primary keys (multi-key counterpart of
 * {@link #findByPrimaryKey(String)}).
 *
 * NOTE(review): whether a lookup with no matches yields {@code null} or an empty
 * list is not visible from this interface; callers should handle both.
 *
 * @param keys primary keys of the tasks to look up
 * @return the tasks found for the given keys
 */
List<AtuPlanTask> findByPrimaryKeys(List<Object> keys);
/**
*
* @param retryDto id

View File

@ -1470,6 +1470,11 @@ public class AtuPlanTaskApiServiceImpl extends AbstractExcelService<AtuPlanTask>
return this.atuPlanTaskService.findByPrimaryKey(id);
}
/**
 * Looks up plan tasks by primary key by delegating straight to the task service.
 *
 * @param keys primary keys to look up; handed to the service unchanged
 * @return exactly what the underlying task service returns for those keys
 */
@Override
public List<AtuPlanTask> findByPrimaryKeys(List<Object> keys) {
    final List<AtuPlanTask> matchedTasks = atuPlanTaskService.findByPrimaryKeys(keys);
    return matchedTasks;
}
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean taskRetry(BatchRetryDto retryDto) {
@ -1486,13 +1491,6 @@ public class AtuPlanTaskApiServiceImpl extends AbstractExcelService<AtuPlanTask>
PlanConstant.BATCH_EXECUTING_STATUS.equals(planBatch.getStatus())) {
throw new PlatformRuntimeException(ExecPlanError.BATCH_NOT_FINISH);
}
// 加个锁,当选择的任务量比较多的时候,需要时间进行处理
Lock lock = redisLockRegistry.obtain(String.format("%s%s", RedisConstant.PLAN_BATCH_RETRY_LOCK_PRE, planBatch.getId()));
if (!lock.tryLock()) {
// 获取不到锁,正在处理重试
throw new PlatformRuntimeException(ExecPlanError.BATCH_IS_DEALING_WITH_RETRY);
}
try {
logger.debug("根据批次编号[{}]查询计划信息", firstFailTask.getBatchId());
AtuPlanInfo planInfo = atuPlanInfoApiService.findByBatchId(firstFailTask.getBatchId());
if (planInfo == null) {
@ -1751,9 +1749,6 @@ public class AtuPlanTaskApiServiceImpl extends AbstractExcelService<AtuPlanTask>
atuPlanInfoApiService.handlePlanDevice(planInfo.getId(), planInfo.getPriority(), planBatch.getId(),
retryDto.getHasOfflineDevice(), caseTypeMap, true);
return true;
} finally {
lock.unlock();
}
}
@Override

View File

@ -88,7 +88,7 @@ public enum ExecPlanError implements PlatformError {
GET_FILE_FAIL("获取文件失败"),
EMPTY_BATCH_ID("批次id不能为空"),
BATCH_IS_DEALING_WITH_RETRY("批次的任务重试正在处理中..."),
BATCH_IS_DEALING_WITH_RETRY("批次信息正在处理中,请稍后尝试操作..."),
/**
*

View File

@ -9,6 +9,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import net.northking.cctp.common.enums.FileBusinessTypeEnum;
import net.northking.cctp.common.exception.PlatformRuntimeException;
import net.northking.cctp.common.s3.NKFile;
import net.northking.cctp.common.s3.SimpleStorageService;
import net.northking.cctp.common.security.authentication.NKSecurityContext;
@ -27,6 +28,7 @@ import net.northking.cctp.executePlan.dto.planBatch.*;
import net.northking.cctp.executePlan.dto.planInfo.AtuPlanRunDto;
import net.northking.cctp.executePlan.dto.planTask.ScriptStatusStatistic;
import net.northking.cctp.executePlan.enums.MobilePlatformEnum;
import net.northking.cctp.executePlan.exception.ExecPlanError;
import net.northking.cctp.executePlan.feign.PublicFeignClient;
import net.northking.cctp.executePlan.utils.MinioPathUtils;
import org.redisson.api.RLock;
@ -34,6 +36,7 @@ import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
@ -119,6 +122,9 @@ public class PlanBatchTaskDataUpdateJob {
@Autowired
private RedisLockRegistry redisLockRegistry;
@Autowired
private RabbitTemplate rabbitTemplate;
private final static String LOCK_KEY_BATCH_SUM = "LOCK:PLAN:BATCH-SUM-DATA-UPDATE-METHOD";
private final static String BATCH_IS_NULL_COUNT_HASH = "PLAN:BATCH:NULL-COUNT-HASH";
@ -165,7 +171,13 @@ public class PlanBatchTaskDataUpdateJob {
String batchId = key.substring(key.lastIndexOf(":") + 1);
String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) +
RedisConstant.CLUSTER_KEY_SUFFIX;
Lock batchLock = redisLockRegistry.obtain(String.format("%s%s", RedisConstant.PLAN_BATCH_RETRY_LOCK_PRE, batchId));
if (!batchLock.tryLock()) {
// 获取不到锁,正在处理重试,此次统计不需要
logger.info("批次【{}】正在处理任务重试操作,此次统计不需要", batchId);
return;
}
try {
// 通过数据库进行统计,减少统计数据出错的原因
AtuPlanBatch batch = this.planBatchService.findByPrimaryKey(batchId);
if (batch == null) {
@ -334,6 +346,10 @@ public class PlanBatchTaskDataUpdateJob {
if (batchPlanInfo != null) {
handleEnd(batchPlanInfo, atuPlanBatchDetailDto);
}
} finally {
batchLock.unlock();
}
});
logger.debug("同步缓存中计划批次统计数据----end----");
} catch (Exception e) {