diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java index f7485d9445..aa49eb3f4d 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java @@ -44,7 +44,9 @@ public interface FileTaskDAO { List listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize); - List listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection statusSet, Integer start, + List listTimeoutFileSourceTaskIds(Long startTimeMills, + Long endTimeMills, + Collection statusSet, Integer start, Integer pageSize); List listFileTasks(String fileSourceTaskId); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java index 2416b9d090..f7f139506d 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java @@ -184,11 +184,17 @@ public List listFileTasks(String fileSourceTaskId, Integer start, I } @Override - public List listTimeoutFileSourceTaskIds(Long expireTimeMills, - Collection statusSet, Integer start, Integer pageSize) { + public List listTimeoutFileSourceTaskIds(Long startTimeMills, + Long endTimeMills, + Collection statusSet, + Integer start, + Integer pageSize) { List conditions = new ArrayList<>(); - if (expireTimeMills != null) { - conditions.add(defaultTable.LAST_MODIFY_TIME.le(System.currentTimeMillis() - expireTimeMills)); + if (startTimeMills != null) { + conditions.add(defaultTable.LAST_MODIFY_TIME.greaterOrEqual(startTimeMills)); + } + if (endTimeMills != null) { + conditions.add(defaultTable.LAST_MODIFY_TIME.lessOrEqual(endTimeMills)); } if (statusSet != null && !statusSet.isEmpty()) { conditions.add(defaultTable.STATUS.in(statusSet)); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java index 2fd3f3fd7e..8622441008 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java @@ -27,6 +27,7 @@ import com.tencent.bk.job.common.redis.util.HeartBeatRedisLock; import com.tencent.bk.job.common.redis.util.HeartBeatRedisLockConfig; import com.tencent.bk.job.common.redis.util.LockResult; +import com.tencent.bk.job.common.util.TimeUtil; import com.tencent.bk.job.common.util.ip.IpUtils; import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum; import com.tencent.bk.job.file_gateway.dao.filesource.FileTaskDAO; @@ -101,16 +102,21 @@ private void reDispatchFileSourceTasks() { StopWatch watch = new StopWatch("reDispatchFileSourceTasks"); watch.start("listTimeoutFileSourceTaskIds"); // 找出未结束且长时间无响应的任务,无响应且未结束的任务就应当被重调度了 - long fileSourceTaskStatusExpireTimeMills = reDispatchTaskTimeoutSeconds * 1000L; + + long intervalStart = computeReDispatchIntervalStart(); + long intervalEnd = computeReDispatchIntervalEnd(); List timeoutFileSourceTaskIdList = fileTaskDAO.listTimeoutFileSourceTaskIds( - fileSourceTaskStatusExpireTimeMills, + intervalStart, + intervalEnd, TaskStatusEnum.getRunningStatusSet(), 0, -1 ); log.info( - "find {} fileSourceTask to reDispatch: {}", + "find {} fileSourceTask between [{},{}] to reDispatch: {}", timeoutFileSourceTaskIdList.size(), + TimeUtil.formatTime(intervalStart), + TimeUtil.formatTime(intervalEnd), timeoutFileSourceTaskIdList ); watch.stop(); @@ -140,4 +146,26 @@ private void reDispatchFileSourceTasks() { ); } } + + /** + * 计算重调度区间开始时间 + * + * @return 重调度区间开始时间(ms) + */ + private long computeReDispatchIntervalStart() { + // 只对最近半小时内的任务进行重调度 + long reDispatchStartIntervalMills = 30 * 60 * 1000L; + return System.currentTimeMillis() - reDispatchStartIntervalMills; + } + + /** + * 计算重调度区间结束时间 + * + * @return 重调度区间结束时间(ms) + */ + private long computeReDispatchIntervalEnd() { + // 对已经超时未更新状态的任务进行重调度 + long fileSourceTaskStatusExpireTimeMills = reDispatchTaskTimeoutSeconds * 1000L; + return System.currentTimeMillis() - fileSourceTaskStatusExpireTimeMills; + } }