From 61ac7cd4944d60a8762a86ace24714c91af014df Mon Sep 17 00:00:00 2001 From: "jieying.li" Date: Thu, 5 Dec 2024 15:55:21 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E6=89=A7=E8=A1=8C=E8=AE=A1?= =?UTF-8?q?=E5=88=92=201.=E4=BF=AE=E6=94=B9=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E7=BB=9F=E8=AE=A1=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E6=9F=A5=E8=AF=A2=E5=BA=93=E8=A1=A8=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executePlan/config/AtuPlanConfig.java | 33 ++ .../executePlan/db/dao/AtuPlanTaskDao.java | 12 +- .../db/impl/AtuPlanTaskServiceImpl.java | 17 + .../db/service/AtuPlanTaskService.java | 9 + .../job/PlanBatchTaskDataUpdateJob.java | 356 ++++++++++-------- .../resources/mybatis/ext/AtuPlanTask.Dao.xml | 11 + 6 files changed, 284 insertions(+), 154 deletions(-) diff --git a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/config/AtuPlanConfig.java b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/config/AtuPlanConfig.java index 098d5d6..b17ae61 100644 --- a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/config/AtuPlanConfig.java +++ b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/config/AtuPlanConfig.java @@ -21,4 +21,37 @@ public class AtuPlanConfig { } this.waitTimeout = waitTimeout; } + + /** + * 是否启用执行超时,默认启用 + * atu.plan.batchExecuteTimeoutEnabled + */ + private Boolean batchExecuteTimeoutEnabled; + + /** + * 批次执行超时时间,默认48小时 + * atu.plan.batchExecuteTimeout + */ + private Integer batchExecuteTimeout; + + + public boolean isBatchExecuteTimeoutEnabled() { + return batchExecuteTimeoutEnabled == null ? true : batchExecuteTimeoutEnabled; + } + + public void setBatchExecuteTimeoutEnabled(boolean batchExecuteTimeoutEnabled) { + this.batchExecuteTimeoutEnabled = batchExecuteTimeoutEnabled; + } + + public Integer getBatchExecuteTimeout() { + if (isBatchExecuteTimeoutEnabled()) { + // 如果是启用了批次执行超时, + return batchExecuteTimeout == null ? 48 : batchExecuteTimeout; + } + return 48; + } + + public void setBatchExecuteTimeout(Integer batchExecuteTimeout) { + this.batchExecuteTimeout = batchExecuteTimeout; + } } diff --git a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/dao/AtuPlanTaskDao.java b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/dao/AtuPlanTaskDao.java index cb408da..bc06590 100644 --- a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/dao/AtuPlanTaskDao.java +++ b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/dao/AtuPlanTaskDao.java @@ -4,7 +4,6 @@ */ package net.northking.cctp.executePlan.db.dao; -import net.northking.cctp.common.http.QueryByPage; import net.northking.cctp.executePlan.db.entity.AtuPlanTask; import net.northking.cctp.executePlan.db.mapper.AtuPlanTaskMapper; import net.northking.cctp.executePlan.dto.planTask.*; @@ -131,4 +130,15 @@ public interface AtuPlanTaskDao extends AtuPlanTaskMapper List queryWaitTimeoutTask(@Param("waitTimeout") Integer waitTimeout); List countTaskStatus(AtuPlanTask task); + + /** + * 将批次下的所有等待中和执行中的任务改成超时 + * + * @param batchId 批次id + * @param timeoutStatus 超时状态码 + * @param waitStatus 等待中状态码 + * @param startStatus 执行中状态码 + */ + long batchUpdateTaskStatusToTimeout(@Param(value = "batchId")String batchId, @Param(value = "timeoutStatus")String timeoutStatus, + @Param(value = "waitStatus")String waitStatus, @Param(value = "startStatus")String startStatus); } diff --git a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/impl/AtuPlanTaskServiceImpl.java b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/impl/AtuPlanTaskServiceImpl.java index 99147e8..c19051e 100644 --- a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/impl/AtuPlanTaskServiceImpl.java +++ b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/impl/AtuPlanTaskServiceImpl.java @@ -4,6 +4,7 @@ */ package net.northking.cctp.executePlan.db.impl; +import cn.hutool.core.util.StrUtil; import net.northking.cctp.common.db.BasicDao; import net.northking.cctp.common.db.PaginationService; import net.northking.cctp.common.exception.PlatformRuntimeException; @@ -197,6 +198,22 @@ private static final Logger logger = LoggerFactory.getLogger(AtuPlanTaskServiceI return this.atuPlanTaskDao.countTaskStatus(task); } + /** + * 将批次下的所有等待中和执行中的任务改成超时 + * + * @param batchId 批次id + * @param timeoutStatus 超时状态码 + * @param waitStatus 等待中状态码 + * @param startStatus 执行中状态码 + */ + @Override + public void batchUpdateTaskStatusToTimeout(String batchId, String timeoutStatus, String waitStatus, String startStatus) { + if (StringUtils.isBlank(batchId)) { + throw new PlatformRuntimeException(ExecPlanError.EMPTY_BATCH_ID); + } + this.atuPlanTaskDao.batchUpdateTaskStatusToTimeout(batchId, timeoutStatus, waitStatus, startStatus); + } + // ---- The End by Generator ----// diff --git a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/service/AtuPlanTaskService.java b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/service/AtuPlanTaskService.java index 81b772d..44e6ffc 100644 --- a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/service/AtuPlanTaskService.java +++ b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/db/service/AtuPlanTaskService.java @@ -140,4 +140,13 @@ public interface AtuPlanTaskService extends BasicService * @return */ List countTaskStatus(AtuPlanTask task); + + /** + * 将批次下的所有等待中和执行中的任务改成超时 + * @param batchId 批次id + * @param timeoutStatus 超时状态码 + * @param waitStatus 等待中状态码 + * @param startStatus 执行中状态码 + */ + void batchUpdateTaskStatusToTimeout(String batchId, String timeoutStatus, String waitStatus, String startStatus); } diff --git a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java index 3f81593..d251d98 100644 --- a/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java +++ b/cctp-atu/atu-execute-plan/src/main/java/net/northking/cctp/executePlan/job/PlanBatchTaskDataUpdateJob.java @@ -16,6 +16,7 @@ import net.northking.cctp.executePlan.api.service.AtuPlanInfoApiService; import net.northking.cctp.executePlan.api.service.AtuPlanTaskApiService; import net.northking.cctp.executePlan.api.service.MessageCenterService; import net.northking.cctp.executePlan.api.third.feilang.service.FeiLangService; +import net.northking.cctp.executePlan.config.AtuPlanConfig; import net.northking.cctp.executePlan.constants.MsgConstant; import net.northking.cctp.executePlan.constants.PlanConstant; import net.northking.cctp.executePlan.constants.RabbitConstant; @@ -36,14 +37,20 @@ import org.springframework.amqp.core.AmqpAdmin; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.integration.redis.util.RedisLockRegistry; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import java.io.File; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; /** @@ -109,6 +116,14 @@ public class PlanBatchTaskDataUpdateJob { @Autowired private RedissonClient redisson; + @Autowired + private RedisLockRegistry redisLockRegistry; + + private final static String LOCK_KEY_BATCH_SUM = "LOCK:PLAN:BATCH-SUM-DATA-UPDATE-METHOD"; + + @Autowired + private AtuPlanConfig atuPlanConfig; + public static final int EXECUTOR_CORE_POOL_SIZE = 2 * 2 + 2; public static final int EXECUTOR_MAX_POOL_SIZE = 10; public static final int EXECUTOR_LINKED_BLOCKING_SIZE = 50; @@ -121,178 +136,185 @@ public class PlanBatchTaskDataUpdateJob { @Scheduled(fixedRateString = "${atu.plan.batchCountUpdateJob:20000}") private void batchSumDataUpdate() { - logger.debug("同步缓存中计划批次统计数据----start----"); - // 1. 获取缓存中所有的批次统计数据 - String patternKey = RedisConstant.CLUSTER_KEY_PREFIX + "*" + RedisConstant.CLUSTER_KEY_SUFFIX + - RedisConstant.BATCH_SCRIPT_SUM_KEY + "*"; - Set batchKeySet = redisTemplate.keys(patternKey); - if (CollUtil.isEmpty(batchKeySet)) { - logger.debug("不存在批次缓存统计数据----end----"); + // 统计加锁,多节点只需要一个节点启动 + Lock lock = redisLockRegistry.obtain(LOCK_KEY_BATCH_SUM); + if (!lock.tryLock()) { // 没有拿到锁退出方法,有别的节点获取到锁进行统计 return; } - logger.debug("缓存中计划批次统计数据量 => " + batchKeySet.size()); - batchKeySet.forEach(key -> { - Map entries = redisTemplate.opsForHash().entries(key); - if (CollUtil.isEmpty(entries)) { - logger.info("缓存[" + key + "]中无统计数据"); - // 没有统计数据,删掉key - redisTemplate.delete(key); + try { + logger.debug("同步缓存中计划批次统计数据----start----"); + // 1. 获取缓存中所有的批次统计数据 + String patternKey = RedisConstant.CLUSTER_KEY_PREFIX + "*" + RedisConstant.CLUSTER_KEY_SUFFIX + + RedisConstant.BATCH_SCRIPT_SUM_KEY + "*"; + Set batchKeySet = redisTemplate.keys(patternKey); + if (CollUtil.isEmpty(batchKeySet)) { + logger.debug("不存在批次缓存统计数据----end----"); return; } - String batchId = key.substring(key.lastIndexOf(":") + 1); - String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) + - RedisConstant.CLUSTER_KEY_SUFFIX; - // 2. 遍历判断该批次是否已完成 - int waitTotal = 0; - Object waitStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_WAIT_EXECUTE_STATUS); - if (ObjectUtil.isNotNull(waitStatusObj)) { - waitTotal = Integer.parseInt(waitStatusObj.toString()); - } - int runningTotal = 0; - Object runningStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_START_EXECUTE_STATUS); - if (ObjectUtil.isNotNull(runningStatusObj)) { - runningTotal = Integer.parseInt(runningStatusObj.toString()); - } - // TODO: 判断有没有数据是否合法、如果存在问题,重新获取数据库进行统计刷新 - if (waitTotal < 0 || runningTotal < 0) { // 重新统计 + logger.debug("缓存中计划批次统计数据量 => " + batchKeySet.size()); + batchKeySet.forEach(key -> { + Map entries = redisTemplate.opsForHash().entries(key); + if (CollUtil.isEmpty(entries)) { + logger.info("缓存[" + key + "]中无统计数据"); + // 没有统计数据,删掉key + redisTemplate.delete(key); + return; + } + String batchId = key.substring(key.lastIndexOf(":") + 1); + String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) + + RedisConstant.CLUSTER_KEY_SUFFIX; + + // 通过数据库进行统计,减少统计数据出错的原因 AtuPlanTask taskQ = new AtuPlanTask(); taskQ.setBatchId(batchId); List resultList = planTaskService.countTaskStatus(taskQ); + List batchStatusList = new ArrayList<>(); for (ScriptStatusStatistic result : resultList) { Set statusSet = result.getStatusSet(); - if (!statusSet.isEmpty()) { - // 状态集合不为空,根据状态设置脚本的结果状态 + result.setStatus(scriptStatus(statusSet)); // 根据脚本下的所有任务确定脚本的执行结果 + batchStatusList.add(result.getStatus()); + } + int allScriptCount = batchStatusList.size(); // 总脚本执行数 + // 等待中 + int waitCount = Collections.frequency(batchStatusList, PlanConstant.TASK_WAIT_EXECUTE_STATUS); + // 执行中 + int ingCount = Collections.frequency(batchStatusList, PlanConstant.TASK_START_EXECUTE_STATUS); + // 成功 + int successCount = Collections.frequency(batchStatusList, PlanConstant.TASK_EXECUTE_SUCCESS_STATUS); + // 执行失败 + int executeFailCount = Collections.frequency(batchStatusList, PlanConstant.TASK_EXECUTE_FAIL_STATUS); + // 断言失败 + int assertFailCount = Collections.frequency(batchStatusList, PlanConstant.TASK_ASSERT_FAIL_STATUS); + // 取消的 + int cancelCount = Collections.frequency(batchStatusList, PlanConstant.TASK_CANCEL_STATUS); + // 超时的 + int timeoutCount = Collections.frequency(batchStatusList, PlanConstant.TASK_TIMEOUT_STATUS); - } else { - // todo: 如果是空,如何处理 + if (waitCount > 0 || ingCount > 0) { // 批次还没有完成 + // 检查批次的开始时间 + AtuPlanBatch batch = this.planBatchService.findByPrimaryKey(batchId); + if (batch != null) { + Date createdTime = batch.getCreatedTime(); + if (createdTime != null) { // 有创建时间,那么比较开始时间与现在的时间差 + LocalDateTime createDateTime = createdTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); + LocalDateTime now = LocalDateTime.now(); + Duration duration = Duration.between(createDateTime, now); + long hours = duration.toHours(); + if (atuPlanConfig.isBatchExecuteTimeoutEnabled() && hours > atuPlanConfig.getBatchExecuteTimeout().longValue()) { // 参数化 + logger.info("批次[{}-{}]已创建{}小时,任务没有执行完成,强制结束批次", batch.getId(), batch.getBatch(), 48); + // 如果批次的创建时间离现在已经过去48小时还没完成,那么判定批次已完成,等待中|执行中 的任务全部算超时处理 + timeoutCount += waitCount; + timeoutCount += ingCount; + waitCount = 0; + ingCount = 0; + // 更新批次下 等待中(0)|执行中(1) 的任务全部算超时处理(6) + this.planTaskService.batchUpdateTaskStatusToTimeout(batch.getId(), PlanConstant.TASK_TIMEOUT_STATUS, PlanConstant.TASK_WAIT_EXECUTE_STATUS, PlanConstant.TASK_START_EXECUTE_STATUS); + } else { + // 批次还没有完成,退出统计 + return; + } + } else { + // 有批次信息,但是没有创建时间,应该是数据有问题 + logger.debug("批次[{}]没有创建时间", batch.getId()); + return; + } + } else { // 没有对应批次数据,删除缓存数据 + logger.debug("没有对应的批次信息[{}],删除缓存数据", batchId); + redisTemplate.delete(key); + return; // 退出这个批次统计 } } - } - // 失败的数据 - // 脚本超时的 - if (waitTotal != 0 || runningTotal != 0) { - // 该批次还未完成 - return; - } - // 2.1. 完成则更新数据库中的批次统计数据 - int total = 0; - Object scriptTotalObj = entries.get(clusterKeyPrefix + PlanConstant.SCRIPT_TOTAL); - if (ObjectUtil.isNotNull(scriptTotalObj)) { - total = Integer.parseInt(scriptTotalObj.toString()); - } - int successTotal = 0; - Object successStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_EXECUTE_SUCCESS_STATUS); - if (ObjectUtil.isNotNull(successStatusObj)) { - successTotal = Integer.parseInt(successStatusObj.toString()); - } - int failTotal = 0; - Object failStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_EXECUTE_FAIL_STATUS); - if (ObjectUtil.isNotNull(failStatusObj)) { - failTotal = Integer.parseInt(failStatusObj.toString()); - if (failTotal < 0) { - failTotal = 0; + // 批次已完成 + + // 更新批次表 + AtuPlanBatch planBatch = new AtuPlanBatch(); + planBatch.setId(batchId); + planBatch.setSuccessNum(successCount); + planBatch.setExecFailNum(executeFailCount); + planBatch.setAssertFailNum(assertFailCount); + planBatch.setTimeoutNum(timeoutCount); + planBatch.setCancelNum(cancelCount); + + planBatch.setSuccessRate(MsgConstant.PLAN_BATCH_SUCCESS_RATE); + if (planBatch.getSuccessNum() > 0 && allScriptCount > 0) { + planBatch.setSuccessRate(NumberUtil.decimalFormat(MsgConstant.PATTERN_HASH_DOT_HASH_HASH, NumberUtil.div(planBatch.getSuccessNum(), allScriptCount))); } - } - int assertFailTotal = 0; - Object assertFailStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_ASSERT_FAIL_STATUS); - if (ObjectUtil.isNotNull(assertFailStatusObj)) { - assertFailTotal = Integer.parseInt(assertFailStatusObj.toString()); - } - int timeoutTotal = 0; - Object timeoutStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_TIMEOUT_STATUS); - if (ObjectUtil.isNotNull(timeoutStatusObj)) { - timeoutTotal = Integer.parseInt(timeoutStatusObj.toString()); - } - int cancelTotal = 0; - Object cancelStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_CANCEL_STATUS); - if (ObjectUtil.isNotNull(cancelStatusObj)) { - cancelTotal = Integer.parseInt(cancelStatusObj.toString()); - } - // TODO: 如果等待中和执行中都没有,都为0,统计库表数据进行更新,而不是通过缓存 - // 更新批次表 - AtuPlanBatch planBatch = new AtuPlanBatch(); - planBatch.setId(batchId); - planBatch.setSuccessNum(successTotal); - planBatch.setExecFailNum(failTotal); - planBatch.setAssertFailNum(assertFailTotal); - planBatch.setTimeoutNum(timeoutTotal); - planBatch.setCancelNum(cancelTotal); - - planBatch.setSuccessRate(MsgConstant.PLAN_BATCH_SUCCESS_RATE); - if (successTotal > 0 && total > 0) { - planBatch.setSuccessRate(NumberUtil.decimalFormat(MsgConstant.PATTERN_HASH_DOT_HASH_HASH, NumberUtil.div(successTotal, total))); - } - // 查询批次最先开始执行的任务的开始时间 - if (planBatch.getStartTime() == null) { - Long firstTaskStartTime = planTaskService.queryBatchFirstTaskStartTime(batchId); - if (ObjectUtil.isNotNull(firstTaskStartTime)) { - planBatch.setStartTime(new Date(firstTaskStartTime)); + // 查询批次最先开始执行的任务的开始时间 + if (planBatch.getStartTime() == null) { + Long firstTaskStartTime = planTaskService.queryBatchFirstTaskStartTime(batchId); + if (ObjectUtil.isNotNull(firstTaskStartTime)) { + planBatch.setStartTime(new Date(firstTaskStartTime)); + } + } + // 任务状态判断 + String planStatus = ""; + if (planBatch.getCancelNum() > 0) { + planStatus = PlanConstant.PLAN_CANCEL_STATUS; + // 修改为取消状态 + planBatch.setStatus(PlanConstant.BATCH_CANCEL_STATUS); + } else { + planStatus = PlanConstant.PLAN_FINISH_STATUS; + // 修改为已完成状态 + planBatch.setStatus(PlanConstant.BATCH_FINISH_STATUS); } - } - // 任务状态判断 - String planStatus = ""; - if (cancelTotal > 0) { - planStatus = PlanConstant.PLAN_CANCEL_STATUS; - // 修改为取消状态 - planBatch.setStatus(PlanConstant.BATCH_CANCEL_STATUS); - } else { - planStatus = PlanConstant.PLAN_FINISH_STATUS; - // 修改为已完成状态 - planBatch.setStatus(PlanConstant.BATCH_FINISH_STATUS); - } - // 查询批次最后执行完成的任务的结束时间 - Long lastTaskEndTime = planTaskService.queryBatchLastTaskEndTime(batchId); - if (ObjectUtil.isNotNull(lastTaskEndTime)) { - planBatch.setEndTime(new Date(lastTaskEndTime)); - } - logger.debug("更新批次信息"); - planBatchService.updateByPrimaryKey(planBatch); + // 查询批次最后执行完成的任务的结束时间 + Long lastTaskEndTime = planTaskService.queryBatchLastTaskEndTime(batchId); + if (ObjectUtil.isNotNull(lastTaskEndTime)) { + planBatch.setEndTime(new Date(lastTaskEndTime)); + } + logger.debug("更新批次信息"); + planBatchService.updateByPrimaryKey(planBatch); - // 删除任务队列 - amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_PC_KEY + batchId); - // 移动端删除各个平台对于的队列 - MobilePlatformEnum[] platformEnums = MobilePlatformEnum.values(); - for (MobilePlatformEnum platformEnum : platformEnums) { - amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_MOB_KEY + platformEnum.getName() + "." + batchId); - } - amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_API_KEY + batchId); + // 删除任务队列 + amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_PC_KEY + batchId); + // 移动端删除各个平台对于的队列 + MobilePlatformEnum[] platformEnums = MobilePlatformEnum.values(); + for (MobilePlatformEnum platformEnum : platformEnums) { + amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_MOB_KEY + platformEnum.getName() + "." + batchId); + } + amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_API_KEY + batchId); - //获取更新后的批次信息 - AtuPlanBatchDetailDto atuPlanBatchDetailDto = planBatchService.queryBatchDetailById(planBatch.getId()); + //获取更新后的批次信息 + AtuPlanBatchDetailDto atuPlanBatchDetailDto = planBatchService.queryBatchDetailById(planBatch.getId()); - // 判断是否计划最后一批次 - AtuPlanInfo planInfo = planInfoService.queryByLastBatchId(batchId); - if (ObjectUtil.isNotNull(planInfo)) { - // 是则更新计划表 - planInfo.setStatus(planStatus); - planInfo.setWaitingNum(waitTotal); - planInfo.setRunningNum(runningTotal); - planInfo.setSuccessNum(successTotal); - planInfo.setExecFailNum(failTotal); - planInfo.setAssertFailNum(assertFailTotal); - planInfo.setTimeoutNum(timeoutTotal); - planInfo.setCancelNum(cancelTotal); - //不用代码生成的方法 - planInfoService.updatePlanResultById(planInfo); - } + // 判断是否计划最后一批次 + AtuPlanInfo planInfo = planInfoService.queryByLastBatchId(batchId); + if (ObjectUtil.isNotNull(planInfo)) { + // 是则更新计划表 + planInfo.setStatus(planStatus); + planInfo.setWaitingNum(waitCount); + planInfo.setRunningNum(ingCount); + planInfo.setSuccessNum(planBatch.getSuccessNum()); + planInfo.setExecFailNum(planBatch.getExecFailNum()); + planInfo.setAssertFailNum(planBatch.getAssertFailNum()); + planInfo.setTimeoutNum(planBatch.getTimeoutNum()); + planInfo.setCancelNum(planBatch.getCancelNum()); + //不用代码生成的方法 + planInfoService.updatePlanResultById(planInfo); + } - executor.execute(() -> { - // 保存设备性能数据 - saveDevicePerData(atuPlanBatchDetailDto); + executor.execute(() -> { + // 保存设备性能数据 + saveDevicePerData(atuPlanBatchDetailDto); + }); + + // 2.2. 删除缓存中已完成批次的记录 + logger.debug("删除批次统计缓存 => " + key); + redisTemplate.delete(key); + + // 收尾处理 + AtuPlanInfo batchPlanInfo = planInfoService.findByBatchId(batchId); + if (batchPlanInfo != null) { + handleEnd(batchPlanInfo, atuPlanBatchDetailDto); + } }); - - // 2.2. 删除缓存中已完成批次的记录 - logger.debug("删除批次统计缓存 => " + key); - redisTemplate.delete(key); - - // 收尾处理 - AtuPlanInfo batchPlanInfo = planInfoService.findByBatchId(batchId); - if (batchPlanInfo != null) { - handleEnd(batchPlanInfo, atuPlanBatchDetailDto); - } - }); - logger.debug("同步缓存中计划批次统计数据----end----"); + logger.debug("同步缓存中计划批次统计数据----end----"); + } catch (Exception e) { + logger.error("批次统计数据执行发生异常", e); + } finally { + lock.unlock(); + } } /** @@ -728,8 +750,36 @@ public class PlanBatchTaskDataUpdateJob { planScriptLinkService.insertByBatch(scriptLinkList); } + /** + * 用于确定脚本的执行结果状态 + * @param statusSet 脚本下的所有任务状态 + * @return + */ private String scriptStatus(Set statusSet) { - if (statusSet == null || statusSet.isEmpty()) { + String status = PlanConstant.TASK_EXECUTE_SUCCESS_STATUS; // 默认成功 + if (statusSet == null || statusSet.isEmpty()) { // 如果状态为空,那么脚本成功 + return status; } + boolean flag = statusSet.remove(PlanConstant.TASK_WAIT_EXECUTE_STATUS); + if (flag) { // 如果存在等待(0) + if (statusSet.isEmpty()) { // 全都是等待(0) + status = PlanConstant.TASK_WAIT_EXECUTE_STATUS; + } else { // 还有其他状态 + status = PlanConstant.TASK_START_EXECUTE_STATUS; + } + } else { // 不存在等待(0) + if (statusSet.remove(PlanConstant.TASK_START_EXECUTE_STATUS)) { // 存在执行中(1) + status = PlanConstant.TASK_START_EXECUTE_STATUS; + } else if (statusSet.remove(PlanConstant.TASK_CANCEL_STATUS)) { // 存在取消(5) + status = PlanConstant.TASK_CANCEL_STATUS; + } else if (statusSet.remove(PlanConstant.TASK_TIMEOUT_STATUS)) { // 存在超时(6) + status = PlanConstant.TASK_TIMEOUT_STATUS; + } else if (statusSet.remove(PlanConstant.TASK_ASSERT_FAIL_STATUS)) { // 存在断言失败(4) + status = PlanConstant.TASK_ASSERT_FAIL_STATUS; + } else if (statusSet.remove(PlanConstant.TASK_EXECUTE_FAIL_STATUS)) { // 存在执行失败(3) + status = PlanConstant.TASK_EXECUTE_FAIL_STATUS; + } + } + return status; } } diff --git a/cctp-atu/atu-execute-plan/src/main/resources/mybatis/ext/AtuPlanTask.Dao.xml b/cctp-atu/atu-execute-plan/src/main/resources/mybatis/ext/AtuPlanTask.Dao.xml index 355d360..0b87f2e 100644 --- a/cctp-atu/atu-execute-plan/src/main/resources/mybatis/ext/AtuPlanTask.Dao.xml +++ b/cctp-atu/atu-execute-plan/src/main/resources/mybatis/ext/AtuPlanTask.Dao.xml @@ -386,4 +386,15 @@ batch_id = #{task.batchId,jdbcType=VARCHAR} GROUP BY script_id + + + UPDATE + + SET + `status` = #{timeoutStatus,jdbcType=VARCHAR} + WHERE + batch_id = #{batchId,jdbcType=VARCHAR} + AND + (`status` = #{waitStatus, jdbcType=VARCHAR} or `status` = #{timeoutStatus, jdbcType=VARCHAR}) + \ No newline at end of file