From f51c7267241427eb271752f1cca3283d499c10ed Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Sat, 30 Dec 2023 21:23:10 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20Job=20=E6=94=AF=E6=8C=81=E5=AE=B9?= =?UTF-8?q?=E5=99=A8=E6=89=A7=E8=A1=8C=20-=20=E8=84=9A=E6=9C=AC=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=20#2631?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bk/job/common/cc/sdk/BizCmdbClient.java | 37 +++ .../i18n/exception/message.properties | 1 + .../i18n/exception/message_en.properties | 1 + .../i18n/exception/message_en_US.properties | 1 + .../i18n/exception/message_zh.properties | 1 + .../i18n/exception/message_zh_CN.properties | 1 + .../bk/job/common/constant/ErrorCode.java | 2 + .../bk/job/common/model/dto/Container.java | 60 ++-- .../bk/job/common/model/dto/HostDTO.java | 7 + .../v2/model/AtomicFileTaskResultContent.java | 2 +- .../execute/engine/model/ExecuteObject.java | 6 +- .../job/execute/model/ExecuteObjectsDTO.java | 10 + .../bk/job/execute/model/StepInstanceDTO.java | 20 ++ .../model/TaskInstanceExecuteObjects.java | 75 +++++ .../job/execute/service/ContainerService.java | 43 +++ .../service/impl/ContainerServiceImpl.java | 101 ++++++ .../service/impl/TaskExecuteServiceImpl.java | 312 ++++++++++++------ 17 files changed, 553 insertions(+), 127 deletions(-) create mode 100644 src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java create mode 100644 src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ContainerService.java create mode 100644 src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ContainerServiceImpl.java diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java index eaaf042947..ac0b6ce8fb 100644 --- a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java @@ -105,6 +105,7 @@ import com.tencent.bk.job.common.model.error.ErrorType; import com.tencent.bk.job.common.util.FlowController; import com.tencent.bk.job.common.util.JobContextUtil; +import com.tencent.bk.job.common.util.PageUtil; import com.tencent.bk.job.common.util.ThreadUtils; import com.tencent.bk.job.common.util.TimeUtil; import com.tencent.bk.job.common.util.Utils; @@ -1439,4 +1440,40 @@ private void setSupplierAccount(EsbReq esbReq) { esbReq.setBkSupplierAccount(cmdbSupplierAccount); } } + + /** + * 根据容器ID批量查询容器 + * + * @param bizId CMDB 业务 ID + * @param containerIds 容器 ID 集合 + * @return 容器列表 + */ + public List listKubeContainerByIds(long bizId, Collection containerIds) { + ListKubeContainerByTopoReq req = makeCmdbBaseReq(ListKubeContainerByTopoReq.class); + + // 查询条件 + req.setBizId(bizId); + PropertyFilterDTO containerFilter = new PropertyFilterDTO(); + containerFilter.setCondition("AND"); + containerFilter.addRule(BaseRuleDTO.in(ContainerFields.ID, containerIds)); + req.setContainerFilter(containerFilter); + + // 返回参数设置 + req.setContainerFields(ContainerFields.FIELDS); + req.setPodFields(PodFields.FIELDS); + + return PageUtil.queryAllWithLoopPageQuery( + 500, + page -> listPageContainersByIds(req, page), + pageData -> pageData.getTotal().intValue(), + PageData::getData, + container -> container + ); + } + + private PageData listPageContainersByIds(ListKubeContainerByTopoReq req, + PageDTO page) { + req.setPage(new Page(page.getStart(), page.getLimit())); + return listKubeContainerByTopo(req); + } } diff --git a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message.properties b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message.properties index 4e4ebc7734..fb8474ef4a 100644 --- a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message.properties +++ b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message.properties @@ -179,6 +179,7 @@ 1244027=滚动批次不能大于{0} 1244028=步骤 [{0}] 的目标主机为空 1244029=步骤 [{0}] 的源文件主机为空 +1244030=作业引用的执行对象不存在。不存在的执行对象个数:{0},执行对象列表[{1}] ## 业务错误-定时任务(job-crontab) 1245006=定时任务执行时间已失效 diff --git a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en.properties b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en.properties index ce7cf7bfb3..d561c09cff 100644 --- a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en.properties +++ b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en.properties @@ -179,6 +179,7 @@ 1244027=Rolling batch can not be greater than {0} 1244028=Step [{0}] target host is empty 1244029=Step [{0}] source host is empty +1244030=Execute object referenced by the job does not exist. Number of non-existent execution objects: {0}, execution object list: [{1}] ## Business error - job-crontab 1245006=Cron job execution time already passed diff --git a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en_US.properties b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en_US.properties index ce7cf7bfb3..d561c09cff 100644 --- a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en_US.properties +++ b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_en_US.properties @@ -179,6 +179,7 @@ 1244027=Rolling batch can not be greater than {0} 1244028=Step [{0}] target host is empty 1244029=Step [{0}] source host is empty +1244030=Execute object referenced by the job does not exist. Number of non-existent execution objects: {0}, execution object list: [{1}] ## Business error - job-crontab 1245006=Cron job execution time already passed diff --git a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh.properties b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh.properties index 4e4ebc7734..fb8474ef4a 100644 --- a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh.properties +++ b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh.properties @@ -179,6 +179,7 @@ 1244027=滚动批次不能大于{0} 1244028=步骤 [{0}] 的目标主机为空 1244029=步骤 [{0}] 的源文件主机为空 +1244030=作业引用的执行对象不存在。不存在的执行对象个数:{0},执行对象列表[{1}] ## 业务错误-定时任务(job-crontab) 1245006=定时任务执行时间已失效 diff --git a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh_CN.properties b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh_CN.properties index b058a7153e..1731625c48 100644 --- a/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh_CN.properties +++ b/src/backend/commons/common-i18n/src/main/resources/i18n/exception/message_zh_CN.properties @@ -180,6 +180,7 @@ 1244027=滚动批次不能大于{0} 1244028=步骤 [{0}] 的目标主机为空 1244029=步骤 [{0}] 的源文件主机为空 +1244030=作业引用的执行对象不存在。不存在的执行对象个数:{0},执行对象列表[{1}] ## 业务错误-定时任务(job-crontab) 1245006=定时任务执行时间已失效 diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ErrorCode.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ErrorCode.java index d4ce0f7006..57f574ee26 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ErrorCode.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ErrorCode.java @@ -338,6 +338,8 @@ public class ErrorCode { public static final int STEP_TARGET_HOST_EMPTY = 1244028; // 步骤:{} 的源文件主机为空 public static final int STEP_SOURCE_HOST_EMPTY = 1244029; + // 执行对象不存在。无效的{0}个执行对象:[{1}] + public static final int EXECUTE_OBJECT_NOT_EXIST = 1244030; // 作业执行 end // 定时作业 start diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/Container.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/Container.java index 9b6a7272f3..fee02d8751 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/Container.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/Container.java @@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.Map; -import java.util.StringJoiner; +import java.util.Objects; /** * 作业执行对象-容器模型 @@ -58,19 +58,22 @@ public class Container implements Cloneable { @JsonProperty("containerId") private String containerId; + /** + * 容器名称 + */ + private String name; /** * 容器所在 Node 对应的主机ID */ - @JsonProperty("hostId") - private Long hostId; + @JsonProperty("nodeHostId") + private Long nodeHostId; /** * 容器所在 Node 对应的 Agent ID */ - @JsonProperty("agentId") - private String agentId; - + @JsonProperty("nodeAgentId") + private String nodeAgentId; /** * 容器所在集群 ID @@ -96,43 +99,54 @@ public class Container implements Cloneable { @JsonProperty("podLabels") private Map podLabels; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Container container = (Container) o; + return id.equals(container.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } @Override @SuppressWarnings("all") public Container clone() { Container clone = new Container(); clone.setId(id); - clone.setHostId(hostId); - clone.setAgentId(agentId); + clone.setNodeHostId(nodeHostId); + clone.setNodeAgentId(nodeAgentId); clone.setContainerId(containerId); clone.setPodLabels(podLabels); clone.setClusterId(clusterId); clone.setNamespace(namespace); clone.setPodName(podName); + clone.setName(name); return clone; } - @Override - public String toString() { - return new StringJoiner(", ", Container.class.getSimpleName() + "[", "]") - .add("id='" + id + "'") - .add("containerId='" + containerId + "'") - .add("hostId=" + hostId) - .add("agentId='" + agentId + "'") - .add("clusterId='" + clusterId + "'") - .add("namespace='" + namespace + "'") - .add("podName='" + podName + "'") - .add("podLabels=" + podLabels) - .toString(); - } - public ContainerVO toContainerVO() { ContainerVO vo = new ContainerVO(); vo.setId(id); + vo.setName(name); vo.setUid(containerId); - vo.setNodeHostId(hostId); + vo.setNodeHostId(nodeHostId); vo.setPodName(podName); vo.setPodLabels(podLabels); return vo; } + + public void updatePropsByContainer(Container container) { + this.containerId = container.getContainerId(); + this.nodeHostId = container.getNodeHostId(); + this.nodeAgentId = container.getNodeAgentId(); + this.clusterId = container.getClusterId(); + this.namespace = container.getNamespace(); + this.podName = container.getPodName(); + this.podLabels = container.getPodLabels(); + this.name = container.getName(); + } } diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java index 798e7f2b1e..c4a08fd673 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/HostDTO.java @@ -130,6 +130,10 @@ public HostDTO(Long bkCloudId, String ip) { this.ip = ip; } + public HostDTO(Long hostId) { + this.hostId = hostId; + } + public HostDTO(Long hostId, Long bkCloudId, String ip) { this.hostId = hostId; this.bkCloudId = bkCloudId; @@ -292,6 +296,9 @@ public String toStringBasic() { } public void updateByHost(HostDTO host) { + if (host == null) { + return; + } this.hostId = host.getHostId(); this.agentId = host.getAgentId(); this.bkCloudId = host.getBkCloudId(); diff --git a/src/backend/commons/gse-sdk/src/main/java/com/tencent/bk/job/common/gse/v2/model/AtomicFileTaskResultContent.java b/src/backend/commons/gse-sdk/src/main/java/com/tencent/bk/job/common/gse/v2/model/AtomicFileTaskResultContent.java index b24dc977e0..5a98ef484a 100644 --- a/src/backend/commons/gse-sdk/src/main/java/com/tencent/bk/job/common/gse/v2/model/AtomicFileTaskResultContent.java +++ b/src/backend/commons/gse-sdk/src/main/java/com/tencent/bk/job/common/gse/v2/model/AtomicFileTaskResultContent.java @@ -245,7 +245,7 @@ public ExecuteObjectGseKey getSourceExecuteObjectGseKey() { // bk_container_id 不为空,说明是容器执行对象 sourceExecuteObjectGseKey = ExecuteObjectGseKey.ofContainer(sourceAgentId, sourceContainerId); } else { - sourceExecuteObjectGseKey = ExecuteObjectGseKey.ofHost(destAgentId); + sourceExecuteObjectGseKey = ExecuteObjectGseKey.ofHost(sourceAgentId); } return sourceExecuteObjectGseKey; } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java index 59116a8178..8aaac28c06 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/model/ExecuteObject.java @@ -137,7 +137,7 @@ public ExecuteObjectGseKey toExecuteObjectGseKey() { if (isHostExecuteObject()) { executeObjectGseKey = ExecuteObjectGseKey.ofHost(host.getAgentId()); } else { - executeObjectGseKey = ExecuteObjectGseKey.ofContainer(container.getAgentId(), container.getContainerId()); + executeObjectGseKey = ExecuteObjectGseKey.ofContainer(container.getNodeAgentId(), container.getContainerId()); } return executeObjectGseKey; } @@ -147,7 +147,7 @@ public boolean isAgentIdEmpty() { if (isHostExecuteObject()) { return StringUtils.isEmpty(getHost().getAgentId()); } else { - return StringUtils.isEmpty(getContainer().getAgentId()); + return StringUtils.isEmpty(getContainer().getNodeAgentId()); } } @@ -156,7 +156,7 @@ public Agent toGseAgent() { if (isHostExecuteObject()) { agent.setAgentId(host.getAgentId()); } else { - agent.setAgentId(container.getAgentId()); + agent.setAgentId(container.getNodeAgentId()); agent.setContainerId(container.getContainerId()); } return agent; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectsDTO.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectsDTO.java index 3e99a70617..63149c69b0 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectsDTO.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/ExecuteObjectsDTO.java @@ -426,4 +426,14 @@ public List getDecorateExecuteObjects() { return Collections.emptyList(); } } + + /** + * 提取所有包含的容器执行对象 + * + * @return 容器执行对象列表 + */ + public List extractContainers() { + // 当前只支持静态选择容器 + return staticContainerList; + } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/StepInstanceDTO.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/StepInstanceDTO.java index bf999e9204..2fbbc32437 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/StepInstanceDTO.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/StepInstanceDTO.java @@ -27,12 +27,15 @@ import com.tencent.bk.job.common.constant.DuplicateHandlerEnum; import com.tencent.bk.job.common.constant.ErrorCode; import com.tencent.bk.job.common.exception.InternalException; +import com.tencent.bk.job.common.model.dto.Container; import com.tencent.bk.job.execute.engine.model.ExecuteObject; import com.tencent.bk.job.manage.common.consts.task.TaskStepTypeEnum; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import org.apache.commons.collections4.CollectionUtils; +import java.util.ArrayList; import java.util.List; /** @@ -253,4 +256,21 @@ public ExecuteObject findExecuteObjectByCompositeKey(ExecuteObjectCompositeKey e throw new InternalException("Not support method invoke for step", ErrorCode.INTERNAL_ERROR); } } + + public List extractStaticContainerList() { + List containers = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(targetExecuteObjects.getStaticContainerList())) { + containers.addAll(targetExecuteObjects.getStaticContainerList()); + } + if (isFileStep()) { + for (FileSourceDTO fileSource : fileSourceList) { + ExecuteObjectsDTO executeObjectsDTO = fileSource.getServers(); + if (executeObjectsDTO != null + && CollectionUtils.isNotEmpty(executeObjectsDTO.getStaticContainerList())) { + containers.addAll(executeObjectsDTO.getStaticContainerList()); + } + } + } + return containers; + } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java new file mode 100644 index 0000000000..ba9fbcd75d --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceExecuteObjects.java @@ -0,0 +1,75 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.model; + +import com.tencent.bk.job.common.model.dto.Container; +import com.tencent.bk.job.common.model.dto.HostDTO; +import lombok.Data; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * 作业实例中包含的执行对象 + */ +@Data +public class TaskInstanceExecuteObjects { + /** + * 当前作业实例是否包含容器执行对象 + */ + private boolean containsAnyContainer; + /** + * 当前作业实例是否包含主机执行对象 + */ + private boolean containsAnyHost; + + /** + * 合法的主机(在当前业务下) + */ + private List validHosts; + /** + * 不存在的主机 + */ + private List notExistHosts; + /** + * 在其他业务下的主机 + */ + private List notInAppHosts; + + /** + * 合法的容器(在当前业务下) + */ + private List validContainers; + /** + * 不存在的容器ID列表 + */ + private Set notExistContainerIds; + /** + * 主机白名单 + * key=hostId, value: 允许的操作列表 + */ + Map> whiteHostAllowActions; +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ContainerService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ContainerService.java new file mode 100644 index 0000000000..f0056ed87e --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ContainerService.java @@ -0,0 +1,43 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.service; + +import com.tencent.bk.job.common.model.dto.Container; + +import java.util.Collection; +import java.util.List; + +/** + * 容器服务 + */ +public interface ContainerService { + /** + * 根据容器 ID 批量查询容器列表 + * + * @param appId Job 业务 ID + * @param ids 容器 ID 列表 + */ + List listContainerByIds(long appId, Collection ids); +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ContainerServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ContainerServiceImpl.java new file mode 100644 index 0000000000..6af1727764 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ContainerServiceImpl.java @@ -0,0 +1,101 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.service.impl; + +import com.tencent.bk.job.common.cc.model.container.ContainerDetailDTO; +import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; +import com.tencent.bk.job.common.model.dto.Container; +import com.tencent.bk.job.common.model.dto.HostDTO; +import com.tencent.bk.job.common.model.dto.ResourceScope; +import com.tencent.bk.job.common.service.AppScopeMappingService; +import com.tencent.bk.job.execute.service.ContainerService; +import com.tencent.bk.job.execute.service.HostService; +import com.tencent.bk.job.manage.model.inner.ServiceListAppHostResultDTO; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class ContainerServiceImpl implements ContainerService { + private final HostService hostService; + private final BizCmdbClient cmdbClient; + private final AppScopeMappingService appScopeMappingService; + + @Autowired + public ContainerServiceImpl(HostService hostService, BizCmdbClient cmdbClient, + AppScopeMappingService appScopeMappingService) { + this.hostService = hostService; + this.cmdbClient = cmdbClient; + this.appScopeMappingService = appScopeMappingService; + } + + @Override + public List listContainerByIds(long appId, Collection ids) { + ResourceScope resourceScope = appScopeMappingService.getScopeByAppId(appId); + List containerDetailList = + cmdbClient.listKubeContainerByIds(Long.parseLong(resourceScope.getId()), ids); + if (CollectionUtils.isEmpty(containerDetailList)) { + return Collections.emptyList(); + } + List containers = containerDetailList.stream() + .map(this::toContainer).collect(Collectors.toList()); + + ServiceListAppHostResultDTO hostResult = + hostService.batchGetAppHosts( + appId, + containers.stream() + .map(container -> new HostDTO(container.getNodeHostId())) + .collect(Collectors.toList()), + false); + Map hostMap = hostResult.getValidHosts().stream() + .collect(Collectors.toMap(HostDTO::getHostId, host -> host)); + + containers.forEach(container -> { + HostDTO nodeHost = hostMap.get(container.getNodeHostId()); + container.setNodeAgentId(nodeHost.getAgentId()); + }); + + return containers; + } + + private Container toContainer(ContainerDetailDTO containerDetailDTO) { + Container container = new Container(); + container.setId(containerDetailDTO.getContainer().getId()); + container.setContainerId(containerDetailDTO.getContainer().getContainerUID()); + container.setName(containerDetailDTO.getContainer().getName()); + container.setPodName(containerDetailDTO.getPod().getName()); + container.setPodLabels(containerDetailDTO.getPod().getLabels()); + container.setNodeHostId(containerDetailDTO.getTopo().getHostId()); + return container; + } +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java index f2cea237b2..4f25189e0b 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java @@ -51,6 +51,7 @@ import com.tencent.bk.job.common.iam.model.AuthResult; import com.tencent.bk.job.common.model.InternalResponse; import com.tencent.bk.job.common.model.dto.AppResourceScope; +import com.tencent.bk.job.common.model.dto.Container; import com.tencent.bk.job.common.model.dto.HostDTO; import com.tencent.bk.job.common.service.AppScopeMappingService; import com.tencent.bk.job.common.service.feature.strategy.JobInstanceAttrToggleStrategy; @@ -79,6 +80,7 @@ import com.tencent.bk.job.execute.engine.listener.event.JobEvent; import com.tencent.bk.job.execute.engine.listener.event.StepEvent; import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher; +import com.tencent.bk.job.execute.engine.model.ExecuteObject; import com.tencent.bk.job.execute.engine.model.TaskVariableDTO; import com.tencent.bk.job.execute.engine.util.TimeoutUtils; import com.tencent.bk.job.execute.model.AccountDTO; @@ -96,7 +98,9 @@ import com.tencent.bk.job.execute.model.TaskExecuteParam; import com.tencent.bk.job.execute.model.TaskInfo; import com.tencent.bk.job.execute.model.TaskInstanceDTO; +import com.tencent.bk.job.execute.model.TaskInstanceExecuteObjects; import com.tencent.bk.job.execute.service.AccountService; +import com.tencent.bk.job.execute.service.ContainerService; import com.tencent.bk.job.execute.service.DangerousScriptCheckService; import com.tencent.bk.job.execute.service.HostService; import com.tencent.bk.job.execute.service.RollingConfigService; @@ -180,6 +184,7 @@ public class TaskExecuteServiceImpl implements TaskExecuteService { private final AppScopeMappingService appScopeMappingService; private final WhiteHostCache whiteHostCache; private final ServiceTaskTemplateResource taskTemplateResource; + private final ContainerService containerService; private static final Logger TASK_MONITOR_LOGGER = LoggerFactory.TASK_MONITOR_LOGGER; @@ -203,7 +208,8 @@ public TaskExecuteServiceImpl(AccountService accountService, RollingConfigService rollingConfigService, AppScopeMappingService appScopeMappingService, WhiteHostCache whiteHostCache, - ServiceTaskTemplateResource taskTemplateResource) { + ServiceTaskTemplateResource taskTemplateResource, + ContainerService containerService) { this.accountService = accountService; this.taskInstanceService = taskInstanceService; this.taskExecuteMQEventDispatcher = taskExecuteMQEventDispatcher; @@ -223,6 +229,7 @@ public TaskExecuteServiceImpl(AccountService accountService, this.appScopeMappingService = appScopeMappingService; this.whiteHostCache = whiteHostCache; this.taskTemplateResource = taskTemplateResource; + this.containerService = containerService; } @Override @@ -284,30 +291,37 @@ private TaskInstanceDTO executeFastTaskInternal(FastTaskDTO fastTask) { watch.stop(); // 获取执行对象 - watch.start("acquireAndSetHosts"); - ServiceListAppHostResultDTO hosts = - acquireAndSetHosts(taskInstance, Collections.singletonList(stepInstance), null); + watch.start("acquireAndSetExecuteObjects"); + TaskInstanceExecuteObjects taskInstanceExecuteObjects = new TaskInstanceExecuteObjects(); + acquireAndSetHosts(taskInstanceExecuteObjects, taskInstance, Collections.singletonList(stepInstance), null); + acquireAndSetContainers(taskInstanceExecuteObjects, taskInstance, Collections.singletonList(stepInstance)); + checkExecuteObjectExist(taskInstanceExecuteObjects); watch.stop(); - // 获取主机白名单 - watch.start("getHostAllowedActions"); - Map> hostAllowActions = getHostAllowedActions(taskInstance.getAppId(), - ListUtil.union(hosts.getValidHosts(), hosts.getNotInAppHosts())); - watch.stop(); + // 如果包含主机执行对象,需要获取主机白名单 + if (taskInstanceExecuteObjects.isContainsAnyHost()) { + watch.start("getHostAllowedActions"); + taskInstanceExecuteObjects.setWhiteHostAllowActions( + getHostAllowedActions( + taskInstance.getAppId(), + ListUtil.union(taskInstanceExecuteObjects.getValidHosts(), + taskInstanceExecuteObjects.getNotInAppHosts()))); + watch.stop(); + } - //检查执行对象 + //检查执行对象是否可用 watch.start("checkExecuteObject"); - checkExecuteObject(Collections.singletonList(stepInstance), hosts, hostAllowActions); + checkExecuteObjectAccessible(Collections.singletonList(stepInstance), taskInstanceExecuteObjects); watch.stop(); - // 检查步骤约束 - watch.start("checkStepInstanceConstraint"); - checkStepInstanceConstraint(taskInstance, Collections.singletonList(stepInstance)); + // 检查步骤 + watch.start("checkStepInstance"); + checkStepInstance(taskInstance, Collections.singletonList(stepInstance)); watch.stop(); // 鉴权 watch.start("authFastExecute"); - authFastExecute(taskInstance, stepInstance, hostAllowActions); + authFastExecute(taskInstance, stepInstance, taskInstanceExecuteObjects); watch.stop(); // 保存作业 @@ -679,8 +693,8 @@ private AuthResult authExecuteScript(TaskInstanceDTO taskInstance, return AuthResult.fail(); } - AuthResult accountAuthResult = executeAuthService.authAccountExecutable(username, new AppResourceScope(appId) - , accountId); + AuthResult accountAuthResult = executeAuthService.authAccountExecutable(username, + new AppResourceScope(appId), accountId); AuthResult serverAuthResult; ExecuteObjectsDTO servers = stepInstance.getTargetExecuteObjects().clone(); @@ -692,7 +706,6 @@ private AuthResult authExecuteScript(TaskInstanceDTO taskInstance, ScriptSourceEnum scriptSource = ScriptSourceEnum.getScriptSourceEnum(stepInstance.getScriptSource()); if (scriptSource == ScriptSourceEnum.CUSTOM) { // 快速执行脚本鉴权 - serverAuthResult = executeAuthService.authFastExecuteScript( username, new AppResourceScope(appId), servers); } else if (scriptSource == ScriptSourceEnum.QUOTED_APP) { @@ -768,9 +781,76 @@ private AuthResult authFileTransfer(TaskInstanceDTO taskInstance, return accountAuthResult.mergeAuthResult(serverAuthResult); } - private ServiceListAppHostResultDTO acquireAndSetHosts(TaskInstanceDTO taskInstance, - List stepInstances, - Collection variables) { + private void acquireAndSetContainers(TaskInstanceExecuteObjects taskInstanceExecuteObjects, + TaskInstanceDTO taskInstance, + List stepInstances) { + + Set queryContainerIds = new HashSet<>(); + for (StepInstanceDTO stepInstance : stepInstances) { + queryContainerIds.addAll( + stepInstance.extractStaticContainerList().stream() + .map(Container::getId) + .collect(Collectors.toList())); + } + if (CollectionUtils.isEmpty(queryContainerIds)) { + return; + } + + taskInstanceExecuteObjects.setContainsAnyContainer(true); + + List containers = containerService.listContainerByIds( + taskInstance.getAppId(), queryContainerIds); + + fillTaskInstanceContainerDetail(taskInstanceExecuteObjects, stepInstances, + containers.stream().collect(Collectors.toMap(Container::getId, container -> container))); + } + + private void fillTaskInstanceContainerDetail(TaskInstanceExecuteObjects taskInstanceExecuteObjects, + List stepInstanceList, + Map containerMap) { + Set notExistContainerIds = new HashSet<>(); + taskInstanceExecuteObjects.setNotExistContainerIds(notExistContainerIds); + + for (StepInstanceDTO stepInstance : stepInstanceList) { + if (!isStepContainsExecuteObject(stepInstance)) { + continue; + } + if (CollectionUtils.isNotEmpty(stepInstance.getTargetExecuteObjects().getStaticContainerList())) { + stepInstance.getTargetExecuteObjects().getStaticContainerList() + .forEach(container -> { + Container containDetail = containerMap.get(container.getId()); + if (containDetail == null) { + notExistContainerIds.add(container.getId()); + return; + } + container.updatePropsByContainer(containDetail); + }); + } + if (stepInstance.isFileStep()) { + for (FileSourceDTO fileSource : stepInstance.getFileSourceList()) { + ExecuteObjectsDTO executeObjectsDTO = fileSource.getServers(); + if (executeObjectsDTO != null + && CollectionUtils.isNotEmpty(executeObjectsDTO.getStaticContainerList())) { + executeObjectsDTO.getStaticContainerList() + .forEach(container -> { + Container containDetail = containerMap.get(container.getId()); + if (containDetail == null) { + notExistContainerIds.add(container.getId()); + return; + } + container.updatePropsByContainer(containDetail); + }); + } + } + } + } + + } + + private void acquireAndSetHosts(TaskInstanceExecuteObjects taskInstanceExecuteObjects, + TaskInstanceDTO taskInstance, + List stepInstances, + Collection variables) { StopWatch watch = new StopWatch("AcquireAndSetHosts"); try { long appId = taskInstance.getAppId(); @@ -799,24 +879,23 @@ private ServiceListAppHostResultDTO acquireAndSetHosts(TaskInstanceDTO taskInsta watch.stop(); if (CollectionUtils.isEmpty(queryHosts)) { - return ServiceListAppHostResultDTO.EMPTY; + return; } + taskInstanceExecuteObjects.setContainsAnyHost(true); + watch.start("batchGetAppHosts"); ServiceListAppHostResultDTO queryHostsResult = hostService.batchGetAppHosts(appId, queryHosts, needRefreshHostBkAgentId(taskInstance)); watch.stop(); - if (CollectionUtils.isNotEmpty(queryHostsResult.getNotExistHosts())) { - // 如果主机在cmdb不存在,直接报错 - throwHostInvalidException(queryHostsResult.getNotExistHosts()); - } + taskInstanceExecuteObjects.setValidHosts(queryHostsResult.getValidHosts()); + taskInstanceExecuteObjects.setNotExistHosts(queryHostsResult.getNotExistHosts()); + taskInstanceExecuteObjects.setNotInAppHosts(queryHostsResult.getNotInAppHosts()); watch.start("fillTaskInstanceHostDetail"); - fillTaskInstanceHostDetail(taskInstance, stepInstances, variables, queryHostsResult); + fillTaskInstanceHostDetail(taskInstance, stepInstances, variables, taskInstanceExecuteObjects); watch.stop(); - - return queryHostsResult; } finally { if (watch.isRunning()) { watch.stop(); @@ -906,50 +985,40 @@ private boolean needRefreshHostBkAgentId(TaskInstanceDTO taskInstance) { || StringUtils.equals(taskInstance.getAppCode(), "bk_nodeman")); } - private void checkExecuteObject(List stepInstances, - ServiceListAppHostResultDTO hosts, - Map> hostAllowActions) { - // 检查步骤引用的主机不为空 - stepInstances.forEach(this::checkStepInstanceHostNonEmpty); - - // 判断主机是否可以被当前作业使用 - checkExecuteObjectAccessible(stepInstances, hosts.getNotInAppHosts(), hostAllowActions); - } - /** * 判断执行对象是否可以被当前作业使用 * - * @param stepInstanceList 作业步骤列表 - * @param notInAppHosts 作业中包含的不在当前业务下的主机 - * @param whileHostAllowActions 主机白名单 + * @param stepInstanceList 作业步骤列表 + * @param taskInstanceExecuteObjects 作业实例中包含的执行对象 */ private void checkExecuteObjectAccessible(List stepInstanceList, - List notInAppHosts, - Map> whileHostAllowActions) { - if (CollectionUtils.isEmpty(notInAppHosts)) { + TaskInstanceExecuteObjects taskInstanceExecuteObjects) { + if (CollectionUtils.isEmpty(taskInstanceExecuteObjects.getNotInAppHosts())) { return; } + Map> whileHostAllowActions = taskInstanceExecuteObjects.getWhiteHostAllowActions(); log.info("Contains hosts not in app, check white host config. notInAppHosts: {}, whileHostAllowActions: {}", - notInAppHosts, whileHostAllowActions); - Map notInAppHostMap = notInAppHosts.stream() + taskInstanceExecuteObjects.getNotInAppHosts(), whileHostAllowActions); + Map notInAppHostMap = taskInstanceExecuteObjects.getNotInAppHosts().stream() .collect(Collectors.toMap(HostDTO::getHostId, host -> host)); // 非法的主机 Set invalidHosts = new HashSet<>(); for (StepInstanceDTO stepInstance : stepInstanceList) { - if (!isStepContainsHostProps(stepInstance)) { + if (!isStepContainsExecuteObject(stepInstance)) { continue; } TaskStepTypeEnum stepType = stepInstance.getStepType(); // 检查目标主机 - stepInstance.getTargetExecuteObjects().getIpList().forEach(host -> { - if (isHostUnAccessible(stepType, host, notInAppHostMap, whileHostAllowActions)) { - invalidHosts.add(host); - } - }); + stepInstance.getTargetExecuteObjects().getDecorateExecuteObjects().stream() + .filter(ExecuteObject::isHostExecuteObject) + .forEach(executeObject -> { + if (isHostUnAccessible(stepType, executeObject.getHost(), notInAppHostMap, whileHostAllowActions)) { + invalidHosts.add(executeObject.getHost()); + } + }); // 如果是文件分发任务,检查文件源 - checkFileSourceHostAccessible(invalidHosts, stepInstance, stepType, notInAppHostMap, - whileHostAllowActions); + checkFileSourceHostAccessible(invalidHosts, stepInstance, stepType, notInAppHostMap, whileHostAllowActions); } if (CollectionUtils.isNotEmpty(invalidHosts)) { @@ -975,14 +1044,17 @@ private void checkFileSourceHostAccessible(Set invalidHosts, // 远程文件分发需要校验文件源主机;其他类型不需要 if (fileSource.getFileType().equals(TaskFileTypeEnum.SERVER.getType())) { ExecuteObjectsDTO servers = fileSource.getServers(); - if (servers == null || CollectionUtils.isEmpty(servers.getIpList())) { + if (servers == null || CollectionUtils.isEmpty(servers.getDecorateExecuteObjects())) { continue; } - servers.getIpList().forEach(host -> { - if (isHostUnAccessible(stepType, host, notInAppHostMap, whileHostAllowActions)) { - invalidHosts.add(host); - } - }); + servers.getDecorateExecuteObjects().stream() + .filter(ExecuteObject::isHostExecuteObject) + .forEach(executeObject -> { + if (isHostUnAccessible(stepType, executeObject.getHost(), + notInAppHostMap, whileHostAllowActions)) { + invalidHosts.add(executeObject.getHost()); + } + }); } } } @@ -1076,7 +1148,7 @@ private void setHostsForTopoNode(ExecuteObjectsDTO servers, } private void checkStepInstanceHostNonEmpty(StepInstanceDTO stepInstance) { - if (!isStepContainsHostProps(stepInstance)) { + if (!isStepContainsExecuteObject(stepInstance)) { return; } ExecuteObjectsDTO targetServers = stepInstance.getTargetExecuteObjects(); @@ -1101,34 +1173,34 @@ private void checkStepInstanceHostNonEmpty(StepInstanceDTO stepInstance) { } } - private boolean isStepContainsHostProps(StepInstanceBaseDTO stepInstance) { - // 判断步骤是否包含主机信息 + private boolean isStepContainsExecuteObject(StepInstanceBaseDTO stepInstance) { + // 判断步骤是否包含执行对象 return !stepInstance.getExecuteType().equals(MANUAL_CONFIRM.getValue()); } private void fillTaskInstanceHostDetail(TaskInstanceDTO taskInstance, List stepInstanceList, Collection variables, - ServiceListAppHostResultDTO hosts) { + TaskInstanceExecuteObjects taskInstanceExecuteObjects) { - fillHostAgent(taskInstance, hosts); + fillHostAgent(taskInstance, taskInstanceExecuteObjects); Map hostMap = new HashMap<>(); - if (CollectionUtils.isNotEmpty(hosts.getValidHosts())) { - hosts.getValidHosts().forEach(host -> { + if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getValidHosts())) { + taskInstanceExecuteObjects.getValidHosts().forEach(host -> { hostMap.put("hostId:" + host.getHostId(), host); hostMap.put("hostIp:" + host.toCloudIp(), host); }); } - if (CollectionUtils.isNotEmpty(hosts.getNotInAppHosts())) { - hosts.getNotInAppHosts().forEach(host -> { + if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotInAppHosts())) { + taskInstanceExecuteObjects.getNotInAppHosts().forEach(host -> { hostMap.put("hostId:" + host.getHostId(), host); hostMap.put("hostIp:" + host.toCloudIp(), host); }); } for (StepInstanceDTO stepInstance : stepInstanceList) { - if (!isStepContainsHostProps(stepInstance)) { + if (!isStepContainsExecuteObject(stepInstance)) { continue; } // 目标主机设置主机详情 @@ -1146,21 +1218,23 @@ private void fillTaskInstanceHostDetail(TaskInstanceDTO taskInstance, } } - private void fillHostAgent(TaskInstanceDTO taskInstance, ServiceListAppHostResultDTO hosts) { + private void fillHostAgent(TaskInstanceDTO taskInstance, TaskInstanceExecuteObjects taskInstanceExecuteObjects) { boolean isUsingGseV2 = isUsingGseV2(taskInstance, - ListUtil.union(hosts.getValidHosts(), hosts.getNotInAppHosts())); + ListUtil.union(taskInstanceExecuteObjects.getValidHosts(), taskInstanceExecuteObjects.getNotInAppHosts())); /* * 后续下发任务给GSE会根据agentId路由请求到GSE1.0/2.0。如果要使用GSE2.0,那么直接使用原始bk_agent_id;如果要使用GSE1.0, * 按照{云区域ID:ip}的方式构造agent_id */ Set invalidAgentIdHosts = new HashSet<>(); - if (CollectionUtils.isNotEmpty(hosts.getValidHosts())) { - hosts.getValidHosts().forEach(host -> setHostAgentId(isUsingGseV2, host, invalidAgentIdHosts)); + if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getValidHosts())) { + taskInstanceExecuteObjects.getValidHosts() + .forEach(host -> setHostAgentId(isUsingGseV2, host, invalidAgentIdHosts)); } - if (CollectionUtils.isNotEmpty(hosts.getNotInAppHosts())) { - hosts.getNotInAppHosts().forEach(host -> setHostAgentId(isUsingGseV2, host, invalidAgentIdHosts)); + if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotInAppHosts())) { + taskInstanceExecuteObjects.getNotInAppHosts() + .forEach(host -> setHostAgentId(isUsingGseV2, host, invalidAgentIdHosts)); } if (CollectionUtils.isNotEmpty(invalidAgentIdHosts)) { @@ -1169,8 +1243,8 @@ private void fillHostAgent(TaskInstanceDTO taskInstance, ServiceListAppHostResul taskInstance.getAppId(), isUsingGseV2, invalidAgentIdHosts); } - setAgentStatus(hosts.getValidHosts()); - setAgentStatus(hosts.getNotInAppHosts()); + setAgentStatus(taskInstanceExecuteObjects.getValidHosts()); + setAgentStatus(taskInstanceExecuteObjects.getNotInAppHosts()); } private void setHostAgentId(boolean isUsingGseV2, HostDTO host, Set invalidAgentIdHosts) { @@ -1229,7 +1303,7 @@ private Set extractHosts(List stepInstanceList, Collection variables) { Set hosts = new HashSet<>(); for (StepInstanceDTO stepInstance : stepInstanceList) { - if (!isStepContainsHostProps(stepInstance)) { + if (!isStepContainsExecuteObject(stepInstance)) { continue; } if (stepInstance.getTargetExecuteObjects() != null) { @@ -1255,6 +1329,26 @@ private Set extractHosts(List stepInstanceList, return hosts; } + private void checkExecuteObjectExist(TaskInstanceExecuteObjects taskInstanceExecuteObjects) { + List notExistExecuteObjectList = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotExistHosts())) { + notExistExecuteObjectList.addAll(taskInstanceExecuteObjects.getNotExistHosts().stream() + .map(this::printHostIdOrIp).collect(Collectors.toList())); + } + if (CollectionUtils.isNotEmpty(taskInstanceExecuteObjects.getNotExistContainerIds())) { + notExistExecuteObjectList.addAll( + taskInstanceExecuteObjects.getNotExistContainerIds().stream() + .map(containerId -> "(container_id:" + containerId + ")") + .collect(Collectors.toList())); + } + if (CollectionUtils.isNotEmpty(notExistExecuteObjectList)) { + String executeObjectStr = StringUtils.join(notExistExecuteObjectList, ","); + log.warn("The following execute object are not exist, notExistExecuteObjectList={}", + notExistExecuteObjectList); + throw new FailedPreconditionException(ErrorCode.EXECUTE_OBJECT_NOT_EXIST, + new Object[]{notExistExecuteObjectList.size(), executeObjectStr}); + } + } private void throwHostInvalidException(Collection invalidHosts) { String hostListStr = StringUtils.join(invalidHosts.stream() @@ -1272,7 +1366,15 @@ private String printHostIdOrIp(HostDTO host) { } } - private void checkStepInstanceConstraint(TaskInstanceDTO taskInstance, List stepInstanceList) { + private void checkStepInstance(TaskInstanceDTO taskInstance, List stepInstanceList) { + // 检查步骤引用的主机不为空 + stepInstanceList.forEach(this::checkStepInstanceHostNonEmpty); + // 检查步骤的GSE原子任务上限 + checkStepInstanceAtomicTasksLimit(taskInstance, stepInstanceList); + } + + private void checkStepInstanceAtomicTasksLimit(TaskInstanceDTO taskInstance, + List stepInstanceList) { String appCode = taskInstance.getAppCode(); Long appId = taskInstance.getAppId(); String taskName = taskInstance.getName(); @@ -1400,26 +1502,34 @@ private TaskInstanceDTO executeJobPlanInternal(StopWatch watch, TaskExecuteParam batchCheckScriptMatchDangerousRule(taskInstance, stepInstanceList); watch.stop(); - // 获取主机列表 - watch.start("acquireAndSetHosts"); - ServiceListAppHostResultDTO hosts = - acquireAndSetHosts(taskInstance, stepInstanceList, finalVariableValueMap.values()); + // 获取执行对象 + watch.start("acquireAndSetExecuteObjects"); + TaskInstanceExecuteObjects taskInstanceExecuteObjects = new TaskInstanceExecuteObjects(); + acquireAndSetHosts(taskInstanceExecuteObjects, taskInstance, stepInstanceList, + finalVariableValueMap.values()); + acquireAndSetContainers(taskInstanceExecuteObjects, taskInstance, stepInstanceList); + checkExecuteObjectExist(taskInstanceExecuteObjects); watch.stop(); - // 获取主机白名单 - watch.start("getHostAllowedActions"); - Map> hostAllowActions = getHostAllowedActions(taskInstance.getAppId(), - ListUtil.union(hosts.getValidHosts(), hosts.getNotInAppHosts())); - watch.stop(); + // 如果包含主机执行对象,需要获取主机白名单 + if (taskInstanceExecuteObjects.isContainsAnyHost()) { + watch.start("getHostAllowedActions"); + taskInstanceExecuteObjects.setWhiteHostAllowActions( + getHostAllowedActions( + taskInstance.getAppId(), + ListUtil.union(taskInstanceExecuteObjects.getValidHosts(), + taskInstanceExecuteObjects.getNotInAppHosts()))); + watch.stop(); + } - //检查主机 + //检查执行对象是否可用 watch.start("checkExecuteObject"); - checkExecuteObject(stepInstanceList, hosts, hostAllowActions); + checkExecuteObjectAccessible(stepInstanceList, taskInstanceExecuteObjects); watch.stop(); - // 检查步骤约束 - watch.start("checkStepInstanceConstraint"); - checkStepInstanceConstraint(taskInstance, stepInstanceList); + // 检查步骤 + watch.start("checkStepInstance"); + checkStepInstance(taskInstance, stepInstanceList); watch.stop(); if (!executeParam.isSkipAuth()) { @@ -1795,9 +1905,11 @@ public TaskInstanceDTO createTaskInstanceForRedo(Long appId, Long taskInstanceId // 检查高危脚本 batchCheckScriptMatchDangerousRule(taskInstance, stepInstanceList); + // 获取并设置执行对象 + TaskInstanceExecuteObjects taskInstanceExecuteObjects = new TaskInstanceExecuteObjects(); // 获取主机列表 - ServiceListAppHostResultDTO hosts = acquireAndSetHosts(taskInstance, stepInstanceList, - finalVariableValueMap.values()); + acquireAndSetHosts(taskInstanceExecuteObjects, taskInstance, + stepInstanceList, finalVariableValueMap.values()); // 获取主机白名单 Map> hostAllowActions = getHostAllowedActions(taskInstance.getAppId(), @@ -1806,8 +1918,8 @@ public TaskInstanceDTO createTaskInstanceForRedo(Long appId, Long taskInstanceId // 检查主机合法性 checkExecuteObject(stepInstanceList, hosts, hostAllowActions); - // 检查步骤约束 - checkStepInstanceConstraint(taskInstance, stepInstanceList); + // 检查步骤 + checkStepInstance(taskInstance, stepInstanceList); authRedoJob(operator, appId, originTaskInstance, hostAllowActions); @@ -2142,7 +2254,7 @@ private void parseFileStepInstanceFromStepInstance(StepInstanceDTO stepInstance, } ExecuteObjectsDTO targetServers = buildFinalTargetServers(originStepInstance.getTargetExecuteObjects(), -variableValueMap); + variableValueMap); stepInstance.setTargetExecuteObjects(targetServers); }