Compare commits
No commits in common. "5d21b6090ad4ec6df69495459ee44539be8f1b60" and "2a5ed5f6fb164890959790f28ec98762a0190475" have entirely different histories.
5d21b6090a
...
2a5ed5f6fb
|
@ -21,37 +21,4 @@ public class AtuPlanConfig {
|
||||||
}
|
}
|
||||||
this.waitTimeout = waitTimeout;
|
this.waitTimeout = waitTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 是否启用执行超时,默认启用
|
|
||||||
* atu.plan.batchExecuteTimeoutEnabled
|
|
||||||
*/
|
|
||||||
private Boolean batchExecuteTimeoutEnabled;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批次执行超时时间,默认48小时
|
|
||||||
* atu.plan.batchExecuteTimeout
|
|
||||||
*/
|
|
||||||
private Integer batchExecuteTimeout;
|
|
||||||
|
|
||||||
|
|
||||||
public boolean isBatchExecuteTimeoutEnabled() {
|
|
||||||
return batchExecuteTimeoutEnabled == null ? true : batchExecuteTimeoutEnabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBatchExecuteTimeoutEnabled(boolean batchExecuteTimeoutEnabled) {
|
|
||||||
this.batchExecuteTimeoutEnabled = batchExecuteTimeoutEnabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getBatchExecuteTimeout() {
|
|
||||||
if (isBatchExecuteTimeoutEnabled()) {
|
|
||||||
// 如果是启用了批次执行超时,
|
|
||||||
return batchExecuteTimeout == null ? 48 : batchExecuteTimeout;
|
|
||||||
}
|
|
||||||
return 48;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBatchExecuteTimeout(Integer batchExecuteTimeout) {
|
|
||||||
this.batchExecuteTimeout = batchExecuteTimeout;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,9 +4,13 @@
|
||||||
*/
|
*/
|
||||||
package net.northking.cctp.executePlan.db.dao;
|
package net.northking.cctp.executePlan.db.dao;
|
||||||
|
|
||||||
|
import net.northking.cctp.common.http.QueryByPage;
|
||||||
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
||||||
import net.northking.cctp.executePlan.db.mapper.AtuPlanTaskMapper;
|
import net.northking.cctp.executePlan.db.mapper.AtuPlanTaskMapper;
|
||||||
import net.northking.cctp.executePlan.dto.planTask.*;
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskExtendDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskPageDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskQueryDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuTaskSendBugDto;
|
||||||
import net.northking.cctp.executePlan.pub.dto.ScriptFirstExecutionDTO;
|
import net.northking.cctp.executePlan.pub.dto.ScriptFirstExecutionDTO;
|
||||||
import org.apache.ibatis.annotations.Param;
|
import org.apache.ibatis.annotations.Param;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
|
@ -128,17 +132,4 @@ public interface AtuPlanTaskDao extends AtuPlanTaskMapper
|
||||||
AtuPlanTaskExtendDto queryTaskExtendById(String id);
|
AtuPlanTaskExtendDto queryTaskExtendById(String id);
|
||||||
|
|
||||||
List<AtuPlanTask> queryWaitTimeoutTask(@Param("waitTimeout") Integer waitTimeout);
|
List<AtuPlanTask> queryWaitTimeoutTask(@Param("waitTimeout") Integer waitTimeout);
|
||||||
|
|
||||||
List<ScriptStatusStatistic> countTaskStatus(AtuPlanTask task);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 将批次下的所有等待中和执行中的任务改成超时
|
|
||||||
*
|
|
||||||
* @param batchId 批次id
|
|
||||||
* @param timeoutStatus 超时状态码
|
|
||||||
* @param waitStatus 等待中状态码
|
|
||||||
* @param startStatus 执行中状态码
|
|
||||||
*/
|
|
||||||
long batchUpdateTaskStatusToTimeout(@Param(value = "batchId")String batchId, @Param(value = "timeoutStatus")String timeoutStatus,
|
|
||||||
@Param(value = "waitStatus")String waitStatus, @Param(value = "startStatus")String startStatus);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,19 +4,18 @@
|
||||||
*/
|
*/
|
||||||
package net.northking.cctp.executePlan.db.impl;
|
package net.northking.cctp.executePlan.db.impl;
|
||||||
|
|
||||||
import cn.hutool.core.util.StrUtil;
|
|
||||||
import net.northking.cctp.common.db.BasicDao;
|
import net.northking.cctp.common.db.BasicDao;
|
||||||
import net.northking.cctp.common.db.PaginationService;
|
import net.northking.cctp.common.db.PaginationService;
|
||||||
import net.northking.cctp.common.exception.PlatformRuntimeException;
|
|
||||||
import net.northking.cctp.common.http.QueryByPage;
|
import net.northking.cctp.common.http.QueryByPage;
|
||||||
import net.northking.cctp.executePlan.db.dao.AtuPlanTaskDao;
|
import net.northking.cctp.executePlan.db.dao.AtuPlanTaskDao;
|
||||||
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
||||||
import net.northking.cctp.executePlan.db.service.AtuPlanTaskService;
|
import net.northking.cctp.executePlan.db.service.AtuPlanTaskService;
|
||||||
import net.northking.cctp.executePlan.dto.planTask.*;
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskExtendDto;
|
||||||
import net.northking.cctp.executePlan.exception.ExecPlanError;
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskPageDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskQueryDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuTaskSendBugDto;
|
||||||
import net.northking.cctp.executePlan.pub.dto.ScriptFirstExecutionDTO;
|
import net.northking.cctp.executePlan.pub.dto.ScriptFirstExecutionDTO;
|
||||||
import org.apache.commons.collections4.CollectionUtils;
|
import org.apache.commons.collections4.CollectionUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
|
@ -184,36 +183,6 @@ private static final Logger logger = LoggerFactory.getLogger(AtuPlanTaskServiceI
|
||||||
return atuPlanTaskDao.queryWaitTimeoutTask(waitTimeout);
|
return atuPlanTaskDao.queryWaitTimeoutTask(waitTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 统计指定批次下分组查询脚本下任务的执行状态结果
|
|
||||||
*
|
|
||||||
* @param task
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public List<ScriptStatusStatistic> countTaskStatus(AtuPlanTask task) {
|
|
||||||
if (task == null || StringUtils.isBlank(task.getBatchId())) {
|
|
||||||
throw new PlatformRuntimeException(ExecPlanError.EMPTY_BATCH_ID);
|
|
||||||
}
|
|
||||||
return this.atuPlanTaskDao.countTaskStatus(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 将批次下的所有等待中和执行中的任务改成超时
|
|
||||||
*
|
|
||||||
* @param batchId 批次id
|
|
||||||
* @param timeoutStatus 超时状态码
|
|
||||||
* @param waitStatus 等待中状态码
|
|
||||||
* @param startStatus 执行中状态码
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void batchUpdateTaskStatusToTimeout(String batchId, String timeoutStatus, String waitStatus, String startStatus) {
|
|
||||||
if (StringUtils.isBlank(batchId)) {
|
|
||||||
throw new PlatformRuntimeException(ExecPlanError.EMPTY_BATCH_ID);
|
|
||||||
}
|
|
||||||
this.atuPlanTaskDao.batchUpdateTaskStatusToTimeout(batchId, timeoutStatus, waitStatus, startStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- The End by Generator ----//
|
// ---- The End by Generator ----//
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,10 @@ package net.northking.cctp.executePlan.db.service;
|
||||||
import net.northking.cctp.common.db.BasicService;
|
import net.northking.cctp.common.db.BasicService;
|
||||||
import net.northking.cctp.common.http.QueryByPage;
|
import net.northking.cctp.common.http.QueryByPage;
|
||||||
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
import net.northking.cctp.executePlan.db.entity.AtuPlanTask;
|
||||||
import net.northking.cctp.executePlan.dto.planTask.*;
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskExtendDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskPageDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuPlanTaskQueryDto;
|
||||||
|
import net.northking.cctp.executePlan.dto.planTask.AtuTaskSendBugDto;
|
||||||
import net.northking.cctp.executePlan.pub.dto.ScriptFirstExecutionDTO;
|
import net.northking.cctp.executePlan.pub.dto.ScriptFirstExecutionDTO;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
|
@ -133,20 +136,4 @@ public interface AtuPlanTaskService extends BasicService<AtuPlanTask>
|
||||||
AtuPlanTaskExtendDto queryTaskExtendById(String taskId);
|
AtuPlanTaskExtendDto queryTaskExtendById(String taskId);
|
||||||
|
|
||||||
List<AtuPlanTask> queryWaitTimeoutTask(Integer waitTimeout);
|
List<AtuPlanTask> queryWaitTimeoutTask(Integer waitTimeout);
|
||||||
|
|
||||||
/**
|
|
||||||
* 统计指定批次下分组查询脚本下任务的执行状态结果
|
|
||||||
* @param task
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
List<ScriptStatusStatistic> countTaskStatus(AtuPlanTask task);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 将批次下的所有等待中和执行中的任务改成超时
|
|
||||||
* @param batchId 批次id
|
|
||||||
* @param timeoutStatus 超时状态码
|
|
||||||
* @param waitStatus 等待中状态码
|
|
||||||
* @param startStatus 执行中状态码
|
|
||||||
*/
|
|
||||||
void batchUpdateTaskStatusToTimeout(String batchId, String timeoutStatus, String waitStatus, String startStatus);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,8 +85,6 @@ public enum ExecPlanError implements PlatformError {
|
||||||
|
|
||||||
PARSE_DATE_ERROR("格式化日期出错,请检查参数"),
|
PARSE_DATE_ERROR("格式化日期出错,请检查参数"),
|
||||||
|
|
||||||
EMPTY_BATCH_ID("批次id不能为空"),
|
|
||||||
|
|
||||||
GET_FILE_FAIL("获取文件失败"),
|
GET_FILE_FAIL("获取文件失败"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,7 +16,6 @@ import net.northking.cctp.executePlan.api.service.AtuPlanInfoApiService;
|
||||||
import net.northking.cctp.executePlan.api.service.AtuPlanTaskApiService;
|
import net.northking.cctp.executePlan.api.service.AtuPlanTaskApiService;
|
||||||
import net.northking.cctp.executePlan.api.service.MessageCenterService;
|
import net.northking.cctp.executePlan.api.service.MessageCenterService;
|
||||||
import net.northking.cctp.executePlan.api.third.feilang.service.FeiLangService;
|
import net.northking.cctp.executePlan.api.third.feilang.service.FeiLangService;
|
||||||
import net.northking.cctp.executePlan.config.AtuPlanConfig;
|
|
||||||
import net.northking.cctp.executePlan.constants.MsgConstant;
|
import net.northking.cctp.executePlan.constants.MsgConstant;
|
||||||
import net.northking.cctp.executePlan.constants.PlanConstant;
|
import net.northking.cctp.executePlan.constants.PlanConstant;
|
||||||
import net.northking.cctp.executePlan.constants.RabbitConstant;
|
import net.northking.cctp.executePlan.constants.RabbitConstant;
|
||||||
|
@ -25,7 +24,6 @@ import net.northking.cctp.executePlan.db.entity.*;
|
||||||
import net.northking.cctp.executePlan.db.service.*;
|
import net.northking.cctp.executePlan.db.service.*;
|
||||||
import net.northking.cctp.executePlan.dto.planBatch.*;
|
import net.northking.cctp.executePlan.dto.planBatch.*;
|
||||||
import net.northking.cctp.executePlan.dto.planInfo.AtuPlanRunDto;
|
import net.northking.cctp.executePlan.dto.planInfo.AtuPlanRunDto;
|
||||||
import net.northking.cctp.executePlan.dto.planTask.ScriptStatusStatistic;
|
|
||||||
import net.northking.cctp.executePlan.enums.MobilePlatformEnum;
|
import net.northking.cctp.executePlan.enums.MobilePlatformEnum;
|
||||||
import net.northking.cctp.executePlan.feign.PublicFeignClient;
|
import net.northking.cctp.executePlan.feign.PublicFeignClient;
|
||||||
import net.northking.cctp.executePlan.utils.MinioPathUtils;
|
import net.northking.cctp.executePlan.utils.MinioPathUtils;
|
||||||
|
@ -37,20 +35,14 @@ import org.springframework.amqp.core.AmqpAdmin;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.integration.redis.util.RedisLockRegistry;
|
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -116,14 +108,6 @@ public class PlanBatchTaskDataUpdateJob {
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedissonClient redisson;
|
private RedissonClient redisson;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RedisLockRegistry redisLockRegistry;
|
|
||||||
|
|
||||||
private final static String LOCK_KEY_BATCH_SUM = "LOCK:PLAN:BATCH-SUM-DATA-UPDATE-METHOD";
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private AtuPlanConfig atuPlanConfig;
|
|
||||||
|
|
||||||
public static final int EXECUTOR_CORE_POOL_SIZE = 2 * 2 + 2;
|
public static final int EXECUTOR_CORE_POOL_SIZE = 2 * 2 + 2;
|
||||||
public static final int EXECUTOR_MAX_POOL_SIZE = 10;
|
public static final int EXECUTOR_MAX_POOL_SIZE = 10;
|
||||||
public static final int EXECUTOR_LINKED_BLOCKING_SIZE = 50;
|
public static final int EXECUTOR_LINKED_BLOCKING_SIZE = 50;
|
||||||
|
@ -136,185 +120,162 @@ public class PlanBatchTaskDataUpdateJob {
|
||||||
|
|
||||||
@Scheduled(fixedRateString = "${atu.plan.batchCountUpdateJob:20000}")
|
@Scheduled(fixedRateString = "${atu.plan.batchCountUpdateJob:20000}")
|
||||||
private void batchSumDataUpdate() {
|
private void batchSumDataUpdate() {
|
||||||
// 统计加锁,多节点只需要一个节点启动
|
logger.debug("同步缓存中计划批次统计数据----start----");
|
||||||
Lock lock = redisLockRegistry.obtain(LOCK_KEY_BATCH_SUM);
|
// 1. 获取缓存中所有的批次统计数据
|
||||||
if (!lock.tryLock()) { // 没有拿到锁退出方法,有别的节点获取到锁进行统计
|
String patternKey = RedisConstant.CLUSTER_KEY_PREFIX + "*" + RedisConstant.CLUSTER_KEY_SUFFIX +
|
||||||
|
RedisConstant.BATCH_SCRIPT_SUM_KEY + "*";
|
||||||
|
Set<String> batchKeySet = redisTemplate.keys(patternKey);
|
||||||
|
if (CollUtil.isEmpty(batchKeySet)) {
|
||||||
|
logger.debug("不存在批次缓存统计数据----end----");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
logger.debug("缓存中计划批次统计数据量 => " + batchKeySet.size());
|
||||||
logger.debug("同步缓存中计划批次统计数据----start----");
|
batchKeySet.forEach(key -> {
|
||||||
// 1. 获取缓存中所有的批次统计数据
|
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
|
||||||
String patternKey = RedisConstant.CLUSTER_KEY_PREFIX + "*" + RedisConstant.CLUSTER_KEY_SUFFIX +
|
if (CollUtil.isEmpty(entries)) {
|
||||||
RedisConstant.BATCH_SCRIPT_SUM_KEY + "*";
|
logger.info("缓存[" + key + "]中无统计数据");
|
||||||
Set<String> batchKeySet = redisTemplate.keys(patternKey);
|
|
||||||
if (CollUtil.isEmpty(batchKeySet)) {
|
|
||||||
logger.debug("不存在批次缓存统计数据----end----");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
logger.debug("缓存中计划批次统计数据量 => " + batchKeySet.size());
|
String batchId = key.substring(key.lastIndexOf(":") + 1);
|
||||||
batchKeySet.forEach(key -> {
|
String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) +
|
||||||
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
|
RedisConstant.CLUSTER_KEY_SUFFIX;
|
||||||
if (CollUtil.isEmpty(entries)) {
|
// 2. 遍历判断该批次是否已完成
|
||||||
logger.info("缓存[" + key + "]中无统计数据");
|
int waitTotal = 0;
|
||||||
// 没有统计数据,删掉key
|
Object waitStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_WAIT_EXECUTE_STATUS);
|
||||||
redisTemplate.delete(key);
|
if (ObjectUtil.isNotNull(waitStatusObj)) {
|
||||||
return;
|
waitTotal = Integer.parseInt(waitStatusObj.toString());
|
||||||
|
}
|
||||||
|
int runningTotal = 0;
|
||||||
|
Object runningStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_START_EXECUTE_STATUS);
|
||||||
|
if (ObjectUtil.isNotNull(runningStatusObj)) {
|
||||||
|
runningTotal = Integer.parseInt(runningStatusObj.toString());
|
||||||
|
}
|
||||||
|
// TODO: 判断有没有负数、总数是否相等
|
||||||
|
// 失败的数据
|
||||||
|
// 脚本超时的
|
||||||
|
if (waitTotal != 0 || runningTotal != 0) {
|
||||||
|
// 该批次还未完成
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 2.1. 完成则更新数据库中的批次统计数据
|
||||||
|
int total = 0;
|
||||||
|
Object scriptTotalObj = entries.get(clusterKeyPrefix + PlanConstant.SCRIPT_TOTAL);
|
||||||
|
if (ObjectUtil.isNotNull(scriptTotalObj)) {
|
||||||
|
total = Integer.parseInt(scriptTotalObj.toString());
|
||||||
|
}
|
||||||
|
int successTotal = 0;
|
||||||
|
Object successStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_EXECUTE_SUCCESS_STATUS);
|
||||||
|
if (ObjectUtil.isNotNull(successStatusObj)) {
|
||||||
|
successTotal = Integer.parseInt(successStatusObj.toString());
|
||||||
|
}
|
||||||
|
int failTotal = 0;
|
||||||
|
Object failStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_EXECUTE_FAIL_STATUS);
|
||||||
|
if (ObjectUtil.isNotNull(failStatusObj)) {
|
||||||
|
failTotal = Integer.parseInt(failStatusObj.toString());
|
||||||
|
if (failTotal < 0) {
|
||||||
|
failTotal = 0;
|
||||||
}
|
}
|
||||||
String batchId = key.substring(key.lastIndexOf(":") + 1);
|
}
|
||||||
String clusterKeyPrefix = RedisConstant.CLUSTER_KEY_PREFIX + batchId.substring(0, 4) +
|
int assertFailTotal = 0;
|
||||||
RedisConstant.CLUSTER_KEY_SUFFIX;
|
Object assertFailStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_ASSERT_FAIL_STATUS);
|
||||||
|
if (ObjectUtil.isNotNull(assertFailStatusObj)) {
|
||||||
|
assertFailTotal = Integer.parseInt(assertFailStatusObj.toString());
|
||||||
|
}
|
||||||
|
int timeoutTotal = 0;
|
||||||
|
Object timeoutStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_TIMEOUT_STATUS);
|
||||||
|
if (ObjectUtil.isNotNull(timeoutStatusObj)) {
|
||||||
|
timeoutTotal = Integer.parseInt(timeoutStatusObj.toString());
|
||||||
|
}
|
||||||
|
int cancelTotal = 0;
|
||||||
|
Object cancelStatusObj = entries.get(clusterKeyPrefix + PlanConstant.TASK_CANCEL_STATUS);
|
||||||
|
if (ObjectUtil.isNotNull(cancelStatusObj)) {
|
||||||
|
cancelTotal = Integer.parseInt(cancelStatusObj.toString());
|
||||||
|
}
|
||||||
|
// TODO: 统计库表数据进行更新,而不是通过缓存
|
||||||
|
// 更新批次表
|
||||||
|
AtuPlanBatch planBatch = new AtuPlanBatch();
|
||||||
|
planBatch.setId(batchId);
|
||||||
|
planBatch.setSuccessNum(successTotal);
|
||||||
|
planBatch.setExecFailNum(failTotal);
|
||||||
|
planBatch.setAssertFailNum(assertFailTotal);
|
||||||
|
planBatch.setTimeoutNum(timeoutTotal);
|
||||||
|
planBatch.setCancelNum(cancelTotal);
|
||||||
|
|
||||||
// 通过数据库进行统计,减少统计数据出错的原因
|
planBatch.setSuccessRate(MsgConstant.PLAN_BATCH_SUCCESS_RATE);
|
||||||
AtuPlanTask taskQ = new AtuPlanTask();
|
if (successTotal > 0 && total > 0) {
|
||||||
taskQ.setBatchId(batchId);
|
planBatch.setSuccessRate(NumberUtil.decimalFormat(MsgConstant.PATTERN_HASH_DOT_HASH_HASH, NumberUtil.div(successTotal, total)));
|
||||||
List<ScriptStatusStatistic> resultList = planTaskService.countTaskStatus(taskQ);
|
}
|
||||||
List<String> batchStatusList = new ArrayList<>();
|
// 查询批次最先开始执行的任务的开始时间
|
||||||
for (ScriptStatusStatistic result : resultList) {
|
if (planBatch.getStartTime() == null) {
|
||||||
Set<String> statusSet = result.getStatusSet();
|
Long firstTaskStartTime = planTaskService.queryBatchFirstTaskStartTime(batchId);
|
||||||
result.setStatus(scriptStatus(statusSet)); // 根据脚本下的所有任务确定脚本的执行结果
|
if (ObjectUtil.isNotNull(firstTaskStartTime)) {
|
||||||
batchStatusList.add(result.getStatus());
|
planBatch.setStartTime(new Date(firstTaskStartTime));
|
||||||
}
|
}
|
||||||
int allScriptCount = batchStatusList.size(); // 总脚本执行数
|
}
|
||||||
// 等待中
|
// 任务状态判断
|
||||||
int waitCount = Collections.frequency(batchStatusList, PlanConstant.TASK_WAIT_EXECUTE_STATUS);
|
String planStatus = "";
|
||||||
// 执行中
|
if (cancelTotal > 0) {
|
||||||
int ingCount = Collections.frequency(batchStatusList, PlanConstant.TASK_START_EXECUTE_STATUS);
|
planStatus = PlanConstant.PLAN_CANCEL_STATUS;
|
||||||
// 成功
|
// 修改为取消状态
|
||||||
int successCount = Collections.frequency(batchStatusList, PlanConstant.TASK_EXECUTE_SUCCESS_STATUS);
|
planBatch.setStatus(PlanConstant.BATCH_CANCEL_STATUS);
|
||||||
// 执行失败
|
} else {
|
||||||
int executeFailCount = Collections.frequency(batchStatusList, PlanConstant.TASK_EXECUTE_FAIL_STATUS);
|
planStatus = PlanConstant.PLAN_FINISH_STATUS;
|
||||||
// 断言失败
|
// 修改为已完成状态
|
||||||
int assertFailCount = Collections.frequency(batchStatusList, PlanConstant.TASK_ASSERT_FAIL_STATUS);
|
planBatch.setStatus(PlanConstant.BATCH_FINISH_STATUS);
|
||||||
// 取消的
|
}
|
||||||
int cancelCount = Collections.frequency(batchStatusList, PlanConstant.TASK_CANCEL_STATUS);
|
|
||||||
// 超时的
|
|
||||||
int timeoutCount = Collections.frequency(batchStatusList, PlanConstant.TASK_TIMEOUT_STATUS);
|
|
||||||
|
|
||||||
if (waitCount > 0 || ingCount > 0) { // 批次还没有完成
|
// 查询批次最后执行完成的任务的结束时间
|
||||||
// 检查批次的开始时间
|
Long lastTaskEndTime = planTaskService.queryBatchLastTaskEndTime(batchId);
|
||||||
AtuPlanBatch batch = this.planBatchService.findByPrimaryKey(batchId);
|
if (ObjectUtil.isNotNull(lastTaskEndTime)) {
|
||||||
if (batch != null) {
|
planBatch.setEndTime(new Date(lastTaskEndTime));
|
||||||
Date createdTime = batch.getCreatedTime();
|
}
|
||||||
if (createdTime != null) { // 有创建时间,那么比较开始时间与现在的时间差
|
logger.debug("更新批次信息");
|
||||||
LocalDateTime createDateTime = createdTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
|
planBatchService.updateByPrimaryKey(planBatch);
|
||||||
LocalDateTime now = LocalDateTime.now();
|
|
||||||
Duration duration = Duration.between(createDateTime, now);
|
|
||||||
long hours = duration.toHours();
|
|
||||||
if (atuPlanConfig.isBatchExecuteTimeoutEnabled() && hours > atuPlanConfig.getBatchExecuteTimeout().longValue()) { // 参数化
|
|
||||||
logger.info("批次[{}-{}]已创建{}小时,任务没有执行完成,强制结束批次", batch.getId(), batch.getBatch(), 48);
|
|
||||||
// 如果批次的创建时间离现在已经过去48小时还没完成,那么判定批次已完成,等待中|执行中 的任务全部算超时处理
|
|
||||||
timeoutCount += waitCount;
|
|
||||||
timeoutCount += ingCount;
|
|
||||||
waitCount = 0;
|
|
||||||
ingCount = 0;
|
|
||||||
// 更新批次下 等待中(0)|执行中(1) 的任务全部算超时处理(6)
|
|
||||||
this.planTaskService.batchUpdateTaskStatusToTimeout(batch.getId(), PlanConstant.TASK_TIMEOUT_STATUS, PlanConstant.TASK_WAIT_EXECUTE_STATUS, PlanConstant.TASK_START_EXECUTE_STATUS);
|
|
||||||
} else {
|
|
||||||
// 批次还没有完成,退出统计
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// 有批次信息,但是没有创建时间,应该是数据有问题
|
|
||||||
logger.debug("批次[{}]没有创建时间", batch.getId());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else { // 没有对应批次数据,删除缓存数据
|
|
||||||
logger.debug("没有对应的批次信息[{}],删除缓存数据", batchId);
|
|
||||||
redisTemplate.delete(key);
|
|
||||||
return; // 退出这个批次统计
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 批次已完成
|
|
||||||
|
|
||||||
// 更新批次表
|
// 删除任务队列
|
||||||
AtuPlanBatch planBatch = new AtuPlanBatch();
|
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_PC_KEY + batchId);
|
||||||
planBatch.setId(batchId);
|
// 移动端删除各个平台对于的队列
|
||||||
planBatch.setSuccessNum(successCount);
|
MobilePlatformEnum[] platformEnums = MobilePlatformEnum.values();
|
||||||
planBatch.setExecFailNum(executeFailCount);
|
for (MobilePlatformEnum platformEnum : platformEnums) {
|
||||||
planBatch.setAssertFailNum(assertFailCount);
|
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_MOB_KEY + platformEnum.getName() + "." + batchId);
|
||||||
planBatch.setTimeoutNum(timeoutCount);
|
}
|
||||||
planBatch.setCancelNum(cancelCount);
|
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_API_KEY + batchId);
|
||||||
|
|
||||||
planBatch.setSuccessRate(MsgConstant.PLAN_BATCH_SUCCESS_RATE);
|
//获取更新后的批次信息
|
||||||
if (planBatch.getSuccessNum() > 0 && allScriptCount > 0) {
|
AtuPlanBatchDetailDto atuPlanBatchDetailDto = planBatchService.queryBatchDetailById(planBatch.getId());
|
||||||
planBatch.setSuccessRate(NumberUtil.decimalFormat(MsgConstant.PATTERN_HASH_DOT_HASH_HASH, NumberUtil.div(planBatch.getSuccessNum(), allScriptCount)));
|
|
||||||
}
|
|
||||||
// 查询批次最先开始执行的任务的开始时间
|
|
||||||
if (planBatch.getStartTime() == null) {
|
|
||||||
Long firstTaskStartTime = planTaskService.queryBatchFirstTaskStartTime(batchId);
|
|
||||||
if (ObjectUtil.isNotNull(firstTaskStartTime)) {
|
|
||||||
planBatch.setStartTime(new Date(firstTaskStartTime));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 任务状态判断
|
|
||||||
String planStatus = "";
|
|
||||||
if (planBatch.getCancelNum() > 0) {
|
|
||||||
planStatus = PlanConstant.PLAN_CANCEL_STATUS;
|
|
||||||
// 修改为取消状态
|
|
||||||
planBatch.setStatus(PlanConstant.BATCH_CANCEL_STATUS);
|
|
||||||
} else {
|
|
||||||
planStatus = PlanConstant.PLAN_FINISH_STATUS;
|
|
||||||
// 修改为已完成状态
|
|
||||||
planBatch.setStatus(PlanConstant.BATCH_FINISH_STATUS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 查询批次最后执行完成的任务的结束时间
|
// 判断是否计划最后一批次
|
||||||
Long lastTaskEndTime = planTaskService.queryBatchLastTaskEndTime(batchId);
|
AtuPlanInfo planInfo = planInfoService.queryByLastBatchId(batchId);
|
||||||
if (ObjectUtil.isNotNull(lastTaskEndTime)) {
|
if (ObjectUtil.isNotNull(planInfo)) {
|
||||||
planBatch.setEndTime(new Date(lastTaskEndTime));
|
// 是则更新计划表
|
||||||
}
|
planInfo.setStatus(planStatus);
|
||||||
logger.debug("更新批次信息");
|
planInfo.setWaitingNum(waitTotal);
|
||||||
planBatchService.updateByPrimaryKey(planBatch);
|
planInfo.setRunningNum(runningTotal);
|
||||||
|
planInfo.setSuccessNum(successTotal);
|
||||||
|
planInfo.setExecFailNum(failTotal);
|
||||||
|
planInfo.setAssertFailNum(assertFailTotal);
|
||||||
|
planInfo.setTimeoutNum(timeoutTotal);
|
||||||
|
planInfo.setCancelNum(cancelTotal);
|
||||||
|
//不用代码生成的方法
|
||||||
|
planInfoService.updatePlanResultById(planInfo);
|
||||||
|
}
|
||||||
|
|
||||||
// 删除任务队列
|
executor.execute(() -> {
|
||||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_PC_KEY + batchId);
|
// 保存设备性能数据
|
||||||
// 移动端删除各个平台对于的队列
|
saveDevicePerData(atuPlanBatchDetailDto);
|
||||||
MobilePlatformEnum[] platformEnums = MobilePlatformEnum.values();
|
|
||||||
for (MobilePlatformEnum platformEnum : platformEnums) {
|
|
||||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_MOB_KEY + platformEnum.getName() + "." + batchId);
|
|
||||||
}
|
|
||||||
amqpAdmin.deleteQueue(RabbitConstant.TASK_EXEC_QUEUE_API_KEY + batchId);
|
|
||||||
|
|
||||||
//获取更新后的批次信息
|
|
||||||
AtuPlanBatchDetailDto atuPlanBatchDetailDto = planBatchService.queryBatchDetailById(planBatch.getId());
|
|
||||||
|
|
||||||
// 判断是否计划最后一批次
|
|
||||||
AtuPlanInfo planInfo = planInfoService.queryByLastBatchId(batchId);
|
|
||||||
if (ObjectUtil.isNotNull(planInfo)) {
|
|
||||||
// 是则更新计划表
|
|
||||||
planInfo.setStatus(planStatus);
|
|
||||||
planInfo.setWaitingNum(waitCount);
|
|
||||||
planInfo.setRunningNum(ingCount);
|
|
||||||
planInfo.setSuccessNum(planBatch.getSuccessNum());
|
|
||||||
planInfo.setExecFailNum(planBatch.getExecFailNum());
|
|
||||||
planInfo.setAssertFailNum(planBatch.getAssertFailNum());
|
|
||||||
planInfo.setTimeoutNum(planBatch.getTimeoutNum());
|
|
||||||
planInfo.setCancelNum(planBatch.getCancelNum());
|
|
||||||
//不用代码生成的方法
|
|
||||||
planInfoService.updatePlanResultById(planInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
executor.execute(() -> {
|
|
||||||
// 保存设备性能数据
|
|
||||||
saveDevicePerData(atuPlanBatchDetailDto);
|
|
||||||
});
|
|
||||||
|
|
||||||
// 2.2. 删除缓存中已完成批次的记录
|
|
||||||
logger.debug("删除批次统计缓存 => " + key);
|
|
||||||
redisTemplate.delete(key);
|
|
||||||
|
|
||||||
// 收尾处理
|
|
||||||
AtuPlanInfo batchPlanInfo = planInfoService.findByBatchId(batchId);
|
|
||||||
if (batchPlanInfo != null) {
|
|
||||||
handleEnd(batchPlanInfo, atuPlanBatchDetailDto);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
logger.debug("同步缓存中计划批次统计数据----end----");
|
|
||||||
} catch (Exception e) {
|
// 2.2. 删除缓存中已完成批次的记录
|
||||||
logger.error("批次统计数据执行发生异常", e);
|
logger.debug("删除批次统计缓存 => " + key);
|
||||||
} finally {
|
redisTemplate.delete(key);
|
||||||
lock.unlock();
|
|
||||||
}
|
// 收尾处理
|
||||||
|
AtuPlanInfo batchPlanInfo = planInfoService.findByBatchId(batchId);
|
||||||
|
if (batchPlanInfo != null) {
|
||||||
|
handleEnd(batchPlanInfo, atuPlanBatchDetailDto);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
logger.debug("同步缓存中计划批次统计数据----end----");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -749,37 +710,4 @@ public class PlanBatchTaskDataUpdateJob {
|
||||||
// 绑定脚本
|
// 绑定脚本
|
||||||
planScriptLinkService.insertByBatch(scriptLinkList);
|
planScriptLinkService.insertByBatch(scriptLinkList);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 用于确定脚本的执行结果状态
|
|
||||||
* @param statusSet 脚本下的所有任务状态
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
private String scriptStatus(Set<String> statusSet) {
|
|
||||||
String status = PlanConstant.TASK_EXECUTE_SUCCESS_STATUS; // 默认成功
|
|
||||||
if (statusSet == null || statusSet.isEmpty()) { // 如果状态为空,那么脚本成功
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
boolean flag = statusSet.remove(PlanConstant.TASK_WAIT_EXECUTE_STATUS);
|
|
||||||
if (flag) { // 如果存在等待(0)
|
|
||||||
if (statusSet.isEmpty()) { // 全都是等待(0)
|
|
||||||
status = PlanConstant.TASK_WAIT_EXECUTE_STATUS;
|
|
||||||
} else { // 还有其他状态
|
|
||||||
status = PlanConstant.TASK_START_EXECUTE_STATUS;
|
|
||||||
}
|
|
||||||
} else { // 不存在等待(0)
|
|
||||||
if (statusSet.remove(PlanConstant.TASK_START_EXECUTE_STATUS)) { // 存在执行中(1)
|
|
||||||
status = PlanConstant.TASK_START_EXECUTE_STATUS;
|
|
||||||
} else if (statusSet.remove(PlanConstant.TASK_CANCEL_STATUS)) { // 存在取消(5)
|
|
||||||
status = PlanConstant.TASK_CANCEL_STATUS;
|
|
||||||
} else if (statusSet.remove(PlanConstant.TASK_TIMEOUT_STATUS)) { // 存在超时(6)
|
|
||||||
status = PlanConstant.TASK_TIMEOUT_STATUS;
|
|
||||||
} else if (statusSet.remove(PlanConstant.TASK_ASSERT_FAIL_STATUS)) { // 存在断言失败(4)
|
|
||||||
status = PlanConstant.TASK_ASSERT_FAIL_STATUS;
|
|
||||||
} else if (statusSet.remove(PlanConstant.TASK_EXECUTE_FAIL_STATUS)) { // 存在执行失败(3)
|
|
||||||
status = PlanConstant.TASK_EXECUTE_FAIL_STATUS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -366,35 +366,4 @@
|
||||||
<select id="queryAllFailIdList" resultType="java.lang.String">
|
<select id="queryAllFailIdList" resultType="java.lang.String">
|
||||||
select id from <include refid="Table_Name"/> where batch_id = #{batchId} and status in ("3", "4", "6")
|
select id from <include refid="Table_Name"/> where batch_id = #{batchId} and status in ("3", "4", "6")
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
<resultMap id="taskStatusMap" type="net.northking.cctp.executePlan.dto.planTask.ScriptStatusStatistic">
|
|
||||||
<!-- 脚本id -->
|
|
||||||
<result column="script_id" jdbcType="VARCHAR" property="scriptId"/>
|
|
||||||
<!-- 状态字符串 -->
|
|
||||||
<result column="task_status" jdbcType="VARCHAR" property="statusString"/>
|
|
||||||
<!-- 任务数量 -->
|
|
||||||
<result column="total" jdbcType="INTEGER" property="total" />
|
|
||||||
</resultMap>
|
|
||||||
|
|
||||||
<select id="countTaskStatus" parameterType="net.northking.cctp.executePlan.db.entity.AtuPlanTask" resultMap="taskStatusMap">
|
|
||||||
SELECT
|
|
||||||
script_id,
|
|
||||||
group_concat(status) as task_status,
|
|
||||||
count(*) as total
|
|
||||||
FROM <include refid="Table_Name" />
|
|
||||||
WHERE
|
|
||||||
batch_id = #{task.batchId,jdbcType=VARCHAR}
|
|
||||||
GROUP BY script_id
|
|
||||||
</select>
|
|
||||||
|
|
||||||
<update id="batchUpdateTaskStatusToTimeout" >
|
|
||||||
UPDATE
|
|
||||||
<include refid="Table_Name" />
|
|
||||||
SET
|
|
||||||
`status` = #{timeoutStatus,jdbcType=VARCHAR}
|
|
||||||
WHERE
|
|
||||||
batch_id = #{batchId,jdbcType=VARCHAR}
|
|
||||||
AND
|
|
||||||
(`status` = #{waitStatus, jdbcType=VARCHAR} or `status` = #{timeoutStatus, jdbcType=VARCHAR})
|
|
||||||
</update>
|
|
||||||
</mapper>
|
</mapper>
|
Loading…
Reference in New Issue