Skip to content

Commit

Permalink
fix: 运营分析数据最后更新时间不准确 #2904
Browse files Browse the repository at this point in the history
  • Loading branch information
jsonwan committed Apr 19, 2024
1 parent 6d936be commit 23b97bc
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ public HeartBeatRedisLock(RedisTemplate<String, String> redisTemplate,
public LockResult lock() {
boolean lockGotten;
try {
lockGotten = LockUtils.tryGetReentrantLock(key, value, config.getExpireTimeMillis());
lockGotten = LockUtils.tryGetDistributedLock(key, value, config.getExpireTimeMillis());
if (!lockGotten) {
String realLockKey = getRealLockKey();
String realLockKeyValue = redisTemplate.opsForValue().get(realLockKey);
log.info("Get lock {} fail, already held by {}", realLockKey, realLockKeyValue);
return LockResult.fail();
return LockResult.fail(realLockKeyValue);
}
RedisKeyHeartBeatThread heartBeatThread = startRedisKeyHeartBeatThread();
return LockResult.succeed(heartBeatThread);
return LockResult.succeed(value, heartBeatThread);
} catch (Throwable t) {
log.error("Get lock caught exception", t);
}
return LockResult.fail();
return LockResult.fail(null);
}

private String getRealLockKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,20 @@
@Data
public class LockResult {
private boolean lockGotten;
private String lockValue;
private RedisKeyHeartBeatThread heartBeatThread;

public LockResult(boolean lockGotten, RedisKeyHeartBeatThread heartBeatThread) {
public LockResult(boolean lockGotten,
String lockValue,
RedisKeyHeartBeatThread heartBeatThread) {
this.lockGotten = lockGotten;
this.lockValue = lockValue;
this.heartBeatThread = heartBeatThread;
}

public LockResult(boolean lockGotten) {
public LockResult(boolean lockGotten, String lockValue) {
this.lockGotten = lockGotten;
this.lockValue = lockValue;
}

public void tryToRelease() {
Expand All @@ -46,12 +51,12 @@ public void tryToRelease() {
}
}

public static LockResult fail() {
return new LockResult(false);
public static LockResult fail(String lockValue) {
return new LockResult(false, lockValue);
}

public static LockResult succeed(RedisKeyHeartBeatThread heartBeatThread) {
return new LockResult(true, heartBeatThread);
public static LockResult succeed(String lockValue, RedisKeyHeartBeatThread heartBeatThread) {
return new LockResult(true, lockValue, heartBeatThread);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import com.tencent.bk.job.analysis.task.statistics.task.PastStatisticsMakeupTask;
import com.tencent.bk.job.analysis.task.statistics.task.TaskInfo;
import com.tencent.bk.job.analysis.task.statistics.task.WatchableTask;
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLock;
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLockConfig;
import com.tencent.bk.job.common.redis.util.LockResult;
import com.tencent.bk.job.common.redis.util.LockUtils;
import com.tencent.bk.job.common.redis.util.RedisKeyHeartBeatThread;
import com.tencent.bk.job.common.util.ApplicationContextRegister;
Expand Down Expand Up @@ -115,9 +118,9 @@ public class StatisticsTaskScheduler {
@Autowired
public StatisticsTaskScheduler(MeterRegistry meterRegistry,
@Qualifier("currentStatisticsTaskExecutor")
ThreadPoolExecutor currentStatisticsTaskExecutor,
ThreadPoolExecutor currentStatisticsTaskExecutor,
@Qualifier("pastStatisticsTaskExecutor")
ThreadPoolExecutor pastStatisticsTaskExecutor,
ThreadPoolExecutor pastStatisticsTaskExecutor,
StatisticConfig statisticConfig,
RedisTemplate<String, String> redisTemplate,
PastStatisticsMakeupTask pastStatisticsMakeupTask,
Expand Down Expand Up @@ -399,19 +402,6 @@ private boolean needToRunStatisticTasks() {
statisticConfig.getIntervalHours());
currentCycleHours = 1;
}

// 分布式唯一性保证
boolean lockGotten = LockUtils.tryGetDistributedLock(REDIS_KEY_STATISTIC_JOB_LOCK, machineIp, 50);
if (!lockGotten) {
log.info("lock {} gotten by another machine, return", REDIS_KEY_STATISTIC_JOB_LOCK);
return false;
}
String runningMachine = redisTemplate.opsForValue().get(REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE);
if (StringUtils.isNotBlank(runningMachine)) {
//已有同步线程在跑,不再同步
log.info("StatisticsTaskScheduler thread already running on {}", runningMachine);
return false;
}
return true;
}

Expand All @@ -422,18 +412,25 @@ public void schedule() {
if (!needToRunStatisticTasks()) {
return;
}
// 开一个心跳子线程,维护当前机器正在执行后台统计任务的状态
long expireTimeMillis = 5000L;
long periodTimeMillis = 4000L;
RedisKeyHeartBeatThread statisticTaskSchedulerRedisKeyHeartBeatThread = new RedisKeyHeartBeatThread(

// 分布式唯一性保证
HeartBeatRedisLockConfig config = HeartBeatRedisLockConfig.getDefault();
config.setHeartBeatThreadName("statisticTaskSchedulerRedisKeyHeartBeatThread");
HeartBeatRedisLock lock = new HeartBeatRedisLock(
redisTemplate,
REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE,
machineIp,
expireTimeMillis,
periodTimeMillis
config
);
statisticTaskSchedulerRedisKeyHeartBeatThread.setName("statisticTaskSchedulerRedisKeyHeartBeatThread");
statisticTaskSchedulerRedisKeyHeartBeatThread.start();
LockResult lockResult = lock.lock();
if (!lockResult.isLockGotten()) {
log.info(
"lock {} gotten by another machine: {}, return",
REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE,
lockResult.getLockValue()
);
return;
}

// 统计任务开始
log.info("start StatisticsTaskScheduler at {},{}", TimeUtil.getCurrentTimeStr("HH:mm:ss"),
Expand Down Expand Up @@ -467,20 +464,26 @@ public void schedule() {
// 等待所有统计任务结束
futureList.forEach(future -> {
try {
int timeoutMinutes = 10;
// 最长统计任务耗时约30min
int timeoutMinutes = 50;
future.get(timeoutMinutes, TimeUnit.MINUTES);
updateDataUpdateTime(TimeUtil.getCurrentTimeStr());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Exception in statistic task", e);
updateDataUpdateTime("After " + TimeUtil.getCurrentTimeStr());
}
});
// 向Redis写入统计数据更新时间,不过期
redisTemplate.opsForValue().set(StatisticsConstants.KEY_DATA_UPDATE_TIME, TimeUtil.getCurrentTimeStr());
} catch (Throwable t) {
log.error("Exception in StatisticsTaskScheduler", t);
} finally {
statisticTaskSchedulerRedisKeyHeartBeatThread.setRunFlag(false);
lockResult.tryToRelease();
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
}

private void updateDataUpdateTime(String updateTimeStr) {
// 向Redis写入统计数据更新时间,不过期
redisTemplate.opsForValue().set(StatisticsConstants.KEY_DATA_UPDATE_TIME, updateTimeStr);
}
}

0 comments on commit 23b97bc

Please sign in to comment.