Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: 执行引擎部分代码优化 #1616 #1627

Merged
merged 2 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ public interface ServiceTaskEvictPolicyResource {
@GetMapping("/current")
InternalResponse<ComposedTaskEvictPolicyDTO> getCurrentPolicy();

@ApiOperation(value = "设置任务驱逐策略,参考值:{\"@type\":\"ComposedTaskEvictPolicy\",\"operator\":\"OR\","
+ "\"policyList\":[{\"@type\":\"AppIdTaskEvictPolicy\",\"appIdsToEvict\":[2,3]},"
+ "{\"@type\":\"AppCodeTaskEvictPolicy\",\"appCodesToEvict\":[\"appCode1\",\"appCode2\"]}]}\n",
@ApiOperation(value = "设置任务驱逐策略,参考值:{\"@type\":\"ComposedTaskEvictPolicy\",\"operator\":\"OR\"," +
"\"policyList\":[{\"@type\":\"TaskInstanceIdEvictPolicy\",\"taskInstanceIdsToEvict\":[1001,1002]}," +
"{\"@type\":\"AppIdTaskEvictPolicy\",\"appIdsToEvict\":[2,3]},{\"@type\":\"AppCodeTaskEvictPolicy\"," +
"\"appCodesToEvict\":[\"appCode1\",\"appCode2\"]}]}",
produces = "application/json")
@PutMapping("/")
InternalResponse<Boolean> setPolicy(@RequestBody ComposedTaskEvictPolicyDTO policyDTO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import java.util.List;

/**
* 根据创建任务使用的AppCode来驱逐任务的策略实体
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import java.util.List;

/**
* 根据Job业务Id来驱逐任务的策略实体
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,24 @@
import java.util.Arrays;
import java.util.List;

/**
* 任务驱逐组合策略实体
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class ComposedTaskEvictPolicyDTO extends TaskEvictPolicyDTO {

public static final String classType = "ComposedTaskEvictPolicy";

// 策略组合操作符,支持AND/OR
public enum ComposeOperator {
AND, OR
}

// 策略组合操作符
protected ComposeOperator operator = null;
// 子策略列表
protected List<TaskEvictPolicyDTO> policyList = new ArrayList<>();

public ComposedTaskEvictPolicyDTO(ComposeOperator operator, TaskEvictPolicyDTO... policyList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = TaskInstanceIdEvictPolicyDTO.class, name = TaskInstanceIdEvictPolicyDTO.classType),
@JsonSubTypes.Type(value = AppCodeTaskEvictPolicyDTO.class, name = AppCodeTaskEvictPolicyDTO.classType),
@JsonSubTypes.Type(value = AppIdTaskEvictPolicyDTO.class, name = AppIdTaskEvictPolicyDTO.classType),
@JsonSubTypes.Type(value = ComposedTaskEvictPolicyDTO.class, name = ComposedTaskEvictPolicyDTO.classType)})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.execute.model.inner;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.List;

/**
* 根据任务实例Id来驱逐任务的策略实体
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class TaskInstanceIdEvictPolicyDTO extends TaskEvictPolicyDTO {

public static final String classType = "TaskInstanceIdEvictPolicy";

protected List<Long> taskInstanceIdsToEvict;

public TaskInstanceIdEvictPolicyDTO(List<Long> taskInstanceIdsToEvict) {
this.taskInstanceIdsToEvict = taskInstanceIdsToEvict;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,12 @@
* 执行引擎任务驱逐策略接口类
*/
public interface ITaskEvictPolicy {

/**
* 判断一个任务实例是否需要被驱逐出执行引擎
*
* @param taskInstance 任务实例
* @return 布尔值,是否需要被驱逐
*/
boolean needToEvict(TaskInstanceDTO taskInstance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.tencent.bk.job.execute.model.inner.AppIdTaskEvictPolicyDTO;
import com.tencent.bk.job.execute.model.inner.ComposedTaskEvictPolicyDTO;
import com.tencent.bk.job.execute.model.inner.TaskEvictPolicyDTO;
import com.tencent.bk.job.execute.model.inner.TaskInstanceIdEvictPolicyDTO;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
Expand All @@ -43,20 +44,22 @@ public class PolicyFactory {
/**
* 根据策略数据实体,创建任务驱逐策略对象
*
* @param policyDTO 策略数据实体
* @param policy 策略数据实体
* @return 策略对象
*/
public static ITaskEvictPolicy createPolicyByDTO(TaskEvictPolicyDTO policyDTO) {
if (policyDTO instanceof AppCodeTaskEvictPolicyDTO) {
return new AppCodeTaskEvictPolicy(((AppCodeTaskEvictPolicyDTO) policyDTO).getAppCodesToEvict());
} else if (policyDTO instanceof AppIdTaskEvictPolicyDTO) {
return new AppIdTaskEvictPolicy(((AppIdTaskEvictPolicyDTO) policyDTO).getAppIdsToEvict());
} else if (policyDTO instanceof ComposedTaskEvictPolicyDTO) {
ComposedTaskEvictPolicyDTO composedPolicyDTO = (ComposedTaskEvictPolicyDTO) policyDTO;
public static ITaskEvictPolicy createPolicyByDTO(TaskEvictPolicyDTO policy) {
if (policy instanceof TaskInstanceIdEvictPolicyDTO) {
return new TaskInstanceIdEvictPolicy(((TaskInstanceIdEvictPolicyDTO) policy).getTaskInstanceIdsToEvict());
} else if (policy instanceof AppCodeTaskEvictPolicyDTO) {
return new AppCodeTaskEvictPolicy(((AppCodeTaskEvictPolicyDTO) policy).getAppCodesToEvict());
} else if (policy instanceof AppIdTaskEvictPolicyDTO) {
return new AppIdTaskEvictPolicy(((AppIdTaskEvictPolicyDTO) policy).getAppIdsToEvict());
} else if (policy instanceof ComposedTaskEvictPolicyDTO) {
ComposedTaskEvictPolicyDTO composedPolicyDTO = (ComposedTaskEvictPolicyDTO) policy;
List<TaskEvictPolicyDTO> policyDTOList = composedPolicyDTO.getPolicyList();
return new ComposedTaskEvictPolicy(composedPolicyDTO.getOperator(), policyDTOList);
} else {
log.error("TaskEvictPolicyDTO not support yet:{}", policyDTO);
log.error("TaskEvictPolicyDTO not support yet:{}", policy);
throw new InternalException(ErrorCode.NOT_SUPPORT_FEATURE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ public boolean shouldEvictTask(TaskInstanceDTO taskInstance) {
* 更新被驱逐的任务的状态为被丢弃状态
*
* @param taskInstance 任务实例
* @return 是否更新成功
*/
public boolean updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) {
public void updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) {
long endTime = System.currentTimeMillis();
Long taskInstanceId = stepInstance.getTaskInstanceId();
if (RunStatusEnum.isFinishedStatus(stepInstance.getStatus())) {
long totalTime = TaskCostCalculator.calculate(stepInstance.getStartTime(), endTime,
stepInstance.getTotalTime());
if (!RunStatusEnum.isFinishedStatus(stepInstance.getStatus())) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不是应该等步骤状态被设置为FIINSH STATE , 才进行后续的任务状态更新么?为啥这次改动中把条件反过来呀?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是驱逐任务后更新任务状态的场景,本来就是要将运行中的状态改为已丢弃状态,应该是之前的代码重构把这里重构错了,这次改了回来。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

long totalTime = TaskCostCalculator.calculate(
stepInstance.getStartTime(),
endTime,
stepInstance.getTotalTime()
);
taskInstanceService.updateStepExecutionInfo(
stepInstance.getId(),
RunStatusEnum.ABANDONED,
Expand All @@ -90,9 +92,12 @@ public boolean updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanc
stepInstance.getStatus()
);
}
if (RunStatusEnum.isFinishedStatus(taskInstance.getStatus())) {
long totalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime,
taskInstance.getTotalTime());
if (!RunStatusEnum.isFinishedStatus(taskInstance.getStatus())) {
long totalTime = TaskCostCalculator.calculate(
taskInstance.getStartTime(),
endTime,
taskInstance.getTotalTime()
);
taskInstanceService.updateTaskExecutionInfo(
taskInstanceId,
RunStatusEnum.ABANDONED,
Expand All @@ -108,6 +113,5 @@ public boolean updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanc
taskInstance.getStatus()
);
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ public class TaskEvictPolicyManager {

// 策略更新时间间隔:10s
private final int POLICY_UPDATE_INTERVAL_MILLS = 10000;
// 序列化后的策略字符串
private String policyJsonStr = null;
// 组合策略
private volatile ComposedTaskEvictPolicy policy = null;

// Redis操作模板接口
private final RedisTemplate<String, String> redisTemplate;

@Autowired
Expand All @@ -68,24 +71,25 @@ public ComposedTaskEvictPolicy getPolicy() {
public void updatePolicy() {
String loadedPolicyJsonStr = redisTemplate.opsForValue()
.get(RedisConstants.KEY_EXECUTE_TASK_EVICT_POLICY);
if (StringUtil.isDifferent(policyJsonStr, loadedPolicyJsonStr)) {
policyJsonStr = loadedPolicyJsonStr;
try {
policy = StringUtils.isBlank(loadedPolicyJsonStr) ? null : JsonUtils.fromJson(
loadedPolicyJsonStr, new TypeReference<ComposedTaskEvictPolicy>() {
}
);
log.info("loaded new policy:{}", loadedPolicyJsonStr);
} catch (Exception e) {
FormattingTuple message = MessageFormatter.format(
"Fail to parse taskEvictPolicy from {}",
loadedPolicyJsonStr
);
log.warn(message.getMessage(), e);
throw e;
}
} else {
// 策略无任何改变则无须解析更新
if (!StringUtil.isDifferent(policyJsonStr, loadedPolicyJsonStr)) {
log.debug("taskEvictPolicy not change");
return;
}
policyJsonStr = loadedPolicyJsonStr;
try {
policy = StringUtils.isBlank(loadedPolicyJsonStr) ? null : JsonUtils.fromJson(
loadedPolicyJsonStr, new TypeReference<ComposedTaskEvictPolicy>() {
}
);
log.info("loaded new policy:{}", loadedPolicyJsonStr);
} catch (Exception e) {
FormattingTuple message = MessageFormatter.format(
"Fail to parse taskEvictPolicy from {}",
loadedPolicyJsonStr
);
log.warn(message.getMessage(), e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.execute.engine.evict;

import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.model.inner.TaskInstanceIdEvictPolicyDTO;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* 驱逐策略:根据任务实例ID将任务驱逐出执行引擎
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class TaskInstanceIdEvictPolicy extends TaskInstanceIdEvictPolicyDTO implements ITaskEvictPolicy {

private Set<Long> taskInstanceIdSetToEvict;

public TaskInstanceIdEvictPolicy(List<Long> taskInstanceIdsToEvict) {
super(taskInstanceIdsToEvict);
// 使用查找速度为O(1)的Set
taskInstanceIdSetToEvict = new HashSet<>(taskInstanceIdsToEvict);
}

@Override
public boolean needToEvict(TaskInstanceDTO taskInstance) {
Long taskInstanceId = taskInstance.getId();
return taskInstanceIdSetToEvict.contains(taskInstanceId);
}
}
Loading