Skip to content

Commit

Permalink
perf: file-gateway调度逻辑优化 TencentBlueKing#2977
Browse files Browse the repository at this point in the history
限定重调度区间为最近半小时
  • Loading branch information
jsonwan committed May 14, 2024
1 parent 81fe601 commit b0c0292
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public interface FileTaskDAO {

List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize);

List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte> statusSet, Integer start,
List<String> listTimeoutFileSourceTaskIds(Long startTimeMills,
Long endTimeMills,
Collection<Byte> statusSet, Integer start,
Integer pageSize);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,17 @@ public List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, I
}

@Override
public List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills,
Collection<Byte> statusSet, Integer start, Integer pageSize) {
public List<String> listTimeoutFileSourceTaskIds(Long startTimeMills,
Long endTimeMills,
Collection<Byte> statusSet,
Integer start,
Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (expireTimeMills != null) {
conditions.add(defaultTable.LAST_MODIFY_TIME.le(System.currentTimeMillis() - expireTimeMills));
if (startTimeMills != null) {
conditions.add(defaultTable.LAST_MODIFY_TIME.greaterOrEqual(startTimeMills));
}
if (endTimeMills != null) {
conditions.add(defaultTable.LAST_MODIFY_TIME.lessOrEqual(endTimeMills));
}
if (statusSet != null && !statusSet.isEmpty()) {
conditions.add(defaultTable.STATUS.in(statusSet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLock;
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLockConfig;
import com.tencent.bk.job.common.redis.util.LockResult;
import com.tencent.bk.job.common.util.TimeUtil;
import com.tencent.bk.job.common.util.ip.IpUtils;
import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.dao.filesource.FileTaskDAO;
Expand Down Expand Up @@ -101,16 +102,21 @@ private void reDispatchFileSourceTasks() {
StopWatch watch = new StopWatch("reDispatchFileSourceTasks");
watch.start("listTimeoutFileSourceTaskIds");
// 找出未结束且长时间无响应的任务,无响应且未结束的任务就应当被重调度了
long fileSourceTaskStatusExpireTimeMills = reDispatchTaskTimeoutSeconds * 1000L;

long intervalStart = computeReDispatchIntervalStart();
long intervalEnd = computeReDispatchIntervalEnd();
List<String> timeoutFileSourceTaskIdList = fileTaskDAO.listTimeoutFileSourceTaskIds(
fileSourceTaskStatusExpireTimeMills,
intervalStart,
intervalEnd,
TaskStatusEnum.getRunningStatusSet(),
0,
-1
);
log.info(
"find {} fileSourceTask to reDispatch: {}",
"find {} fileSourceTask between [{},{}] to reDispatch: {}",
timeoutFileSourceTaskIdList.size(),
TimeUtil.formatTime(intervalStart),
TimeUtil.formatTime(intervalEnd),
timeoutFileSourceTaskIdList
);
watch.stop();
Expand Down Expand Up @@ -140,4 +146,26 @@ private void reDispatchFileSourceTasks() {
);
}
}

/**
* 计算重调度区间开始时间
*
* @return 重调度区间开始时间(ms)
*/
private long computeReDispatchIntervalStart() {
// 只对最近半小时内的任务进行重调度
long reDispatchStartIntervalMills = 30 * 60 * 1000L;
return System.currentTimeMillis() - reDispatchStartIntervalMills;
}

/**
* 计算重调度区间结束时间
*
* @return 重调度区间结束时间(ms)
*/
private long computeReDispatchIntervalEnd() {
// 对已经超时未更新状态的任务进行重调度
long fileSourceTaskStatusExpireTimeMills = reDispatchTaskTimeoutSeconds * 1000L;
return System.currentTimeMillis() - fileSourceTaskStatusExpireTimeMills;
}
}

0 comments on commit b0c0292

Please sign in to comment.