diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceTaskEvictPolicyResource.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceTaskEvictPolicyResource.java index 6425dd8e75..5a2ace9ae6 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceTaskEvictPolicyResource.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/api/inner/ServiceTaskEvictPolicyResource.java @@ -46,9 +46,10 @@ public interface ServiceTaskEvictPolicyResource { @GetMapping("/current") InternalResponse getCurrentPolicy(); - @ApiOperation(value = "设置任务驱逐策略,参考值:{\"@type\":\"ComposedTaskEvictPolicy\",\"operator\":\"OR\"," - + "\"policyList\":[{\"@type\":\"AppIdTaskEvictPolicy\",\"appIdsToEvict\":[2,3]}," - + "{\"@type\":\"AppCodeTaskEvictPolicy\",\"appCodesToEvict\":[\"appCode1\",\"appCode2\"]}]}\n", + @ApiOperation(value = "设置任务驱逐策略,参考值:{\"@type\":\"ComposedTaskEvictPolicy\",\"operator\":\"OR\"," + + "\"policyList\":[{\"@type\":\"TaskInstanceIdEvictPolicy\",\"taskInstanceIdsToEvict\":[1001,1002]}," + + "{\"@type\":\"AppIdTaskEvictPolicy\",\"appIdsToEvict\":[2,3]},{\"@type\":\"AppCodeTaskEvictPolicy\"," + + "\"appCodesToEvict\":[\"appCode1\",\"appCode2\"]}]}", produces = "application/json") @PutMapping("/") InternalResponse setPolicy(@RequestBody ComposedTaskEvictPolicyDTO policyDTO); diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppCodeTaskEvictPolicyDTO.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppCodeTaskEvictPolicyDTO.java index 739809304a..551a28f6a8 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppCodeTaskEvictPolicyDTO.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppCodeTaskEvictPolicyDTO.java @@ -30,6 +30,9 @@ import java.util.List; +/** + * 根据创建任务使用的AppCode来驱逐任务的策略实体 + */ @Data @NoArgsConstructor @EqualsAndHashCode(callSuper = false) diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppIdTaskEvictPolicyDTO.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppIdTaskEvictPolicyDTO.java index 13f1a4a145..5e707c4884 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppIdTaskEvictPolicyDTO.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/AppIdTaskEvictPolicyDTO.java @@ -30,6 +30,9 @@ import java.util.List; +/** + * 根据Job业务Id来驱逐任务的策略实体 + */ @Data @NoArgsConstructor @EqualsAndHashCode(callSuper = false) diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/ComposedTaskEvictPolicyDTO.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/ComposedTaskEvictPolicyDTO.java index b621861e75..e17c25ab18 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/ComposedTaskEvictPolicyDTO.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/ComposedTaskEvictPolicyDTO.java @@ -32,6 +32,9 @@ import java.util.Arrays; import java.util.List; +/** + * 任务驱逐组合策略实体 + */ @Data @NoArgsConstructor @EqualsAndHashCode(callSuper = false) @@ -39,11 +42,14 @@ public class ComposedTaskEvictPolicyDTO extends TaskEvictPolicyDTO { public static final String classType = "ComposedTaskEvictPolicy"; + // 策略组合操作符,支持AND/OR public enum ComposeOperator { AND, OR } + // 策略组合操作符 protected ComposeOperator operator = null; + // 子策略列表 protected List policyList = new ArrayList<>(); public ComposedTaskEvictPolicyDTO(ComposeOperator operator, TaskEvictPolicyDTO... policyList) { diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskEvictPolicyDTO.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskEvictPolicyDTO.java index 1016137362..42e3f06435 100644 --- a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskEvictPolicyDTO.java +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskEvictPolicyDTO.java @@ -32,6 +32,7 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type") @JsonSubTypes({ + @JsonSubTypes.Type(value = TaskInstanceIdEvictPolicyDTO.class, name = TaskInstanceIdEvictPolicyDTO.classType), @JsonSubTypes.Type(value = AppCodeTaskEvictPolicyDTO.class, name = AppCodeTaskEvictPolicyDTO.classType), @JsonSubTypes.Type(value = AppIdTaskEvictPolicyDTO.class, name = AppIdTaskEvictPolicyDTO.classType), @JsonSubTypes.Type(value = ComposedTaskEvictPolicyDTO.class, name = ComposedTaskEvictPolicyDTO.classType)}) diff --git a/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskInstanceIdEvictPolicyDTO.java b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskInstanceIdEvictPolicyDTO.java new file mode 100644 index 0000000000..111a5d6b92 --- /dev/null +++ b/src/backend/job-execute/api-job-execute/src/main/java/com/tencent/bk/job/execute/model/inner/TaskInstanceIdEvictPolicyDTO.java @@ -0,0 +1,49 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.model.inner; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 根据任务实例Id来驱逐任务的策略实体 + */ +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false) +public class TaskInstanceIdEvictPolicyDTO extends TaskEvictPolicyDTO { + + public static final String classType = "TaskInstanceIdEvictPolicy"; + + protected List taskInstanceIdsToEvict; + + public TaskInstanceIdEvictPolicyDTO(List taskInstanceIdsToEvict) { + this.taskInstanceIdsToEvict = taskInstanceIdsToEvict; + } + +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/ITaskEvictPolicy.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/ITaskEvictPolicy.java index 2ac3e67557..51c7555ced 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/ITaskEvictPolicy.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/ITaskEvictPolicy.java @@ -30,5 +30,12 @@ * 执行引擎任务驱逐策略接口类 */ public interface ITaskEvictPolicy { + + /** + * 判断一个任务实例是否需要被驱逐出执行引擎 + * + * @param taskInstance 任务实例 + * @return 布尔值,是否需要被驱逐 + */ boolean needToEvict(TaskInstanceDTO taskInstance); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/PolicyFactory.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/PolicyFactory.java index 91ec3a5015..8848470659 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/PolicyFactory.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/PolicyFactory.java @@ -30,6 +30,7 @@ import com.tencent.bk.job.execute.model.inner.AppIdTaskEvictPolicyDTO; import com.tencent.bk.job.execute.model.inner.ComposedTaskEvictPolicyDTO; import com.tencent.bk.job.execute.model.inner.TaskEvictPolicyDTO; +import com.tencent.bk.job.execute.model.inner.TaskInstanceIdEvictPolicyDTO; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -43,20 +44,22 @@ public class PolicyFactory { /** * 根据策略数据实体,创建任务驱逐策略对象 * - * @param policyDTO 策略数据实体 + * @param policy 策略数据实体 * @return 策略对象 */ - public static ITaskEvictPolicy createPolicyByDTO(TaskEvictPolicyDTO policyDTO) { - if (policyDTO instanceof AppCodeTaskEvictPolicyDTO) { - return new AppCodeTaskEvictPolicy(((AppCodeTaskEvictPolicyDTO) policyDTO).getAppCodesToEvict()); - } else if (policyDTO instanceof AppIdTaskEvictPolicyDTO) { - return new AppIdTaskEvictPolicy(((AppIdTaskEvictPolicyDTO) policyDTO).getAppIdsToEvict()); - } else if (policyDTO instanceof ComposedTaskEvictPolicyDTO) { - ComposedTaskEvictPolicyDTO composedPolicyDTO = (ComposedTaskEvictPolicyDTO) policyDTO; + public static ITaskEvictPolicy createPolicyByDTO(TaskEvictPolicyDTO policy) { + if (policy instanceof TaskInstanceIdEvictPolicyDTO) { + return new TaskInstanceIdEvictPolicy(((TaskInstanceIdEvictPolicyDTO) policy).getTaskInstanceIdsToEvict()); + } else if (policy instanceof AppCodeTaskEvictPolicyDTO) { + return new AppCodeTaskEvictPolicy(((AppCodeTaskEvictPolicyDTO) policy).getAppCodesToEvict()); + } else if (policy instanceof AppIdTaskEvictPolicyDTO) { + return new AppIdTaskEvictPolicy(((AppIdTaskEvictPolicyDTO) policy).getAppIdsToEvict()); + } else if (policy instanceof ComposedTaskEvictPolicyDTO) { + ComposedTaskEvictPolicyDTO composedPolicyDTO = (ComposedTaskEvictPolicyDTO) policy; List policyDTOList = composedPolicyDTO.getPolicyList(); return new ComposedTaskEvictPolicy(composedPolicyDTO.getOperator(), policyDTOList); } else { - log.error("TaskEvictPolicyDTO not support yet:{}", policyDTO); + log.error("TaskEvictPolicyDTO not support yet:{}", policy); throw new InternalException(ErrorCode.NOT_SUPPORT_FEATURE); } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java index b424e9924b..95864626b9 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java @@ -68,14 +68,16 @@ public boolean shouldEvictTask(TaskInstanceDTO taskInstance) { * 更新被驱逐的任务的状态为被丢弃状态 * * @param taskInstance 任务实例 - * @return 是否更新成功 */ - public boolean updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { + public void updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { long endTime = System.currentTimeMillis(); Long taskInstanceId = stepInstance.getTaskInstanceId(); - if (RunStatusEnum.isFinishedStatus(stepInstance.getStatus())) { - long totalTime = TaskCostCalculator.calculate(stepInstance.getStartTime(), endTime, - stepInstance.getTotalTime()); + if (!RunStatusEnum.isFinishedStatus(stepInstance.getStatus())) { + long totalTime = TaskCostCalculator.calculate( + stepInstance.getStartTime(), + endTime, + stepInstance.getTotalTime() + ); taskInstanceService.updateStepExecutionInfo( stepInstance.getId(), RunStatusEnum.ABANDONED, @@ -90,9 +92,12 @@ public boolean updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanc stepInstance.getStatus() ); } - if (RunStatusEnum.isFinishedStatus(taskInstance.getStatus())) { - long totalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime, - taskInstance.getTotalTime()); + if (!RunStatusEnum.isFinishedStatus(taskInstance.getStatus())) { + long totalTime = TaskCostCalculator.calculate( + taskInstance.getStartTime(), + endTime, + taskInstance.getTotalTime() + ); taskInstanceService.updateTaskExecutionInfo( taskInstanceId, RunStatusEnum.ABANDONED, @@ -108,6 +113,5 @@ public boolean updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanc taskInstance.getStatus() ); } - return true; } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyManager.java index ff56973a19..c7bf0f1292 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyManager.java @@ -48,9 +48,12 @@ public class TaskEvictPolicyManager { // 策略更新时间间隔:10s private final int POLICY_UPDATE_INTERVAL_MILLS = 10000; + // 序列化后的策略字符串 private String policyJsonStr = null; + // 组合策略 private volatile ComposedTaskEvictPolicy policy = null; + // Redis操作模板接口 private final RedisTemplate redisTemplate; @Autowired @@ -68,24 +71,25 @@ public ComposedTaskEvictPolicy getPolicy() { public void updatePolicy() { String loadedPolicyJsonStr = redisTemplate.opsForValue() .get(RedisConstants.KEY_EXECUTE_TASK_EVICT_POLICY); - if (StringUtil.isDifferent(policyJsonStr, loadedPolicyJsonStr)) { - policyJsonStr = loadedPolicyJsonStr; - try { - policy = StringUtils.isBlank(loadedPolicyJsonStr) ? null : JsonUtils.fromJson( - loadedPolicyJsonStr, new TypeReference() { - } - ); - log.info("loaded new policy:{}", loadedPolicyJsonStr); - } catch (Exception e) { - FormattingTuple message = MessageFormatter.format( - "Fail to parse taskEvictPolicy from {}", - loadedPolicyJsonStr - ); - log.warn(message.getMessage(), e); - throw e; - } - } else { + // 策略无任何改变则无须解析更新 + if (!StringUtil.isDifferent(policyJsonStr, loadedPolicyJsonStr)) { log.debug("taskEvictPolicy not change"); + return; + } + policyJsonStr = loadedPolicyJsonStr; + try { + policy = StringUtils.isBlank(loadedPolicyJsonStr) ? null : JsonUtils.fromJson( + loadedPolicyJsonStr, new TypeReference() { + } + ); + log.info("loaded new policy:{}", loadedPolicyJsonStr); + } catch (Exception e) { + FormattingTuple message = MessageFormatter.format( + "Fail to parse taskEvictPolicy from {}", + loadedPolicyJsonStr + ); + log.warn(message.getMessage(), e); + throw e; } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskInstanceIdEvictPolicy.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskInstanceIdEvictPolicy.java new file mode 100644 index 0000000000..69df453d68 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskInstanceIdEvictPolicy.java @@ -0,0 +1,58 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.engine.evict; + +import com.tencent.bk.job.execute.model.TaskInstanceDTO; +import com.tencent.bk.job.execute.model.inner.TaskInstanceIdEvictPolicyDTO; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * 驱逐策略:根据任务实例ID将任务驱逐出执行引擎 + */ +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = false) +public class TaskInstanceIdEvictPolicy extends TaskInstanceIdEvictPolicyDTO implements ITaskEvictPolicy { + + private Set taskInstanceIdSetToEvict; + + public TaskInstanceIdEvictPolicy(List taskInstanceIdsToEvict) { + super(taskInstanceIdsToEvict); + // 使用查找速度为O(1)的Set + taskInstanceIdSetToEvict = new HashSet<>(taskInstanceIdsToEvict); + } + + @Override + public boolean needToEvict(TaskInstanceDTO taskInstance) { + Long taskInstanceId = taskInstance.getId(); + return taskInstanceIdSetToEvict.contains(taskInstanceId); + } +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java index e1696d09b7..daf77a7fcd 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java @@ -27,7 +27,6 @@ import com.tencent.bk.job.common.exception.InternalException; import com.tencent.bk.job.common.model.InternalResponse; import com.tencent.bk.job.common.model.dto.HostDTO; -import com.tencent.bk.job.common.util.ThreadUtils; import com.tencent.bk.job.common.util.file.PathUtil; import com.tencent.bk.job.execute.client.FileSourceTaskResourceClient; import com.tencent.bk.job.execute.dao.FileSourceTaskLogDAO; @@ -50,13 +49,13 @@ import com.tencent.bk.job.file_gateway.model.req.inner.FileSourceTaskContent; import com.tencent.bk.job.file_gateway.model.resp.inner.BatchTaskInfoDTO; import com.tencent.bk.job.file_gateway.model.resp.inner.TaskInfoDTO; -import com.tencent.bk.job.manage.common.consts.task.TaskStepTypeEnum; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.List; @@ -64,6 +63,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +/** + * 第三方源文件准备服务,用于在分发前控制第三方源文件的准备 + */ @Slf4j @Primary @Component @@ -98,6 +100,12 @@ public ThirdFilePrepareService(ResultHandleManager resultHandleManager, this.taskExecuteMQEventDispatcher = taskExecuteMQEventDispatcher; } + /** + * 将文件源文件下载任务的信息设置到分发源文件数据中 + * + * @param taskInfoDTO 文件源文件下载任务信息 + * @param fileSourceDTO 分发源文件数据 + */ private void setTaskInfoIntoThirdFileSource(TaskInfoDTO taskInfoDTO, FileSourceDTO fileSourceDTO) { String fileSourceTaskId = taskInfoDTO.getTaskId(); if (fileSourceDTO.getServers() == null) { @@ -109,9 +117,10 @@ private void setTaskInfoIntoThirdFileSource(TaskInfoDTO taskInfoDTO, FileSourceD fileSourceDTO.getServers().setIpList(hostDTOList); fileSourceDTO.setFileSourceTaskId(fileSourceTaskId); fileSourceDTO.getFiles().forEach(fileDetailDTO -> { - // 第二次处理,加上文件源名称的文件路径 - fileDetailDTO.setThirdFilePathWithFileSourceName(PathUtil.joinFilePath(taskInfoDTO.getFileSourceName(), - fileDetailDTO.getThirdFilePath())); + // 含文件源名称的文件路径 + fileDetailDTO.setThirdFilePathWithFileSourceName( + PathUtil.joinFilePath(taskInfoDTO.getFileSourceName(), fileDetailDTO.getThirdFilePath()) + ); }); } @@ -134,46 +143,12 @@ private void setBatchTaskInfoIntoThirdFileSource(BatchTaskInfoDTO batchTaskInfoD } } - public void retryPrepareFile( - Long stepInstanceId, - ThirdFilePrepareTaskResultHandler resultHandler - ) { - StepInstanceDTO stepInstance = taskInstanceService.getStepInstanceDetail(stepInstanceId); - int executeType = stepInstance.getExecuteType(); - // 当前仅有文件分发类步骤需要重试第三方文件源拉取 - if (TaskStepTypeEnum.FILE.getValue() != executeType) { - return; - } - List fileSourceList = stepInstance.getFileSourceList(); - Pair, List> thirdFileSource = parseThirdFileSource(fileSourceList); - List thirdFileSourceList = thirdFileSource.getLeft(); - List fileSourceTaskList = thirdFileSource.getRight(); - if (thirdFileSourceList.isEmpty()) { - return; - } - // 直接重新下载 - BatchTaskInfoDTO batchTaskInfoDTO = startFileSourceDownloadTask(stepInstance.getOperator(), - stepInstance.getAppId(), stepInstance.getId(), stepInstance.getExecuteCount(), fileSourceTaskList); - setBatchTaskInfoIntoThirdFileSource(batchTaskInfoDTO, thirdFileSourceList); - log.debug("new batchFileSourceTask: {}", batchTaskInfoDTO.getBatchTaskId()); - - // 更新文件源任务状态 - taskInstanceService.updateResolvedSourceFile(stepInstance.getId(), fileSourceList); - // 异步处理文件下载任务 - ThirdFilePrepareTask pullingResultTask = asyncWatchThirdFilePulling( - stepInstance, - fileSourceList, - batchTaskInfoDTO.getBatchTaskId(), - true, - resultHandler - ); - // 重试先同步处理 - while (!pullingResultTask.isReadyForNext()) { - ThreadUtils.sleep(100); - } - log.debug("continue to retry..."); - } - + /** + * 解析第三方文件源数据 + * + * @param fileSourceList 文件源列表 + * @return Pair<第三方文件源列表, 文件源文件任务列表> + */ private Pair, List> parseThirdFileSource( List fileSourceList ) { @@ -181,70 +156,74 @@ private Pair, List> parseThirdFileSou List fileSourceTaskList = new ArrayList<>(); for (FileSourceDTO fileSourceDTO : fileSourceList) { if (fileSourceDTO == null) { - log.warn("fileSourceDTO is null"); + log.warn("FileSourceDTO is null, continue"); continue; } Integer fileSourceId = fileSourceDTO.getFileSourceId(); - if (fileSourceId != null && fileSourceId > 0) { - List files = fileSourceDTO.getFiles(); - if (files == null) { - log.warn("files is null"); - continue; - } - for (FileDetailDTO file : files) { - if (file != null && StringUtils.isBlank(file.getThirdFilePath())) { - // 第一次处理,不含文件源名称的文件路径 - file.setThirdFilePath(file.getFilePath()); - } + if (fileSourceId == null || fileSourceId <= 0) { + log.warn("Invalid fileSourceId({}), continue", fileSourceId); + continue; + } + List files = fileSourceDTO.getFiles(); + if (CollectionUtils.isEmpty(files)) { + log.warn("Files is null or empty, continue"); + continue; + } + for (FileDetailDTO file : files) { + if (file != null && StringUtils.isBlank(file.getThirdFilePath())) { + // 不含文件源名称的文件源文件路径 + file.setThirdFilePath(file.getFilePath()); } - // 第三方COS等文件源的文件任务处理 - List filePaths = - files.parallelStream().map(FileDetailDTO::getThirdFilePath).collect(Collectors.toList()); - fileSourceTaskList.add(new FileSourceTaskContent(fileSourceId, filePaths)); - thirdFileSourceList.add(fileSourceDTO); } + // 收集第三方文件源文件路径 + List filePaths = files.parallelStream() + .map(FileDetailDTO::getThirdFilePath) + .collect(Collectors.toList()); + // 收集文件源文件任务 + fileSourceTaskList.add(new FileSourceTaskContent(fileSourceId, filePaths)); + // 收集文件源信息 + thirdFileSourceList.add(fileSourceDTO); } return Pair.of(thirdFileSourceList, fileSourceTaskList); } - public ThirdFilePrepareTask prepareThirdFileAsync( + public void prepareThirdFileAsync( StepInstanceDTO stepInstance, ThirdFilePrepareTaskResultHandler resultHandler ) { List fileSourceList = stepInstance.getFileSourceList(); - // 准备第三方源文件 + // 解析第三方源文件,收集任务数据 Pair, List> thirdFileSource = parseThirdFileSource(fileSourceList); List thirdFileSourceList = thirdFileSource.getLeft(); List fileSourceTaskList = thirdFileSource.getRight(); - if (thirdFileSourceList == null || thirdFileSourceList.isEmpty()) { - // TODO-Rolling - taskExecuteMQEventDispatcher.dispatchGseTaskEvent( - GseTaskEvent.startGseTask(stepInstance.getId(), stepInstance.getExecuteCount(), - stepInstance.getBatch(), null, null)); - return null; + if (CollectionUtils.isEmpty(thirdFileSourceList)) { + continueStepAtOnce(stepInstance); + return; } log.debug("[{}]: Start FileSourceBatchTask: {}", stepInstance.getUniqueKey(), fileSourceTaskList); - BatchTaskInfoDTO batchTaskInfoDTO = startFileSourceDownloadTask(stepInstance.getOperator(), - stepInstance.getAppId(), stepInstance.getId(), stepInstance.getExecuteCount(), fileSourceTaskList); - setBatchTaskInfoIntoThirdFileSource(batchTaskInfoDTO, thirdFileSourceList); - log.debug("[{}]: fileSourceList={}", stepInstance.getUniqueKey(), fileSourceList); + // 启动下载任务 + BatchTaskInfoDTO batchTaskInfoDTO = startFileSourceDownloadTask( + stepInstance.getOperator(), + stepInstance.getAppId(), + stepInstance.getId(), + stepInstance.getExecuteCount(), + fileSourceTaskList + ); log.info( "[{}]: fileSourceDownloadTask started, batchTaskId:{},taskInfoList={}", stepInstance.getUniqueKey(), batchTaskInfoDTO.getBatchTaskId(), batchTaskInfoDTO.getTaskInfoList() ); + // 填充任务信息到分发源文件数据 + setBatchTaskInfoIntoThirdFileSource(batchTaskInfoDTO, thirdFileSourceList); + log.debug("[{}]: fileSourceList={}", stepInstance.getUniqueKey(), fileSourceList); // 放进文件源下载任务进度表中 - FileSourceTaskLogDTO fileSourceTaskLogDTO = new FileSourceTaskLogDTO(); - fileSourceTaskLogDTO.setStepInstanceId(stepInstance.getId()); - fileSourceTaskLogDTO.setExecuteCount(stepInstance.getExecuteCount()); - fileSourceTaskLogDTO.setFileSourceBatchTaskId(batchTaskInfoDTO.getBatchTaskId()); - fileSourceTaskLogDTO.setStatus(TaskStatusEnum.INIT.getStatus().intValue()); - fileSourceTaskLogDTO.setStartTime(System.currentTimeMillis()); + FileSourceTaskLogDTO fileSourceTaskLogDTO = buildInitFileSourceTaskLog(stepInstance, batchTaskInfoDTO); fileSourceTaskLogDAO.saveFileSourceTaskLog(fileSourceTaskLogDTO); // 更新文件源任务状态 taskInstanceService.updateResolvedSourceFile(stepInstance.getId(), fileSourceList); - // 异步处理文件下载任务 + // 异步轮询文件下载任务 ThirdFilePrepareTask task = asyncWatchThirdFilePulling( stepInstance, fileSourceList, @@ -253,7 +232,41 @@ public ThirdFilePrepareTask prepareThirdFileAsync( resultHandler ); taskMap.put(stepInstance.getUniqueKey(), task); - return task; + } + + /** + * 立即继续步骤 + * + * @param stepInstance 步骤实例 + */ + private void continueStepAtOnce(StepInstanceDTO stepInstance) { + taskExecuteMQEventDispatcher.dispatchGseTaskEvent( + GseTaskEvent.startGseTask( + stepInstance.getId(), + stepInstance.getExecuteCount(), + stepInstance.getBatch(), + null, + null + ) + ); + } + + /** + * 构建文件源文件拉取任务的初始日志 + * + * @param stepInstance 步骤实例 + * @param batchTaskInfoDTO 文件源文件拉取任务信息 + * @return 文件源任务日志 + */ + private FileSourceTaskLogDTO buildInitFileSourceTaskLog(StepInstanceDTO stepInstance, + BatchTaskInfoDTO batchTaskInfoDTO) { + FileSourceTaskLogDTO fileSourceTaskLogDTO = new FileSourceTaskLogDTO(); + fileSourceTaskLogDTO.setStepInstanceId(stepInstance.getId()); + fileSourceTaskLogDTO.setExecuteCount(stepInstance.getExecuteCount()); + fileSourceTaskLogDTO.setFileSourceBatchTaskId(batchTaskInfoDTO.getBatchTaskId()); + fileSourceTaskLogDTO.setStatus(TaskStatusEnum.INIT.getStatus().intValue()); + fileSourceTaskLogDTO.setStartTime(System.currentTimeMillis()); + return fileSourceTaskLogDTO; } public void stopPrepareThirdFileAsync(StepInstanceDTO stepInstance) { @@ -267,33 +280,66 @@ public void clearPreparedTmpFile(long stepInstanceId) { StepInstanceDTO stepInstance = taskInstanceService.getStepInstanceDetail(stepInstanceId); // 找出所有第三方文件源的TaskId进行清理 List fileSourceList = stepInstance.getFileSourceList(); + List fileSourceTaskIdList = findFileSourceTaskIds(stepInstanceId, fileSourceList); + log.info("FileSourceTaskIds to be cleared:{}", fileSourceTaskIdList); + if (CollectionUtils.isEmpty(fileSourceTaskIdList)) { + return; + } + try { + // 调用file-gateway接口通知清理临时文件 + clearTaskFiles(fileSourceTaskIdList); + } catch (Throwable t) { + log.error("Fail to clearTaskFiles, fileSourceTaskIdList={}", fileSourceTaskIdList, t); + } + } + + /** + * 从源文件数据中找到所有的第三方文件源文件下载任务ID + * + * @param stepInstanceId 步骤Id + * @param fileSourceList 源文件数据列表 + * @return 所有的第三方文件源文件下载任务ID + */ + private List findFileSourceTaskIds(long stepInstanceId, List fileSourceList) { List fileSourceTaskIdList = new ArrayList<>(); for (FileSourceDTO fileSourceDTO : fileSourceList) { Integer fileSourceId = fileSourceDTO.getFileSourceId(); String fileSourceTaskId = fileSourceDTO.getFileSourceTaskId(); - if (fileSourceId != null && fileSourceId > 0) { - if (StringUtils.isBlank(fileSourceTaskId)) { - log.warn("no fileSourceTask executed for fileSourceId:{}, stepInstanceId:{}", fileSourceId, - stepInstanceId); - } else { - fileSourceTaskIdList.add(fileSourceTaskId); - } + if (fileSourceId == null || fileSourceId <= 0) { + continue; } - } - log.debug("FileSourceTaskIds to be cleared:{}", fileSourceTaskIdList); - if (!fileSourceTaskIdList.isEmpty()) { - try { - clearTaskFiles(fileSourceTaskIdList); - } catch (Throwable t) { - log.error("Fail to clearTaskFiles, fileSourceTaskIdList={}", fileSourceTaskIdList, t); + if (StringUtils.isBlank(fileSourceTaskId)) { + log.warn( + "no fileSourceTask executed for fileSourceId:{}, stepInstanceId:{}", + fileSourceId, + stepInstanceId + ); + } else { + fileSourceTaskIdList.add(fileSourceTaskId); } } + return fileSourceTaskIdList; } + /** + * 调用file-gateway接口通知清理某些任务产生的临时文件 + * + * @param taskIdList 任务Id列表 + */ private void clearTaskFiles(List taskIdList) { fileSourceTaskResource.clearTaskFiles(new ClearTaskFilesReq(taskIdList)); } + /** + * 异步监控第三方文件拉取过程 + * + * @param stepInstance 步骤实例 + * @param fileSourceList 源文件数据列表 + * @param batchTaskId 第三方文件拉取任务ID + * @param isForRetry 是否为步骤重试 + * @param resultHandler 结果处理器 + * @return 第三方源文件准备任务 + */ private ThirdFilePrepareTask asyncWatchThirdFilePulling( StepInstanceDTO stepInstance, List fileSourceList, @@ -309,12 +355,24 @@ private ThirdFilePrepareTask asyncWatchThirdFilePulling( isForRetry, new RecordableThirdFilePrepareTaskResultHandler(stepInstance, resultHandler) ); - batchResultHandleTask.initDependentService(fileSourceTaskResource, taskInstanceService, accountService, - hostService, logService, taskExecuteMQEventDispatcher, fileSourceTaskLogDAO); + batchResultHandleTask.initDependentService( + fileSourceTaskResource, taskInstanceService, accountService, + hostService, logService, taskExecuteMQEventDispatcher, fileSourceTaskLogDAO + ); resultHandleManager.handleDeliveredTask(batchResultHandleTask); return batchResultHandleTask; } + /** + * 开始第三方源文件下载任务 + * + * @param username 操作者用户名 + * @param appId Job业务ID + * @param stepInstanceId 步骤实例ID + * @param executeCount 重试次数 + * @param fileSourceTaskList 第三方文件源任务列表 + * @return 任务信息 + */ private BatchTaskInfoDTO startFileSourceDownloadTask(String username, Long appId, Long stepInstanceId, @@ -325,8 +383,10 @@ private BatchTaskInfoDTO startFileSourceDownloadTask(String username, req.setStepInstanceId(stepInstanceId); req.setExecuteCount(executeCount); req.setFileSourceTaskList(fileSourceTaskList); - InternalResponse resp = fileSourceTaskResource.startFileSourceBatchDownloadTask(username, - req); + InternalResponse resp = fileSourceTaskResource.startFileSourceBatchDownloadTask( + username, + req + ); if (resp.isSuccess()) { log.debug("startFileSourceBatchDownloadTask, req={}, resp={}", req, resp); return resp.getData(); @@ -336,6 +396,9 @@ private BatchTaskInfoDTO startFileSourceDownloadTask(String username, } } + /** + * 可记录的第三方源文件准备任务结果处理器 + */ class RecordableThirdFilePrepareTaskResultHandler implements ThirdFilePrepareTaskResultHandler { StepInstanceDTO stepInstance; diff --git a/src/backend/job-execute/service-job-execute/src/test/java/com/tencent/bk/job/execute/engine/util/TaskEvictPolicyTest.java b/src/backend/job-execute/service-job-execute/src/test/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyTest.java similarity index 87% rename from src/backend/job-execute/service-job-execute/src/test/java/com/tencent/bk/job/execute/engine/util/TaskEvictPolicyTest.java rename to src/backend/job-execute/service-job-execute/src/test/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyTest.java index 245591a99a..61575664cf 100644 --- a/src/backend/job-execute/service-job-execute/src/test/java/com/tencent/bk/job/execute/engine/util/TaskEvictPolicyTest.java +++ b/src/backend/job-execute/service-job-execute/src/test/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyTest.java @@ -22,17 +22,16 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.execute.engine.util; +package com.tencent.bk.job.execute.engine.evict; import com.fasterxml.jackson.core.type.TypeReference; import com.tencent.bk.job.common.util.json.JsonUtils; -import com.tencent.bk.job.execute.engine.evict.ComposedTaskEvictPolicy; -import com.tencent.bk.job.execute.engine.evict.ITaskEvictPolicy; import com.tencent.bk.job.execute.model.TaskInstanceDTO; import com.tencent.bk.job.execute.model.inner.AppCodeTaskEvictPolicyDTO; import com.tencent.bk.job.execute.model.inner.AppIdTaskEvictPolicyDTO; import com.tencent.bk.job.execute.model.inner.ComposedTaskEvictPolicyDTO; import com.tencent.bk.job.execute.model.inner.TaskEvictPolicyDTO; +import com.tencent.bk.job.execute.model.inner.TaskInstanceIdEvictPolicyDTO; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -43,11 +42,13 @@ public class TaskEvictPolicyTest { @Test public void testTaskEvictPolicy() { + TaskEvictPolicyDTO taskInstanceIdPolicy = new TaskInstanceIdEvictPolicyDTO(Arrays.asList(1001L, 1002L)); TaskEvictPolicyDTO appIdPolicy = new AppIdTaskEvictPolicyDTO(Arrays.asList(2L, 3L)); TaskEvictPolicyDTO appCodePolicy = new AppCodeTaskEvictPolicyDTO(Arrays.asList("appCode1", "appCode2")); // 测试策略OR组合 ComposedTaskEvictPolicyDTO composedPolicyDTO = new ComposedTaskEvictPolicyDTO( ComposedTaskEvictPolicy.ComposeOperator.OR, + taskInstanceIdPolicy, appIdPolicy, appCodePolicy ); @@ -64,21 +65,28 @@ public void testTaskEvictPolicy() { // 负例 taskInstance.setAppCode("appCode3"); assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(false); + taskInstance.setId(1003L); + taskInstance.setAppId(20L); + taskInstance.setAppCode("appCode3"); + assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(false); // 正例 + taskInstance.setId(1003L); taskInstance.setAppId(2L); taskInstance.setAppCode("appCode3"); assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(true); + taskInstance.setId(1003L); taskInstance.setAppId(20L); taskInstance.setAppCode("appCode2"); assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(true); - // 负例 + taskInstance.setId(1002L); taskInstance.setAppId(20L); - taskInstance.setAppCode("appCode3"); - assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(false); + taskInstance.setAppCode("appCode4"); + assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(true); // 测试策略AND组合 composedPolicyDTO = new ComposedTaskEvictPolicy( ComposedTaskEvictPolicy.ComposeOperator.AND, + taskInstanceIdPolicy, appIdPolicy, appCodePolicy ); @@ -89,13 +97,16 @@ public void testTaskEvictPolicy() { }); composedPolicy = new ComposedTaskEvictPolicy(composedPolicyDTO); // 负例 + taskInstance.setId(1001L); taskInstance.setAppId(2L); taskInstance.setAppCode("appCode3"); assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(false); + taskInstance.setId(1001L); taskInstance.setAppId(20L); taskInstance.setAppCode("appCode2"); assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(false); // 正例 + taskInstance.setId(1001L); taskInstance.setAppId(2L); taskInstance.setAppCode("appCode1"); assertThat(composedPolicy.needToEvict(taskInstance)).isEqualTo(true); diff --git a/src/backend/job-file-gateway/api-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/model/req/inner/FileSourceTaskContent.java b/src/backend/job-file-gateway/api-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/model/req/inner/FileSourceTaskContent.java index cd597dedf6..419b654d62 100644 --- a/src/backend/job-file-gateway/api-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/model/req/inner/FileSourceTaskContent.java +++ b/src/backend/job-file-gateway/api-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/model/req/inner/FileSourceTaskContent.java @@ -31,6 +31,9 @@ import java.util.List; +/** + * 单个文件源的文件任务内容 + */ @NoArgsConstructor @AllArgsConstructor @Data