diff --git a/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/HeartBeatRedisLock.java b/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/HeartBeatRedisLock.java index 3e0d195166..dd93f0e750 100644 --- a/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/HeartBeatRedisLock.java +++ b/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/HeartBeatRedisLock.java @@ -60,19 +60,19 @@ public HeartBeatRedisLock(RedisTemplate redisTemplate, public LockResult lock() { boolean lockGotten; try { - lockGotten = LockUtils.tryGetReentrantLock(key, value, config.getExpireTimeMillis()); + lockGotten = LockUtils.tryGetDistributedLock(key, value, config.getExpireTimeMillis()); if (!lockGotten) { String realLockKey = getRealLockKey(); String realLockKeyValue = redisTemplate.opsForValue().get(realLockKey); log.info("Get lock {} fail, already held by {}", realLockKey, realLockKeyValue); - return LockResult.fail(); + return LockResult.fail(realLockKeyValue); } RedisKeyHeartBeatThread heartBeatThread = startRedisKeyHeartBeatThread(); - return LockResult.succeed(heartBeatThread); + return LockResult.succeed(value, heartBeatThread); } catch (Throwable t) { log.error("Get lock caught exception", t); } - return LockResult.fail(); + return LockResult.fail(null); } private String getRealLockKey() { diff --git a/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/LockResult.java b/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/LockResult.java index 89d67a0534..b133c7353e 100644 --- a/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/LockResult.java +++ b/src/backend/commons/common-redis/src/main/java/com/tencent/bk/job/common/redis/util/LockResult.java @@ -29,15 +29,20 @@ @Data public class LockResult { private boolean lockGotten; + private String lockValue; private RedisKeyHeartBeatThread heartBeatThread; - public LockResult(boolean lockGotten, RedisKeyHeartBeatThread heartBeatThread) { + public LockResult(boolean lockGotten, + String lockValue, + RedisKeyHeartBeatThread heartBeatThread) { this.lockGotten = lockGotten; + this.lockValue = lockValue; this.heartBeatThread = heartBeatThread; } - public LockResult(boolean lockGotten) { + public LockResult(boolean lockGotten, String lockValue) { this.lockGotten = lockGotten; + this.lockValue = lockValue; } public void tryToRelease() { @@ -46,12 +51,12 @@ public void tryToRelease() { } } - public static LockResult fail() { - return new LockResult(false); + public static LockResult fail(String lockValue) { + return new LockResult(false, lockValue); } - public static LockResult succeed(RedisKeyHeartBeatThread heartBeatThread) { - return new LockResult(true, heartBeatThread); + public static LockResult succeed(String lockValue, RedisKeyHeartBeatThread heartBeatThread) { + return new LockResult(true, lockValue, heartBeatThread); } } diff --git a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java index 57e839652a..18dec820ae 100644 --- a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java +++ b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java @@ -32,6 +32,9 @@ import com.tencent.bk.job.analysis.task.statistics.task.PastStatisticsMakeupTask; import com.tencent.bk.job.analysis.task.statistics.task.TaskInfo; import com.tencent.bk.job.analysis.task.statistics.task.WatchableTask; +import com.tencent.bk.job.common.redis.util.HeartBeatRedisLock; +import com.tencent.bk.job.common.redis.util.HeartBeatRedisLockConfig; +import com.tencent.bk.job.common.redis.util.LockResult; import com.tencent.bk.job.common.redis.util.LockUtils; import com.tencent.bk.job.common.redis.util.RedisKeyHeartBeatThread; import com.tencent.bk.job.common.util.ApplicationContextRegister; @@ -115,9 +118,9 @@ public class StatisticsTaskScheduler { @Autowired public StatisticsTaskScheduler(MeterRegistry meterRegistry, @Qualifier("currentStatisticsTaskExecutor") - ThreadPoolExecutor currentStatisticsTaskExecutor, + ThreadPoolExecutor currentStatisticsTaskExecutor, @Qualifier("pastStatisticsTaskExecutor") - ThreadPoolExecutor pastStatisticsTaskExecutor, + ThreadPoolExecutor pastStatisticsTaskExecutor, StatisticConfig statisticConfig, RedisTemplate redisTemplate, PastStatisticsMakeupTask pastStatisticsMakeupTask, @@ -399,19 +402,6 @@ private boolean needToRunStatisticTasks() { statisticConfig.getIntervalHours()); currentCycleHours = 1; } - - // 分布式唯一性保证 - boolean lockGotten = LockUtils.tryGetDistributedLock(REDIS_KEY_STATISTIC_JOB_LOCK, machineIp, 50); - if (!lockGotten) { - log.info("lock {} gotten by another machine, return", REDIS_KEY_STATISTIC_JOB_LOCK); - return false; - } - String runningMachine = redisTemplate.opsForValue().get(REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE); - if (StringUtils.isNotBlank(runningMachine)) { - //已有同步线程在跑,不再同步 - log.info("StatisticsTaskScheduler thread already running on {}", runningMachine); - return false; - } return true; } @@ -422,18 +412,25 @@ public void schedule() { if (!needToRunStatisticTasks()) { return; } - // 开一个心跳子线程,维护当前机器正在执行后台统计任务的状态 - long expireTimeMillis = 5000L; - long periodTimeMillis = 4000L; - RedisKeyHeartBeatThread statisticTaskSchedulerRedisKeyHeartBeatThread = new RedisKeyHeartBeatThread( + + // 分布式唯一性保证 + HeartBeatRedisLockConfig config = HeartBeatRedisLockConfig.getDefault(); + config.setHeartBeatThreadName("statisticTaskSchedulerRedisKeyHeartBeatThread"); + HeartBeatRedisLock lock = new HeartBeatRedisLock( redisTemplate, REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE, machineIp, - expireTimeMillis, - periodTimeMillis + config ); - statisticTaskSchedulerRedisKeyHeartBeatThread.setName("statisticTaskSchedulerRedisKeyHeartBeatThread"); - statisticTaskSchedulerRedisKeyHeartBeatThread.start(); + LockResult lockResult = lock.lock(); + if (!lockResult.isLockGotten()) { + log.info( + "lock {} gotten by another machine: {}, return", + REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE, + lockResult.getLockValue() + ); + return; + } // 统计任务开始 log.info("start StatisticsTaskScheduler at {},{}", TimeUtil.getCurrentTimeStr("HH:mm:ss"), @@ -467,20 +464,26 @@ public void schedule() { // 等待所有统计任务结束 futureList.forEach(future -> { try { - int timeoutMinutes = 10; + // 最长统计任务耗时约30min + int timeoutMinutes = 50; future.get(timeoutMinutes, TimeUnit.MINUTES); + updateDataUpdateTime(TimeUtil.getCurrentTimeStr()); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Exception in statistic task", e); + updateDataUpdateTime("After " + TimeUtil.getCurrentTimeStr()); } }); - // 向Redis写入统计数据更新时间,不过期 - redisTemplate.opsForValue().set(StatisticsConstants.KEY_DATA_UPDATE_TIME, TimeUtil.getCurrentTimeStr()); } catch (Throwable t) { log.error("Exception in StatisticsTaskScheduler", t); } finally { - statisticTaskSchedulerRedisKeyHeartBeatThread.setRunFlag(false); + lockResult.tryToRelease(); stopWatch.stop(); log.info(stopWatch.prettyPrint()); } } + + private void updateDataUpdateTime(String updateTimeStr) { + // 向Redis写入统计数据更新时间,不过期 + redisTemplate.opsForValue().set(StatisticsConstants.KEY_DATA_UPDATE_TIME, updateTimeStr); + } }