Skip to content

Commit

Permalink
feature: Job 支持容器执行 - 脚本任务 TencentBlueKing#2631
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Dec 25, 2023
1 parent 06aefab commit 21579f2
Show file tree
Hide file tree
Showing 79 changed files with 1,940 additions and 1,383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public NotImplementedException(String message, Throwable cause, Integer errorCod
}

public NotImplementedException(String message, Throwable cause, Integer errorCode,
Object[] errorParams) {
Object[] errorParams) {
super(message, cause, ErrorType.UNIMPLEMENTED, errorCode, errorParams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ default List<Agent> buildAgents(Collection<String> agentIds, String user, String
.collect(Collectors.toList());
}

/**
* 设置 Agent 认证信息
*
* @param agents agent列表
* @param user 用户
* @param password 密码
* @return Agent
*/
default List<Agent> fillAgentAuthInfo(Collection<Agent> agents, String user, String password) {
return agents.stream().peek(agent -> {
agent.setUser(user);
agent.setPwd(password);
}).collect(Collectors.toList());
}

/**
* 构建目标Agent
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import com.tencent.bk.job.common.gse.v2.model.GetExecuteScriptResultRequest;
import com.tencent.bk.job.common.gse.v2.model.GetTransferFileResultRequest;
import com.tencent.bk.job.common.gse.v2.model.GseTaskResponse;
import com.tencent.bk.job.common.gse.v2.model.ScriptAgentTaskResult;
import com.tencent.bk.job.common.gse.v2.model.ScriptExecuteObjectTaskResult;
import com.tencent.bk.job.common.gse.v2.model.ScriptTaskResult;
import com.tencent.bk.job.common.gse.v2.model.SourceFile;
import com.tencent.bk.job.common.gse.v2.model.TargetFile;
Expand Down Expand Up @@ -283,10 +283,10 @@ private static api_host convertToApiHost(String cloudIp) {
private ScriptTaskResult toScriptTaskResult(api_task_detail_result resultV1) {
ScriptTaskResult result = new ScriptTaskResult();
if (CollectionUtils.isNotEmpty(resultV1.getResult())) {
List<ScriptAgentTaskResult> agentTaskResults =
List<ScriptExecuteObjectTaskResult> agentTaskResults =
resultV1.getResult().stream()
.map(agentTaskResultV1 -> {
ScriptAgentTaskResult agentTaskResult = new ScriptAgentTaskResult();
ScriptExecuteObjectTaskResult agentTaskResult = new ScriptExecuteObjectTaskResult();
agentTaskResult.setAgentId(agentTaskResultV1.getGse_composite_id() + ":"
+ agentTaskResultV1.getIp());
agentTaskResult.setAtomicTaskId(agentTaskResultV1.getAtomic_task_id());
Expand Down Expand Up @@ -439,8 +439,9 @@ private List<api_copy_fileinfoV2> toV1CopyFileInfoRequest(TransferFileRequest re
@Override
public FileTaskResult getTransferFileResult(GetTransferFileResultRequest request) {
api_map_rsp rsp;
if (CollectionUtils.isNotEmpty(request.getAgentIds())) {
rsp = pullCopyFileResult(request.getTaskId(), request.getAgentIds());
if (CollectionUtils.isNotEmpty(request.getAgents())) {
rsp = pullCopyFileResult(request.getTaskId(),
request.getAgents().stream().map(Agent::getAgentId).collect(Collectors.toList()));
} else if (StringUtils.isNotBlank(request.getTaskId())) {
rsp = pullCopyFileResult(request.getTaskId());
} else {
Expand Down Expand Up @@ -622,11 +623,11 @@ private api_stop_task_request toV1StopTaskRequest(TerminateGseTaskRequest reques
GseTaskTypeEnum taskType) {
api_stop_task_request stopRequest = new api_stop_task_request();
stopRequest.setStop_task_id(request.getTaskId());
if (CollectionUtils.isNotEmpty(request.getAgentIds())) {
stopRequest.setAgents(request.getAgentIds().stream()
.map(agentId ->
if (CollectionUtils.isNotEmpty(request.getAgents())) {
stopRequest.setAgents(request.getAgents().stream()
.map(agent ->
// 终止任务并不不需要账号密码,此处传入为了绕过thrift协议的校验
buildAgent(agentId, buildEmptyApiAuth()))
buildAgent(agent.getAgentId(), buildEmptyApiAuth()))
.collect(Collectors.toList()));
}
stopRequest.setType(taskType.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
@Slf4j
public class GseV2ApiClient extends AbstractBkApiClient implements IGseClient {

private static final String URI_ASYNC_EXECUTE_SCRIPT = "/api/v2/task/async_execute_script";
private static final String URI_GET_EXECUTE_SCRIPT_RESULT = "/api/v2/task/get_execute_script_result";
private static final String URI_LIST_AGENT_STATE = "/api/v2/cluster/list_agent_state";
private static final String URI_ASYNC_TRANSFER_FILE = "/api/v2/task/async_transfer_file";
private static final String URI_GET_TRANSFER_FILE_RESULT = "/api/v2/task/async/get_transfer_file_result";
private static final String URI_ASYNC_EXECUTE_SCRIPT = "/api/v2/task/extensions/async_execute_script";
private static final String URI_GET_EXECUTE_SCRIPT_RESULT = "/api/v2/task/extensions/get_execute_script_result";
private static final String URI_ASYNC_TRANSFER_FILE = "/api/v2/task/extensions/async_push_file";
private static final String URI_GET_TRANSFER_FILE_RESULT = "/api/v2/task/extensions/get_transfer_file_result";
private final BkApiAuthorization gseBkApiAuthorization;

public GseV2ApiClient(MeterRegistry meterRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ public class Agent {
@JsonProperty("bk_agent_id")
private String agentId;

/**
* 目标容器 ID, 空则为主机
*/
@JsonProperty("bk_container_id")
private String containerId;

/**
* 执行账号名
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.bk.job.common.gse.v2.model;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.tencent.bk.job.common.gse.constants.FileDistModeEnum;
import com.tencent.bk.job.common.gse.constants.FileTaskTypeEnum;
Expand Down Expand Up @@ -30,6 +31,12 @@ public class AtomicFileTaskResultContent {
@JsonProperty("source_agent_id")
private String sourceAgentId;

/**
* 文件源 container id
*/
@JsonProperty("source_container_id")
private String sourceContainerId;

/**
* 源文件目录,下发任务时指定的源文件路径
*/
Expand All @@ -43,11 +50,17 @@ public class AtomicFileTaskResultContent {
private String sourceFileName;

/**
* 分发目标主机 agent id
* 分发目标 agent id
*/
@JsonProperty("dest_agent_id")
private String destAgentId;

/**
* 分发目标容器 container id
*/
@JsonProperty("dest_container_id")
private String destContainerId;

/**
* 目标目录
*/
Expand Down Expand Up @@ -130,6 +143,10 @@ public class AtomicFileTaskResultContent {
*/
private FileTaskTypeEnum taskType;

@JsonIgnore
private ExecuteObjectGseKey sourceExecuteObjectGseKey;
@JsonIgnore
private ExecuteObjectGseKey destExecuteObjectGseKey;

public boolean isDownloadMode() {
return FileDistModeEnum.DOWNLOAD.getValue().equals(this.mode);
Expand Down Expand Up @@ -162,20 +179,25 @@ public String getStandardDestFilePath() {

public String getTaskId() {
if (taskId == null) {
this.taskId = buildTaskId(mode, sourceAgentId, getStandardSourceFilePath(), destAgentId,
getStandardDestFilePath());
this.taskId = buildTaskId(mode, getSourceExecuteObjectGseKey(), getStandardSourceFilePath(),
getDestExecuteObjectGseKey(), getStandardDestFilePath());
}
return taskId;
}

public static String buildTaskId(Integer mode, String sourceAgentId, String sourceFilePath, String destAgentId,
public static String buildTaskId(Integer mode,
ExecuteObjectGseKey sourceExecuteObjectGseKey,
String sourceFilePath,
ExecuteObjectGseKey destExecuteObjectGseKey,
String destFilePath) {
String taskId;
if (FileDistModeEnum.getFileDistMode(mode) == FileDistModeEnum.DOWNLOAD) {
taskId = concat(mode.toString(), sourceAgentId, GseFilePathUtils.standardizedGSEFilePath(sourceFilePath),
destAgentId, destFilePath);
taskId = concat(mode.toString(), sourceExecuteObjectGseKey.getKey(),
GseFilePathUtils.standardizedGSEFilePath(sourceFilePath),
destExecuteObjectGseKey.getKey(), destFilePath);
} else {
taskId = concat(mode.toString(), sourceAgentId, GseFilePathUtils.standardizedGSEFilePath(sourceFilePath));
taskId = concat(mode.toString(), sourceExecuteObjectGseKey.getKey(),
GseFilePathUtils.standardizedGSEFilePath(sourceFilePath));
}
return taskId;
}
Expand All @@ -199,4 +221,32 @@ private static String concat(String... strArgs) {
public boolean isApiProtocolBeforeV2() {
return this.protocolVersion == null || this.protocolVersion < 2;
}

@JsonIgnore
public ExecuteObjectGseKey getDestExecuteObjectGseKey() {
if (destExecuteObjectGseKey != null) {
return destExecuteObjectGseKey;
}
if (StringUtils.isNotEmpty(destContainerId)) {
// bk_container_id 不为空,说明是容器执行对象
destExecuteObjectGseKey = ExecuteObjectGseKey.ofContainer(destAgentId, destContainerId);
} else {
destExecuteObjectGseKey = ExecuteObjectGseKey.ofHost(destAgentId);
}
return destExecuteObjectGseKey;
}

@JsonIgnore
public ExecuteObjectGseKey getSourceExecuteObjectGseKey() {
if (sourceExecuteObjectGseKey != null) {
return sourceExecuteObjectGseKey;
}
if (StringUtils.isNotEmpty(sourceContainerId)) {
// bk_container_id 不为空,说明是容器执行对象
sourceExecuteObjectGseKey = ExecuteObjectGseKey.ofContainer(sourceAgentId, sourceContainerId);
} else {
sourceExecuteObjectGseKey = ExecuteObjectGseKey.ofHost(destAgentId);
}
return sourceExecuteObjectGseKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,39 @@
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.execute.engine.result;
package com.tencent.bk.job.common.gse.v2.model;

import com.tencent.bk.job.common.constant.ExecuteObjectTypeEnum;
import lombok.Getter;

import java.util.Objects;
import java.util.StringJoiner;

/**
* 执行对象KEY, 用于跟 GSE 交互
*/
@Getter
public class ExecuteObjectGseKey {
private final String key;
private String agentId;
private String containerId;
private String key;

private ExecuteObjectGseKey(String key) {
this.key = key;
private ExecuteObjectGseKey() {
}

public static ExecuteObjectGseKey ofHostKey(String agentId) {
return new ExecuteObjectGseKey(ExecuteObjectTypeEnum.HOST.getValue() + ":" + agentId);
public static ExecuteObjectGseKey ofHost(String agentId) {
ExecuteObjectGseKey executeObjectGseKey = new ExecuteObjectGseKey();
executeObjectGseKey.agentId = agentId;
executeObjectGseKey.key = ExecuteObjectTypeEnum.HOST.getValue() + ":" + agentId;
return executeObjectGseKey;
}

public static ExecuteObjectGseKey ofContainerKey(String containerId) {
return new ExecuteObjectGseKey(ExecuteObjectTypeEnum.CONTAINER.getValue() + ":" + containerId);
public static ExecuteObjectGseKey ofContainer(String agentId, String containerId) {
ExecuteObjectGseKey executeObjectGseKey = new ExecuteObjectGseKey();
executeObjectGseKey.agentId = agentId;
executeObjectGseKey.containerId = containerId;
executeObjectGseKey.key = ExecuteObjectTypeEnum.HOST.getValue() + ":" + agentId;
return executeObjectGseKey;
}

@Override
Expand All @@ -58,4 +69,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(key);
}

@Override
public String toString() {
return new StringJoiner(", ", ExecuteObjectGseKey.class.getSimpleName() + "[", "]")
.add("key='" + key + "'")
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public static class AgentTask {
@JsonProperty("bk_agent_id")
private String agentId;

/**
* 目标容器 ID, 空则为主机
*/
@JsonProperty("bk_container_id")
private String containerId;

/**
* 脚本任务
*/
Expand All @@ -55,9 +61,10 @@ public static class AtomicTask {
private int offset;
}

public void addAgentTaskQuery(String agentId, Integer atomicTaskId, int offset) {
public void addAgentTaskQuery(ExecuteObjectGseKey executeObjectGseKey, Integer atomicTaskId, int offset) {
AgentTask agentTask = new AgentTask();
agentTask.setAgentId(agentId);
agentTask.setAgentId(executeObjectGseKey.getAgentId());
agentTask.setContainerId(executeObjectGseKey.getContainerId());
AtomicTask atomicTask = new AtomicTask();
atomicTask.setAtomicTaskId(atomicTaskId);
atomicTask.setOffset(offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
Expand All @@ -21,8 +23,30 @@ public class GetTransferFileResultRequest extends GseReq {
private String taskId;

/**
* 过滤结果的agentId
* 过滤结果的agent
*/
@JsonProperty("agent_id_list")
private List<String> agentIds;
@JsonProperty("agents")
private List<Agent> agents;

public void addAgentQuery(ExecuteObjectGseKey executeObjectGseKey) {
if (agents == null) {
agents = new ArrayList<>();
}
Agent agent = new Agent();
agent.setAgentId(executeObjectGseKey.getAgentId());
agent.setContainerId(executeObjectGseKey.getContainerId());
agents.add(agent);
}

public void batchAddAgentQuery(Collection<ExecuteObjectGseKey> executeObjectGseKeys) {
if (agents == null) {
agents = new ArrayList<>();
}
executeObjectGseKeys.forEach(executeObjectGseKey -> {
Agent agent = new Agent();
agent.setAgentId(executeObjectGseKey.getAgentId());
agent.setContainerId(executeObjectGseKey.getContainerId());
agents.add(agent);
});
}
}
Loading

0 comments on commit 21579f2

Please sign in to comment.