parent
eb58a1dd44
commit
61ac7cd494
|
@ -21,4 +21,37 @@ public class AtuPlanConfig {
|
|||
}
|
||||
this.waitTimeout = waitTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否启用执行超时,默认启用
|
||||
* atu.plan.batchExecuteTimeoutEnabled
|
||||
*/
|
||||
private Boolean batchExecuteTimeoutEnabled;
|
||||
|
||||
/**
|
||||
* 批次执行超时时间,默认48小时
|
||||
* atu.plan.batchExecuteTimeout
|
||||
*/
|
||||
private Integer batchExecuteTimeout;
|
||||
|
||||
|
||||
public boolean isBatchExecuteTimeoutEnabled() {
|
||||
return batchExecuteTimeoutEnabled == null ? true : batchExecuteTimeoutEnabled;
|
||||
}
|
||||
|
||||
public void setBatchExecuteTimeoutEnabled(boolean batchExecuteTimeoutEnabled) {
|
||||
this.batchExecuteTimeoutEnabled = batchExecuteTimeoutEnabled;
|
||||
}
|
||||
|
||||
public Integer getBatchExecuteTimeout() {
|
||||
if (isBatchExecuteTimeoutEnabled()) {
|
||||
// 如果是启用了批次执行超时,
|
||||
return batchExecuteTimeout == null ? 48 : batchExecuteTimeout;
|
||||
}
|
||||
return 48;
|
||||
}
|
||||
|
||||
public void setBatchExecuteTimeout(Integer batchExecuteTimeout) {
|
||||
this.batchExecuteTimeout = batchExecuteTimeout;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
*/
|
||||
package net.northking.cctp.executePlan.db.dao;
|
||||
|
||||
import net.northking.cctp.common.http.QueryByPage;
|
||||
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
||||
import net.northking.cctp.executePlan.db.mapper.AtuPlanTaskMapper;
|
||||
import net.northking.cctp.executePlan.dto.planTask.*;
|
||||
|
@ -131,4 +130,15 @@ public interface AtuPlanTaskDao extends AtuPlanTaskMapper
|
|||
List<AtuPlanTask> queryWaitTimeoutTask(@Param("waitTimeout") Integer waitTimeout);
|
||||
|
||||
List<ScriptStatusStatistic> countTaskStatus(AtuPlanTask task);
|
||||
|
||||
/**
|
||||
* 将批次下的所有等待中和执行中的任务改成超时
|
||||
*
|
||||
* @param batchId 批次id
|
||||
* @param timeoutStatus 超时状态码
|
||||
* @param waitStatus 等待中状态码
|
||||
* @param startStatus 执行中状态码
|
||||
*/
|
||||
long batchUpdateTaskStatusToTimeout(@Param(value = "batchId")String batchId, @Param(value = "timeoutStatus")String timeoutStatus,
|
||||
@Param(value = "waitStatus")String waitStatus, @Param(value = "startStatus")String startStatus);
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
*/
|
||||
package net.northking.cctp.executePlan.db.impl;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import net.northking.cctp.common.db.BasicDao;
|
||||
import net.northking.cctp.common.db.PaginationService;
|
||||
import net.northking.cctp.common.exception.PlatformRuntimeException;
|
||||
|
@ -197,6 +198,22 @@ private static final Logger logger = LoggerFactory.getLogger(AtuPlanTaskServiceI
|
|||
return this.atuPlanTaskDao.countTaskStatus(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将批次下的所有等待中和执行中的任务改成超时
|
||||
*
|
||||
* @param batchId 批次id
|
||||
* @param timeoutStatus 超时状态码
|
||||
* @param waitStatus 等待中状态码
|
||||
* @param startStatus 执行中状态码
|
||||
*/
|
||||
@Override
|
||||
public void batchUpdateTaskStatusToTimeout(String batchId, String timeoutStatus, String waitStatus, String startStatus) {
|
||||
if (StringUtils.isBlank(batchId)) {
|
||||
throw new PlatformRuntimeException(ExecPlanError.EMPTY_BATCH_ID);
|
||||
}
|
||||
this.atuPlanTaskDao.batchUpdateTaskStatusToTimeout(batchId, timeoutStatus, waitStatus, startStatus);
|
||||
}
|
||||
|
||||
// ---- The End by Generator ----//
|
||||
|
||||
|
||||
|
|
|
@ -140,4 +140,13 @@ public interface AtuPlanTaskService extends BasicService<AtuPlanTask>
|
|||
* @return
|
||||
*/
|
||||
List<ScriptStatusStatistic> countTaskStatus(AtuPlanTask task);
|
||||
|
||||
/**
|
||||
* 将批次下的所有等待中和执行中的任务改成超时
|
||||
* @param batchId 批次id
|
||||
* @param timeoutStatus 超时状态码
|
||||
* @param waitStatus 等待中状态码
|
||||
* @param startStatus 执行中状态码
|
||||
*/
|
||||
void batchUpdateTaskStatusToTimeout(String batchId, String timeoutStatus, String waitStatus, String startStatus);
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import net.northking.cctp.executePlan.api.service.AtuPlanInfoApiService;
|
|||
import net.northking.cctp.executePlan.api.service.AtuPlanTaskApiService;
|
||||
import net.northking.cctp.executePlan.api.service.MessageCenterService;
|
||||
import net.northking.cctp.executePlan.api.third.feilang.service.FeiLangService;
|
||||
import net.northking.cctp.executePlan.config.AtuPlanConfig;
|
||||
import net.northking.cctp.executePlan.constants.MsgConstant;
|
||||
import net.northking.cctp.executePlan.constants.PlanConstant;
|
||||
import net.northking.cctp.executePlan.constants.RabbitConstant;
|
||||
|
@ -36,14 +37,20 @@ import org.springframework.amqp.core.AmqpAdmin;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.integration.redis.util.RedisLockRegistry;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.io.File;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -109,6 +116,14 @@ public class PlanBatchTaskDataUpdateJob {
|
|||
@Autowired
|
||||
private RedissonClient redisson;
|
||||
|
||||
@Autowired
|
||||
private RedisLockRegistry redisLockRegistry;
|
||||
|
||||
private final static String LOCK_KEY_BATCH_SUM = "LOCK:PLAN:BATCH-SUM-DATA-UPDATE-METHOD";
|
||||
|
||||
@Autowired
|
||||
private AtuPlanConfig atuPlanConfig;
|
||||
|
||||
public static final int EXECUTOR_CORE_POOL_SIZE = 2 * 2 + 2;
|
||||
public static final int EXECUTOR_MAX_POOL_SIZE = 10;
|
||||
public static final int EXECUTOR_LINKED_BLOCKING_SIZE = 50;
|
||||
|
@ -121,178 +136,185 @@ public class PlanBatchTaskDataUpdateJob {
|
|||
|
||||
@Scheduled(fixedRateString = "${atu.plan.batchCountUpdateJob:20000}")
|
||||
private void batchSumDataUpdate() {
|
||||
logger.debug("同步缓存中计划批次统计数据----start----");
|
||||
// 1. 获取缓存中所有的批次统计数据
|
||||
String patternKey = RedisConstant.CLUSTER_KEY_PREFIX + "*" + RedisConstant.CLUSTER_KEY_SUFFIX +
|
||||
RedisConstant.BATCH_SCRIPT_SUM_KEY + "*";
|
||||
Set<String> batchKeySet = redisTemplate.keys(patternKey);
|
||||
if (CollUtil.isEmpty(batchKeySet)) {
|
||||
logger.debug("不存在批次缓存统计数据----end----");
|
||||
// 统计加锁,多节点只需要一个节点启动
|
||||
Lock lock = redisLockRegistry.obtain(LOCK_KEY_BATCH_SUM);
|
||||
if (!lock.tryLock()) { // 没有拿到锁退出方法,有别的节点获取到锁进行统计
|
||||
return;
|
||||
}
|
||||
logger.debug("缓存中计划批次统计数据量 => " + batchKeySet.size());
|
||||
batchKeySet.forEach(key -> {
|
||||
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
|
||||
if (CollUtil.isEmpty(entries)) {
|
||||
logger.info("缓存[" + key + "]中无统计数据");
|
||||
// 没有统计数据,删掉key
|
||||
redisTemplate.delete(key);
|
||||
try {
|
||||
logger.debug("同步缓存中计划批次统计数据----start----");
|
||||
// 1. 获取缓存中所有的批次统计数据
|
||||
String patternKey = RedisConstant.CLUSTER_KEY_PREFIX + "*" + RedisConstant.CLUSTER_KEY_SUFFIX +
|
||||
RedisConstant.BATCH_SCRIPT_SUM_KEY + "*";
|
||||
Set<String> batchKeySet = redisTemplate.keys(patternKey);
|
||||
if (CollUtil.isEmpty(batchKeySet)) {
|
||||
logger.debug("不存在批次缓存统计数据----end----");
|
||||
return;
|
||||
}
|
||||
String batchId = key.substring(key.lastIndexOf(":") + 1);
|
||||
String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) +
|
||||
RedisConstant.CLUSTER_KEY_SUFFIX;
|
||||
// 2. 遍历判断该批次是否已完成
|
||||
int waitTotal = 0;
|
||||
Object waitStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_WAIT_EXECUTE_STATUS);
|
||||
if (ObjectUtil.isNotNull(waitStatusObj)) {
|
||||
waitTotal = Integer.parseInt(waitStatusObj.toString());
|
||||
}
|
||||
int runningTotal = 0;
|
||||
Object runningStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_START_EXECUTE_STATUS);
|
||||
if (ObjectUtil.isNotNull(runningStatusObj)) {
|
||||
runningTotal = Integer.parseInt(runningStatusObj.toString());
|
||||
}
|
||||
// TODO: 判断有没有数据是否合法、如果存在问题,重新获取数据库进行统计刷新
|
||||
if (waitTotal < 0 || runningTotal < 0) { // 重新统计
|
||||
logger.debug("缓存中计划批次统计数据量 => " + batchKeySet.size());
|
||||
batchKeySet.forEach(key -> {
|
||||
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
|
||||
if (CollUtil.isEmpty(entries)) {
|
||||
logger.info("缓存[" + key + "]中无统计数据");
|
||||
// 没有统计数据,删掉key
|
||||
redisTemplate.delete(key);
|
||||
return;
|
||||
}
|
||||
String batchId = key.substring(key.lastIndexOf(":") + 1);
|
||||
String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) +
|
||||
RedisConstant.CLUSTER_KEY_SUFFIX;
|
||||
|
||||
// 通过数据库进行统计,减少统计数据出错的原因
|
||||
AtuPlanTask taskQ = new AtuPlanTask();
|
||||
taskQ.setBatchId(batchId);
|
||||
List<ScriptStatusStatistic> resultList = planTaskService.countTaskStatus(taskQ);
|
||||
List<String> batchStatusList = new ArrayList<>();
|
||||
for (ScriptStatusStatistic result : resultList) {
|
||||
Set<String> statusSet = result.getStatusSet();
|
||||
if (!statusSet.isEmpty()) {
|
||||
// 状态集合不为空,根据状态设置脚本的结果状态
|
||||
result.setStatus(scriptStatus(statusSet)); // 根据脚本下的所有任务确定脚本的执行结果
|
||||
batchStatusList.add(result.getStatus());
|
||||
}
|
||||
int allScriptCount = batchStatusList.size(); // 总脚本执行数
|
||||
// 等待中
|
||||
int waitCount = Collections.frequency(batchStatusList, PlanConstant.TASK_WAIT_EXECUTE_STATUS);
|
||||
// 执行中
|
||||
int ingCount = Collections.frequency(batchStatusList, PlanConstant.TASK_START_EXECUTE_STATUS);
|
||||
// 成功
|
||||
int successCount = Collections.frequency(batchStatusList, PlanConstant.TASK_EXECUTE_SUCCESS_STATUS);
|
||||
// 执行失败
|
||||
int executeFailCount = Collections.frequency(batchStatusList, PlanConstant.TASK_EXECUTE_FAIL_STATUS);
|
||||
// 断言失败
|
||||
int assertFailCount = Collections.frequency(batchStatusList, PlanConstant.TASK_ASSERT_FAIL_STATUS);
|
||||
// 取消的
|
||||
int cancelCount = Collections.frequency(batchStatusList, PlanConstant.TASK_CANCEL_STATUS);
|
||||
// 超时的
|
||||
int timeoutCount = Collections.frequency(batchStatusList, PlanConstant.TASK_TIMEOUT_STATUS);
|
||||
|
||||
} else {
|
||||
// todo: 如果是空,如何处理
|
||||
if (waitCount > 0 || ingCount > 0) { // 批次还没有完成
|
||||
// 检查批次的开始时间
|
||||
AtuPlanBatch batch = this.planBatchService.findByPrimaryKey(batchId);
|
||||
if (batch != null) {
|
||||
Date createdTime = batch.getCreatedTime();
|
||||
if (createdTime != null) { // 有创建时间,那么比较开始时间与现在的时间差
|
||||
LocalDateTime createDateTime = createdTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
Duration duration = Duration.between(createDateTime, now);
|
||||
long hours = duration.toHours();
|
||||
if (atuPlanConfig.isBatchExecuteTimeoutEnabled() && hours > atuPlanConfig.getBatchExecuteTimeout().longValue()) { // 参数化
|
||||
logger.info("批次[{}-{}]已创建{}小时,任务没有执行完成,强制结束批次", batch.getId(), batch.getBatch(), 48);
|
||||
// 如果批次的创建时间离现在已经过去48小时还没完成,那么判定批次已完成,等待中|执行中 的任务全部算超时处理
|
||||
timeoutCount += waitCount;
|
||||
timeoutCount += ingCount;
|
||||
waitCount = 0;
|
||||
ingCount = 0;
|
||||
// 更新批次下 等待中(0)|执行中(1) 的任务全部算超时处理(6)
|
||||
this.planTaskService.batchUpdateTaskStatusToTimeout(batch.getId(), PlanConstant.TASK_TIMEOUT_STATUS, PlanConstant.TASK_WAIT_EXECUTE_STATUS, PlanConstant.TASK_START_EXECUTE_STATUS);
|
||||
} else {
|
||||
// 批次还没有完成,退出统计
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// 有批次信息,但是没有创建时间,应该是数据有问题
|
||||
logger.debug("批次[{}]没有创建时间", batch.getId());
|
||||
return;
|
||||
}
|
||||
} else { // 没有对应批次数据,删除缓存数据
|
||||
logger.debug("没有对应的批次信息[{}],删除缓存数据", batchId);
|
||||
redisTemplate.delete(key);
|
||||
return; // 退出这个批次统计
|
||||
}
|
||||
}
|
||||
}
|
||||
// 失败的数据
|
||||
// 脚本超时的
|
||||
if (waitTotal != 0 || runningTotal != 0) {
|
||||
// 该批次还未完成
|
||||
return;
|
||||
}
|
||||
// 2.1. 完成则更新数据库中的批次统计数据
|
||||
int total = 0;
|
||||
Object scriptTotalObj = entries.get(clusterKeyPrefix + PlanConstant.SCRIPT_TOTAL);
|
||||
if (ObjectUtil.isNotNull(scriptTotalObj)) {
|
||||
total = Integer.parseInt(scriptTotalObj.toString());
|
||||
}
|
||||
int successTotal = 0;
|
||||
Object successStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_EXECUTE_SUCCESS_STATUS);
|
||||
if (ObjectUtil.isNotNull(successStatusObj)) {
|
||||
successTotal = Integer.parseInt(successStatusObj.toString());
|
||||
}
|
||||
int failTotal = 0;
|
||||
Object failStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_EXECUTE_FAIL_STATUS);
|
||||
if (ObjectUtil.isNotNull(failStatusObj)) {
|
||||
failTotal = Integer.parseInt(failStatusObj.toString());
|
||||
if (failTotal < 0) {
|
||||
failTotal = 0;
|
||||
// 批次已完成
|
||||
|
||||
// 更新批次表
|
||||
AtuPlanBatch planBatch = new AtuPlanBatch();
|
||||
planBatch.setId(batchId);
|
||||
planBatch.setSuccessNum(successCount);
|
||||
planBatch.setExecFailNum(executeFailCount);
|
||||
planBatch.setAssertFailNum(assertFailCount);
|
||||
planBatch.setTimeoutNum(timeoutCount);
|
||||
planBatch.setCancelNum(cancelCount);
|
||||
|
||||
planBatch.setSuccessRate(MsgConstant.PLAN_BATCH_SUCCESS_RATE);
|
||||
if (planBatch.getSuccessNum() > 0 && allScriptCount > 0) {
|
||||
planBatch.setSuccessRate(NumberUtil.decimalFormat(MsgConstant.PATTERN_HASH_DOT_HASH_HASH, NumberUtil.div(planBatch.getSuccessNum(), allScriptCount)));
|
||||
}
|
||||
}
|
||||
int assertFailTotal = 0;
|
||||
Object assertFailStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_ASSERT_FAIL_STATUS);
|
||||
if (ObjectUtil.isNotNull(assertFailStatusObj)) {
|
||||
assertFailTotal = Integer.parseInt(assertFailStatusObj.toString());
|
||||
}
|
||||
int timeoutTotal = 0;
|
||||
Object timeoutStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_TIMEOUT_STATUS);
|
||||
if (ObjectUtil.isNotNull(timeoutStatusObj)) {
|
||||
timeoutTotal = Integer.parseInt(timeoutStatusObj.toString());
|
||||
}
|
||||
int cancelTotal = 0;
|
||||
Object cancelStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_CANCEL_STATUS);
|
||||
if (ObjectUtil.isNotNull(cancelStatusObj)) {
|
||||
cancelTotal = Integer.parseInt(cancelStatusObj.toString());
|
||||
}
|
||||
// TODO: 如果等待中和执行中都没有,都为0,统计库表数据进行更新,而不是通过缓存
|
||||
// 更新批次表
|
||||
AtuPlanBatch planBatch = new AtuPlanBatch();
|
||||
planBatch.setId(batchId);
|
||||
planBatch.setSuccessNum(successTotal);
|
||||
planBatch.setExecFailNum(failTotal);
|
||||
planBatch.setAssertFailNum(assertFailTotal);
|
||||
planBatch.setTimeoutNum(timeoutTotal);
|
||||
planBatch.setCancelNum(cancelTotal);
|
||||
|
||||
planBatch.setSuccessRate(MsgConstant.PLAN_BATCH_SUCCESS_RATE);
|
||||
if (successTotal > 0 && total > 0) {
|
||||
planBatch.setSuccessRate(NumberUtil.decimalFormat(MsgConstant.PATTERN_HASH_DOT_HASH_HASH, NumberUtil.div(successTotal, total)));
|
||||
}
|
||||
// 查询批次最先开始执行的任务的开始时间
|
||||
if (planBatch.getStartTime() == null) {
|
||||
Long firstTaskStartTime = planTaskService.queryBatchFirstTaskStartTime(batchId);
|
||||
if (ObjectUtil.isNotNull(firstTaskStartTime)) {
|
||||
planBatch.setStartTime(new Date(firstTaskStartTime));
|
||||
// 查询批次最先开始执行的任务的开始时间
|
||||
if (planBatch.getStartTime() == null) {
|
||||
Long firstTaskStartTime = planTaskService.queryBatchFirstTaskStartTime(batchId);
|
||||
if (ObjectUtil.isNotNull(firstTaskStartTime)) {
|
||||
planBatch.setStartTime(new Date(firstTaskStartTime));
|
||||
}
|
||||
}
|
||||
// 任务状态判断
|
||||
String planStatus = "";
|
||||
if (planBatch.getCancelNum() > 0) {
|
||||
planStatus = PlanConstant.PLAN_CANCEL_STATUS;
|
||||
// 修改为取消状态
|
||||
planBatch.setStatus(PlanConstant.BATCH_CANCEL_STATUS);
|
||||
} else {
|
||||
planStatus = PlanConstant.PLAN_FINISH_STATUS;
|
||||
// 修改为已完成状态
|
||||
planBatch.setStatus(PlanConstant.BATCH_FINISH_STATUS);
|
||||
}
|
||||
}
|
||||
// 任务状态判断
|
||||
String planStatus = "";
|
||||
if (cancelTotal > 0) {
|
||||
planStatus = PlanConstant.PLAN_CANCEL_STATUS;
|
||||
// 修改为取消状态
|
||||
planBatch.setStatus(PlanConstant.BATCH_CANCEL_STATUS);
|
||||
} else {
|
||||
planStatus = PlanConstant.PLAN_FINISH_STATUS;
|
||||
// 修改为已完成状态
|
||||
planBatch.setStatus(PlanConstant.BATCH_FINISH_STATUS);
|
||||
}
|
||||
|
||||
// 查询批次最后执行完成的任务的结束时间
|
||||
Long lastTaskEndTime = planTaskService.queryBatchLastTaskEndTime(batchId);
|
||||
if (ObjectUtil.isNotNull(lastTaskEndTime)) {
|
||||
planBatch.setEndTime(new Date(lastTaskEndTime));
|
||||
}
|
||||
logger.debug("更新批次信息");
|
||||
planBatchService.updateByPrimaryKey(planBatch);
|
||||
// 查询批次最后执行完成的任务的结束时间
|
||||
Long lastTaskEndTime = planTaskService.queryBatchLastTaskEndTime(batchId);
|
||||
if (ObjectUtil.isNotNull(lastTaskEndTime)) {
|
||||
planBatch.setEndTime(new Date(lastTaskEndTime));
|
||||
}
|
||||
logger.debug("更新批次信息");
|
||||
planBatchService.updateByPrimaryKey(planBatch);
|
||||
|
||||
// 删除任务队列
|
||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_PC_KEY + batchId);
|
||||
// 移动端删除各个平台对于的队列
|
||||
MobilePlatformEnum[] platformEnums = MobilePlatformEnum.values();
|
||||
for (MobilePlatformEnum platformEnum : platformEnums) {
|
||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_MOB_KEY + platformEnum.getName() + "." + batchId);
|
||||
}
|
||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_API_KEY + batchId);
|
||||
// 删除任务队列
|
||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_PC_KEY + batchId);
|
||||
// 移动端删除各个平台对于的队列
|
||||
MobilePlatformEnum[] platformEnums = MobilePlatformEnum.values();
|
||||
for (MobilePlatformEnum platformEnum : platformEnums) {
|
||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_MOB_KEY + platformEnum.getName() + "." + batchId);
|
||||
}
|
||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_API_KEY + batchId);
|
||||
|
||||
//获取更新后的批次信息
|
||||
AtuPlanBatchDetailDto atuPlanBatchDetailDto = planBatchService.queryBatchDetailById(planBatch.getId());
|
||||
//获取更新后的批次信息
|
||||
AtuPlanBatchDetailDto atuPlanBatchDetailDto = planBatchService.queryBatchDetailById(planBatch.getId());
|
||||
|
||||
// 判断是否计划最后一批次
|
||||
AtuPlanInfo planInfo = planInfoService.queryByLastBatchId(batchId);
|
||||
if (ObjectUtil.isNotNull(planInfo)) {
|
||||
// 是则更新计划表
|
||||
planInfo.setStatus(planStatus);
|
||||
planInfo.setWaitingNum(waitTotal);
|
||||
planInfo.setRunningNum(runningTotal);
|
||||
planInfo.setSuccessNum(successTotal);
|
||||
planInfo.setExecFailNum(failTotal);
|
||||
planInfo.setAssertFailNum(assertFailTotal);
|
||||
planInfo.setTimeoutNum(timeoutTotal);
|
||||
planInfo.setCancelNum(cancelTotal);
|
||||
//不用代码生成的方法
|
||||
planInfoService.updatePlanResultById(planInfo);
|
||||
}
|
||||
// 判断是否计划最后一批次
|
||||
AtuPlanInfo planInfo = planInfoService.queryByLastBatchId(batchId);
|
||||
if (ObjectUtil.isNotNull(planInfo)) {
|
||||
// 是则更新计划表
|
||||
planInfo.setStatus(planStatus);
|
||||
planInfo.setWaitingNum(waitCount);
|
||||
planInfo.setRunningNum(ingCount);
|
||||
planInfo.setSuccessNum(planBatch.getSuccessNum());
|
||||
planInfo.setExecFailNum(planBatch.getExecFailNum());
|
||||
planInfo.setAssertFailNum(planBatch.getAssertFailNum());
|
||||
planInfo.setTimeoutNum(planBatch.getTimeoutNum());
|
||||
planInfo.setCancelNum(planBatch.getCancelNum());
|
||||
//不用代码生成的方法
|
||||
planInfoService.updatePlanResultById(planInfo);
|
||||
}
|
||||
|
||||
executor.execute(() -> {
|
||||
// 保存设备性能数据
|
||||
saveDevicePerData(atuPlanBatchDetailDto);
|
||||
executor.execute(() -> {
|
||||
// 保存设备性能数据
|
||||
saveDevicePerData(atuPlanBatchDetailDto);
|
||||
});
|
||||
|
||||
// 2.2. 删除缓存中已完成批次的记录
|
||||
logger.debug("删除批次统计缓存 => " + key);
|
||||
redisTemplate.delete(key);
|
||||
|
||||
// 收尾处理
|
||||
AtuPlanInfo batchPlanInfo = planInfoService.findByBatchId(batchId);
|
||||
if (batchPlanInfo != null) {
|
||||
handleEnd(batchPlanInfo, atuPlanBatchDetailDto);
|
||||
}
|
||||
});
|
||||
|
||||
// 2.2. 删除缓存中已完成批次的记录
|
||||
logger.debug("删除批次统计缓存 => " + key);
|
||||
redisTemplate.delete(key);
|
||||
|
||||
// 收尾处理
|
||||
AtuPlanInfo batchPlanInfo = planInfoService.findByBatchId(batchId);
|
||||
if (batchPlanInfo != null) {
|
||||
handleEnd(batchPlanInfo, atuPlanBatchDetailDto);
|
||||
}
|
||||
});
|
||||
logger.debug("同步缓存中计划批次统计数据----end----");
|
||||
logger.debug("同步缓存中计划批次统计数据----end----");
|
||||
} catch (Exception e) {
|
||||
logger.error("批次统计数据执行发生异常", e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -728,8 +750,36 @@ public class PlanBatchTaskDataUpdateJob {
|
|||
planScriptLinkService.insertByBatch(scriptLinkList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 用于确定脚本的执行结果状态
|
||||
* @param statusSet 脚本下的所有任务状态
|
||||
* @return
|
||||
*/
|
||||
private String scriptStatus(Set<String> statusSet) {
|
||||
if (statusSet == null || statusSet.isEmpty()) {
|
||||
String status = PlanConstant.TASK_EXECUTE_SUCCESS_STATUS; // 默认成功
|
||||
if (statusSet == null || statusSet.isEmpty()) { // 如果状态为空,那么脚本成功
|
||||
return status;
|
||||
}
|
||||
boolean flag = statusSet.remove(PlanConstant.TASK_WAIT_EXECUTE_STATUS);
|
||||
if (flag) { // 如果存在等待(0)
|
||||
if (statusSet.isEmpty()) { // 全都是等待(0)
|
||||
status = PlanConstant.TASK_WAIT_EXECUTE_STATUS;
|
||||
} else { // 还有其他状态
|
||||
status = PlanConstant.TASK_START_EXECUTE_STATUS;
|
||||
}
|
||||
} else { // 不存在等待(0)
|
||||
if (statusSet.remove(PlanConstant.TASK_START_EXECUTE_STATUS)) { // 存在执行中(1)
|
||||
status = PlanConstant.TASK_START_EXECUTE_STATUS;
|
||||
} else if (statusSet.remove(PlanConstant.TASK_CANCEL_STATUS)) { // 存在取消(5)
|
||||
status = PlanConstant.TASK_CANCEL_STATUS;
|
||||
} else if (statusSet.remove(PlanConstant.TASK_TIMEOUT_STATUS)) { // 存在超时(6)
|
||||
status = PlanConstant.TASK_TIMEOUT_STATUS;
|
||||
} else if (statusSet.remove(PlanConstant.TASK_ASSERT_FAIL_STATUS)) { // 存在断言失败(4)
|
||||
status = PlanConstant.TASK_ASSERT_FAIL_STATUS;
|
||||
} else if (statusSet.remove(PlanConstant.TASK_EXECUTE_FAIL_STATUS)) { // 存在执行失败(3)
|
||||
status = PlanConstant.TASK_EXECUTE_FAIL_STATUS;
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -386,4 +386,15 @@
|
|||
batch_id = #{task.batchId,jdbcType=VARCHAR}
|
||||
GROUP BY script_id
|
||||
</select>
|
||||
|
||||
<update id="batchUpdateTaskStatusToTimeout" >
|
||||
UPDATE
|
||||
<include refid="Table_Name" />
|
||||
SET
|
||||
`status` = #{timeoutStatus,jdbcType=VARCHAR}
|
||||
WHERE
|
||||
batch_id = #{batchId,jdbcType=VARCHAR}
|
||||
AND
|
||||
(`status` = #{waitStatus, jdbcType=VARCHAR} or `status` = #{timeoutStatus, jdbcType=VARCHAR})
|
||||
</update>
|
||||
</mapper>
|
Loading…
Reference in New Issue