Skip to content

Commit

Permalink
Merge pull request #2921 from jsonwan/github_fix/analysis
Browse files Browse the repository at this point in the history
fix: 运营分析数据最后更新时间不准确 #2904
  • Loading branch information
wangyu096 authored Apr 19, 2024
2 parents 4a43fc6 + 7edb9f0 commit 0a61036
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ public HeartBeatRedisLock(RedisTemplate<String, String> redisTemplate,
public LockResult lock() {
boolean lockGotten;
try {
lockGotten = LockUtils.tryGetReentrantLock(key, value, config.getExpireTimeMillis());
lockGotten = LockUtils.tryGetDistributedLock(key, value, config.getExpireTimeMillis());
if (!lockGotten) {
String realLockKey = getRealLockKey();
String realLockKeyValue = redisTemplate.opsForValue().get(realLockKey);
log.info("Get lock {} fail, already held by {}", realLockKey, realLockKeyValue);
return LockResult.fail();
return LockResult.fail(realLockKeyValue);
}
RedisKeyHeartBeatThread heartBeatThread = startRedisKeyHeartBeatThread();
return LockResult.succeed(heartBeatThread);
return LockResult.succeed(value, heartBeatThread);
} catch (Throwable t) {
log.error("Get lock caught exception", t);
}
return LockResult.fail();
return LockResult.fail(null);
}

private String getRealLockKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,20 @@
@Data
public class LockResult {
private boolean lockGotten;
private String lockValue;
private RedisKeyHeartBeatThread heartBeatThread;

public LockResult(boolean lockGotten, RedisKeyHeartBeatThread heartBeatThread) {
public LockResult(boolean lockGotten,
String lockValue,
RedisKeyHeartBeatThread heartBeatThread) {
this.lockGotten = lockGotten;
this.lockValue = lockValue;
this.heartBeatThread = heartBeatThread;
}

public LockResult(boolean lockGotten) {
public LockResult(boolean lockGotten, String lockValue) {
this.lockGotten = lockGotten;
this.lockValue = lockValue;
}

public void tryToRelease() {
Expand All @@ -46,12 +51,12 @@ public void tryToRelease() {
}
}

public static LockResult fail() {
return new LockResult(false);
public static LockResult fail(String lockValue) {
return new LockResult(false, lockValue);
}

public static LockResult succeed(RedisKeyHeartBeatThread heartBeatThread) {
return new LockResult(true, heartBeatThread);
public static LockResult succeed(String lockValue, RedisKeyHeartBeatThread heartBeatThread) {
return new LockResult(true, lockValue, heartBeatThread);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import com.tencent.bk.job.analysis.task.statistics.task.PastStatisticsMakeupTask;
import com.tencent.bk.job.analysis.task.statistics.task.TaskInfo;
import com.tencent.bk.job.analysis.task.statistics.task.WatchableTask;
import com.tencent.bk.job.common.redis.util.LockUtils;
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLock;
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLockConfig;
import com.tencent.bk.job.common.redis.util.LockResult;
import com.tencent.bk.job.common.redis.util.RedisKeyHeartBeatThread;
import com.tencent.bk.job.common.util.ApplicationContextRegister;
import com.tencent.bk.job.common.util.TimeUtil;
Expand Down Expand Up @@ -74,7 +76,6 @@
public class StatisticsTaskScheduler {

private static final String machineIp = IpUtils.getFirstMachineIP();
private static final String REDIS_KEY_STATISTIC_JOB_LOCK = "statistic-job-lock";
private static final String REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE = "statistic-job-running-machine";
public static final AtomicLong rejectedStatisticsTaskNum = new AtomicLong(0);
private static final String REDIS_KEY_STATISTIC_JOB_INIT_MACHINE = "statistic-job-init-machine";
Expand All @@ -94,18 +95,6 @@ public class StatisticsTaskScheduler {
private ThreadPoolExecutor currentStatisticsTaskExecutor;
private ThreadPoolExecutor pastStatisticsTaskExecutor;

static {
List<String> keyList = Collections.singletonList(REDIS_KEY_STATISTIC_JOB_LOCK);
keyList.forEach(key -> {
try {
//进程重启首先尝试释放上次加上的锁避免死锁
LockUtils.releaseDistributedLock(key, machineIp);
} catch (Throwable t) {
log.info("Redis key:" + key + " does not need to be released, ignore");
}
});
}

private final StatisticConfig statisticConfig;
private final RedisTemplate<String, String> redisTemplate;
private final PastStatisticsMakeupTask pastStatisticsMakeupTask;
Expand All @@ -115,9 +104,9 @@ public class StatisticsTaskScheduler {
@Autowired
public StatisticsTaskScheduler(MeterRegistry meterRegistry,
@Qualifier("currentStatisticsTaskExecutor")
ThreadPoolExecutor currentStatisticsTaskExecutor,
ThreadPoolExecutor currentStatisticsTaskExecutor,
@Qualifier("pastStatisticsTaskExecutor")
ThreadPoolExecutor pastStatisticsTaskExecutor,
ThreadPoolExecutor pastStatisticsTaskExecutor,
StatisticConfig statisticConfig,
RedisTemplate<String, String> redisTemplate,
PastStatisticsMakeupTask pastStatisticsMakeupTask,
Expand Down Expand Up @@ -171,7 +160,7 @@ public boolean configThreads(Integer currentStatisticThreadsNum, Integer pastSta
if (currentStatisticThreadsNum != null && currentStatisticThreadsNum > 0) {
log.info("reconfig currentStatisticsTaskExecutor to {} threads", currentStatisticThreadsNum);
List<Runnable> canceledRunnableList = currentStatisticsTaskExecutor.shutdownNow();
if (canceledRunnableList.size() > 0) {
if (!canceledRunnableList.isEmpty()) {
log.warn("{} threads of canceled", canceledRunnableList.size());
}
currentStatisticsTaskExecutor = new ThreadPoolExecutor(
Expand All @@ -190,7 +179,7 @@ public boolean configThreads(Integer currentStatisticThreadsNum, Integer pastSta
log.info("reconfig pastStatisticsTaskExecutor to {} threads", pastStatisticThreadsNum);
pastStatisticsMakeupTask.setRunFlag(false);
List<Runnable> canceledRunnableList = pastStatisticsTaskExecutor.shutdownNow();
if (canceledRunnableList.size() > 0) {
if (!canceledRunnableList.isEmpty()) {
log.warn("{} threads of pastStatisticsTaskExecutor canceled", canceledRunnableList.size());
}
pastStatisticsTaskExecutor = new ThreadPoolExecutor(
Expand Down Expand Up @@ -262,7 +251,7 @@ public List<String> startTasks(String startDateStr, String endDateStr, List<Stri
}
int count = 0;
int maxPreviousDays = 1000;
while (targetDate.compareTo(startDate) >= 0 && count < maxPreviousDays) {
while (!targetDate.isBefore(startDate) && count < maxPreviousDays) {
startByTaskNameList(taskNameList, targetDate, startedTaskNames);
targetDate = targetDate.minusDays(1);
count += 1;
Expand Down Expand Up @@ -399,19 +388,6 @@ private boolean needToRunStatisticTasks() {
statisticConfig.getIntervalHours());
currentCycleHours = 1;
}

// 分布式唯一性保证
boolean lockGotten = LockUtils.tryGetDistributedLock(REDIS_KEY_STATISTIC_JOB_LOCK, machineIp, 50);
if (!lockGotten) {
log.info("lock {} gotten by another machine, return", REDIS_KEY_STATISTIC_JOB_LOCK);
return false;
}
String runningMachine = redisTemplate.opsForValue().get(REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE);
if (StringUtils.isNotBlank(runningMachine)) {
//已有同步线程在跑,不再同步
log.info("StatisticsTaskScheduler thread already running on {}", runningMachine);
return false;
}
return true;
}

Expand All @@ -422,18 +398,25 @@ public void schedule() {
if (!needToRunStatisticTasks()) {
return;
}
// 开一个心跳子线程,维护当前机器正在执行后台统计任务的状态
long expireTimeMillis = 5000L;
long periodTimeMillis = 4000L;
RedisKeyHeartBeatThread statisticTaskSchedulerRedisKeyHeartBeatThread = new RedisKeyHeartBeatThread(

// 分布式唯一性保证
HeartBeatRedisLockConfig config = HeartBeatRedisLockConfig.getDefault();
config.setHeartBeatThreadName("statisticTaskSchedulerRedisKeyHeartBeatThread");
HeartBeatRedisLock lock = new HeartBeatRedisLock(
redisTemplate,
REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE,
machineIp,
expireTimeMillis,
periodTimeMillis
config
);
statisticTaskSchedulerRedisKeyHeartBeatThread.setName("statisticTaskSchedulerRedisKeyHeartBeatThread");
statisticTaskSchedulerRedisKeyHeartBeatThread.start();
LockResult lockResult = lock.lock();
if (!lockResult.isLockGotten()) {
log.info(
"lock {} gotten by another machine: {}, return",
REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE,
lockResult.getLockValue()
);
return;
}

// 统计任务开始
log.info("start StatisticsTaskScheduler at {},{}", TimeUtil.getCurrentTimeStr("HH:mm:ss"),
Expand Down Expand Up @@ -467,20 +450,26 @@ public void schedule() {
// 等待所有统计任务结束
futureList.forEach(future -> {
try {
int timeoutMinutes = 10;
// 最长统计任务耗时约30min
int timeoutMinutes = 60;
future.get(timeoutMinutes, TimeUnit.MINUTES);
updateDataUpdateTime(TimeUtil.getCurrentTimeStr());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Exception in statistic task", e);
updateDataUpdateTime("After " + TimeUtil.getCurrentTimeStr());
}
});
// 向Redis写入统计数据更新时间,不过期
redisTemplate.opsForValue().set(StatisticsConstants.KEY_DATA_UPDATE_TIME, TimeUtil.getCurrentTimeStr());
} catch (Throwable t) {
log.error("Exception in StatisticsTaskScheduler", t);
} finally {
statisticTaskSchedulerRedisKeyHeartBeatThread.setRunFlag(false);
lockResult.tryToRelease();
stopWatch.stop();
log.info(stopWatch.prettyPrint());
}
}

private void updateDataUpdateTime(String updateTimeStr) {
// 向Redis写入统计数据更新时间,不过期
redisTemplate.opsForValue().set(StatisticsConstants.KEY_DATA_UPDATE_TIME, updateTimeStr);
}
}

0 comments on commit 0a61036

Please sign in to comment.