From 4d0435e99f41d8de34eb448d8f20e8cc5078e029 Mon Sep 17 00:00:00 2001 From: jsonwan Date: Thu, 30 Nov 2023 19:20:06 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=A7=A6=E5=8F=91=E5=BB=B6=E8=BF=9F=E4=BC=98=E5=8C=96=20#2073?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Quartz从MySQL改用内存JobStore; 2. 基于Redis实现高可用方案; 3. 定时任务更改通过MQ广播消息实现。 --- .../config/JobFunctionConfiguration.java | 50 ++++ .../crontab/constant/CrontabActionEnum.java | 58 ++++ .../bk/job/crontab/dao/CronJobDAO.java | 2 + .../job/crontab/dao/impl/CronJobDAOImpl.java | 24 ++ .../listener/CrontabEventListener.java | 119 ++++++++ .../crontab/listener/event/CrontabEvent.java | 100 +++++++ .../bk/job/crontab/listener/event/Event.java | 48 +++ .../model/dto/CronJobBasicInfoDTO.java | 51 ++++ .../crontab/mq/CrontabMQEventDispatcher.java | 55 ++++ .../service/CronJobBatchLoadService.java | 62 ++++ .../service/CronJobLoadingService.java | 29 ++ .../job/crontab/service/CronJobService.java | 8 + .../impl/CronJobBatchLoadServiceImpl.java | 96 ++++++ .../impl/CronJobLoadingServiceImpl.java | 94 ++++++ .../service/impl/CronJobServiceImpl.java | 276 ++++++++++-------- .../bk/job/crontab/task/ScheduledTasks.java | 61 ++++ .../crontab/timer/AbstractQuartzJobBean.java | 24 +- .../timer/executor/SimpleJobExecutor.java | 4 + .../templates/job-assemble/configmap.yaml | 39 ++- .../templates/job-crontab/configmap.yaml | 55 +++- ...#job#job-assemble#application-assemble.yml | 40 ++- .../#etc#job#job-crontab#job-crontab.yml | 52 +++- 22 files changed, 1163 insertions(+), 184 deletions(-) create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/config/JobFunctionConfiguration.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/constant/CrontabActionEnum.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/CrontabEvent.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/Event.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobBasicInfoDTO.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/mq/CrontabMQEventDispatcher.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobBatchLoadService.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobLoadingService.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java create mode 100644 src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/task/ScheduledTasks.java diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/config/JobFunctionConfiguration.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/config/JobFunctionConfiguration.java new file mode 100644 index 0000000000..765509dd43 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/config/JobFunctionConfiguration.java @@ -0,0 +1,50 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.config; + +import com.tencent.bk.job.crontab.listener.CrontabEventListener; +import com.tencent.bk.job.crontab.listener.event.CrontabEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.function.Consumer; + +/** + * spring cloud function 定义 + *

+ * 注意:方法名与配置文件中的spring.cloud.function.definition对应,修改需要注意!!! + */ +@Configuration +@Slf4j +public class JobFunctionConfiguration { + @Bean + public Consumer handleCrontabFanoutEvent(@Autowired CrontabEventListener crontabEventListener) { + log.info("Init handleCrontabFanoutEvent consumer"); + return crontabEventListener::handleEvent; + } + +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/constant/CrontabActionEnum.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/constant/CrontabActionEnum.java new file mode 100644 index 0000000000..ce55b55c48 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/constant/CrontabActionEnum.java @@ -0,0 +1,58 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.constant; + +/** + * 定时任务相关操作 + */ +public enum CrontabActionEnum { + /** + * 添加定时任务 + */ + ADD_CRON(1), + /** + * 删除定时任务 + */ + DELETE_CRON(2); + + private final int value; + + CrontabActionEnum(int val) { + this.value = val; + } + + public static CrontabActionEnum valueOf(int value) { + for (CrontabActionEnum crontabAction : values()) { + if (crontabAction.getValue() == value) { + return crontabAction; + } + } + throw new IllegalArgumentException("No CrontabActionEnum constant: " + value); + } + + public int getValue() { + return value; + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java index 9aadb5e032..c130e2cd6b 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/CronJobDAO.java @@ -26,6 +26,7 @@ import com.tencent.bk.job.common.model.BaseSearchCondition; import com.tencent.bk.job.common.model.PageData; +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobWithVarsDTO; @@ -137,6 +138,7 @@ PageData listPageCronJobsByCondition(CronJobInfoDTO cronJobCondi Integer countCronJob(Long appId, Boolean active, Boolean cron); + List listEnabledCronBasicInfoForUpdate(int start, int limit); // 新增 diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java index c04a5634dc..1a2b71f8dd 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/dao/impl/CronJobDAOImpl.java @@ -30,6 +30,7 @@ import com.tencent.bk.job.common.model.dto.UserRoleInfoDTO; import com.tencent.bk.job.common.util.json.JsonUtils; import com.tencent.bk.job.crontab.dao.CronJobDAO; +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobVariableDTO; import com.tencent.bk.job.crontab.model.dto.CronJobWithVarsDTO; @@ -45,6 +46,7 @@ import org.jooq.OrderField; import org.jooq.Record1; import org.jooq.Record21; +import org.jooq.Record3; import org.jooq.Record4; import org.jooq.Record5; import org.jooq.Result; @@ -527,6 +529,28 @@ assert record != null; return record.value1(); } + @Override + public List listEnabledCronBasicInfoForUpdate(int start, int limit) { + List conditions = new ArrayList<>(); + conditions.add(TABLE.IS_DELETED.eq(UByte.valueOf(0))); + conditions.add(TABLE.IS_ENABLE.eq(UByte.valueOf(1))); + Result> records = context + .select(TABLE.ID, TABLE.APP_ID, TABLE.NAME) + .from(TABLE) + .where(conditions) + .orderBy(TABLE.ID) + .limit(start, limit) + .forUpdate() + .fetch(); + return records.map(record -> { + CronJobBasicInfoDTO cronJobBasicInfoDTO = new CronJobBasicInfoDTO(); + cronJobBasicInfoDTO.setId(record.get(TABLE.ID).longValue()); + cronJobBasicInfoDTO.setAppId(record.get(TABLE.APP_ID).longValue()); + cronJobBasicInfoDTO.setName(record.get(TABLE.NAME)); + return cronJobBasicInfoDTO; + }); + } + private CronJobInfoDTO convertToCronJobDTO(Record21 record) { if (record == null) { diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java new file mode 100644 index 0000000000..bc218ad1f8 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/CrontabEventListener.java @@ -0,0 +1,119 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.listener; + +import com.tencent.bk.job.crontab.constant.CrontabActionEnum; +import com.tencent.bk.job.crontab.listener.event.CrontabEvent; +import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; +import com.tencent.bk.job.crontab.service.CronJobService; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.helpers.MessageFormatter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 定时任务事件处理 + */ +@Component("crontabEventListener") +@Slf4j +public class CrontabEventListener { + + private final CronJobService cronJobService; + + @Autowired + public CrontabEventListener(CronJobService cronJobService) { + this.cronJobService = cronJobService; + } + + + /** + * 处理定时任务相关的事件 + * + * @param crontabEvent 定时任务相关的事件 + */ + public void handleEvent(CrontabEvent crontabEvent) { + log.info("Handle crontab event, event: {}, duration: {}ms", crontabEvent, crontabEvent.duration()); + long appId = crontabEvent.getAppId(); + long cronJobId = crontabEvent.getCronJobId(); + CrontabActionEnum action = CrontabActionEnum.valueOf(crontabEvent.getAction()); + try { + switch (action) { + case ADD_CRON: + CronJobInfoDTO cronJobInfoDTO = cronJobService.getCronJobInfoById(cronJobId); + refreshCronJobInQuartz(cronJobInfoDTO); + break; + case DELETE_CRON: + deleteCronJobFromQuartz(appId, cronJobId); + break; + default: + log.error("Invalid crontabEvent action: {}", action); + } + } catch (Throwable e) { + String errorMsg = MessageFormatter.format( + "Handle crontab event error, appId={}, cronJobId={}", + appId, + cronJobId + ).getMessage(); + log.error(errorMsg, e); + } + } + + private void refreshCronJobInQuartz(CronJobInfoDTO cronJobInfoDTO) { + if (cronJobInfoDTO == null) { + return; + } + if (cronJobInfoDTO.getEnable()) { + // 开启定时任务 + boolean result = cronJobService.addJobToQuartz(cronJobInfoDTO.getAppId(), cronJobInfoDTO.getId()); + log.info( + "add cronJob({},{}) to quartz, result={}", + cronJobInfoDTO.getAppId(), + cronJobInfoDTO.getId(), + result + ); + } else { + // 关闭定时任务 + boolean result = cronJobService.deleteJobFromQuartz(cronJobInfoDTO.getAppId(), cronJobInfoDTO.getId()); + log.info( + "delete cronJob({},{}) from quartz, result={}", + cronJobInfoDTO.getAppId(), + cronJobInfoDTO.getId(), + result + ); + } + } + + private void deleteCronJobFromQuartz(long appId, long cronJobId) { + // 删除定时任务 + boolean result = cronJobService.deleteJobFromQuartz(appId, cronJobId); + log.info( + "delete cronJob({},{}) from quartz, result={}", + appId, + cronJobId, + result + ); + } + +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/CrontabEvent.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/CrontabEvent.java new file mode 100644 index 0000000000..baf09b953f --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/CrontabEvent.java @@ -0,0 +1,100 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.listener.event; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.tencent.bk.job.crontab.constant.CrontabActionEnum; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.time.LocalDateTime; +import java.util.StringJoiner; + +/** + * 定时任务事件 + */ +@Getter +@Setter +@NoArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CrontabEvent extends Event { + /** + * 定时任务操作 + * + * @see CrontabActionEnum + */ + private int action; + /** + * Job业务ID + */ + private Long appId; + /** + * 定时任务ID + */ + private Long cronJobId; + + /** + * 构造添加定时任务事件 + * + * @param appId Job业务ID + * @param cronJobId 定时任务ID + * @return 事件 + */ + public static CrontabEvent addCron(long appId, long cronJobId) { + CrontabEvent crontabEvent = new CrontabEvent(); + crontabEvent.setAppId(appId); + crontabEvent.setCronJobId(cronJobId); + crontabEvent.setAction(CrontabActionEnum.ADD_CRON.getValue()); + crontabEvent.setTime(LocalDateTime.now()); + return crontabEvent; + } + + /** + * 构造删除定时任务事件 + * + * @param appId Job业务ID + * @param cronJobId 定时任务ID + * @return 事件 + */ + public static CrontabEvent deleteCron(long appId, long cronJobId) { + CrontabEvent crontabEvent = new CrontabEvent(); + crontabEvent.setAppId(appId); + crontabEvent.setCronJobId(cronJobId); + crontabEvent.setAction(CrontabActionEnum.DELETE_CRON.getValue()); + crontabEvent.setTime(LocalDateTime.now()); + return crontabEvent; + } + + @Override + public String toString() { + return new StringJoiner(", ", CrontabEvent.class.getSimpleName() + "[", "]") + .add("action=" + action) + .add("appId=" + appId) + .add("cronJobId=" + cronJobId) + .add("time=" + time) + .toString(); + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/Event.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/Event.java new file mode 100644 index 0000000000..ab37c2170b --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/listener/event/Event.java @@ -0,0 +1,48 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.listener.event; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.tencent.bk.job.common.util.date.DateUtils; +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Event { + /** + * 事件发生时间 + */ + protected LocalDateTime time; + + public long duration() { + if (time != null) { + return DateUtils.calculateMillsBetweenDateTime(time, LocalDateTime.now()); + } else { + return 0; + } + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobBasicInfoDTO.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobBasicInfoDTO.java new file mode 100644 index 0000000000..4cf07b2ef4 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/model/dto/CronJobBasicInfoDTO.java @@ -0,0 +1,51 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.model.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +@Data +@EqualsAndHashCode +@NoArgsConstructor +@AllArgsConstructor +public class CronJobBasicInfoDTO { + /** + * 定时任务 ID + */ + private Long id; + + /** + * 业务 ID + */ + private Long appId; + + /** + * 定时任务名称 + */ + private String name; +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/mq/CrontabMQEventDispatcher.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/mq/CrontabMQEventDispatcher.java new file mode 100644 index 0000000000..b86ccc5898 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/mq/CrontabMQEventDispatcher.java @@ -0,0 +1,55 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.mq; + +import com.tencent.bk.job.crontab.listener.event.CrontabEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class CrontabMQEventDispatcher { + + private final StreamBridge streamBridge; + + @Autowired + public CrontabMQEventDispatcher(StreamBridge streamBridge) { + this.streamBridge = streamBridge; + } + + /** + * 广播定时任务事件 + * + * @param crontabEvent 定时任务事件 + */ + public void broadCastCrontabEvent(CrontabEvent crontabEvent) { + log.info("Begin to broadcast crontab event, event: {}", crontabEvent); + String crontabFanoutOutput = "crontabFanout-out-0"; + streamBridge.send(crontabFanoutOutput, crontabEvent); + log.info("Broadcast crontab event successfully, event: {}", crontabEvent); + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobBatchLoadService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobBatchLoadService.java new file mode 100644 index 0000000000..5b83cc01b3 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobBatchLoadService.java @@ -0,0 +1,62 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service; + +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; +import lombok.Data; + +import java.util.List; + +public interface CronJobBatchLoadService { + + /** + * 将DB中的定时任务批量加载至Quartz + * + * @param start 起始位置 + * @param limit 一批定时任务的数量 + * @return 加载结果数据 + */ + CronLoadResult batchLoadCronToQuartz(int start, int limit); + + @Data + class CronLoadResult { + /** + * 从DB中获取到的定时任务数量 + */ + int fetchNum; + /** + * 加载成功的定时任务数量 + */ + int successNum; + /** + * 加载失败的定时任务数量 + */ + int failedNum; + /** + * 加载失败的定时任务列表 + */ + List failedCronList; + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobLoadingService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobLoadingService.java new file mode 100644 index 0000000000..03646c7e29 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobLoadingService.java @@ -0,0 +1,29 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service; + +public interface CronJobLoadingService { + void loadAllCronJob(); +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java index 75c76e8cd7..b2adfc0670 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/CronJobService.java @@ -24,9 +24,11 @@ package com.tencent.bk.job.crontab.service; +import com.tencent.bk.job.common.exception.ServiceException; import com.tencent.bk.job.common.model.BaseSearchCondition; import com.tencent.bk.job.common.model.PageData; import com.tencent.bk.job.crontab.model.BatchUpdateCronJobReq; +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; import com.tencent.bk.job.crontab.model.inner.ServiceInnerCronJobInfoDTO; import com.tencent.bk.job.crontab.model.inner.request.ServiceAddInnerCronJobRequestDTO; @@ -249,4 +251,10 @@ PageData listPageCronJobInfos(CronJobInfoDTO cronJobCondition, boolean isExistAnyAppCronJob(Long appId); Integer countCronJob(Long appId, Boolean active, Boolean cron); + + boolean addJobToQuartz(long appId, long cronJobId) throws ServiceException; + + boolean deleteJobFromQuartz(long appId, long cronJobId); + + List listEnabledCronBasicInfoForUpdate(int start, int limit); } diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java new file mode 100644 index 0000000000..ebf568377a --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobBatchLoadServiceImpl.java @@ -0,0 +1,96 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service.impl; + +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; +import com.tencent.bk.job.crontab.service.CronJobBatchLoadService; +import com.tencent.bk.job.crontab.service.CronJobService; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.helpers.MessageFormatter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Service +public class CronJobBatchLoadServiceImpl implements CronJobBatchLoadService { + + private final CronJobService cronJobService; + + @Autowired + public CronJobBatchLoadServiceImpl(CronJobService cronJobService) { + this.cronJobService = cronJobService; + } + + @Override + @Transactional(rollbackFor = {Exception.class, Error.class}) + public CronLoadResult batchLoadCronToQuartz(int start, int limit) { + int successNum = 0; + int failedNum = 0; + List failedCronList = new ArrayList<>(); + List cronJobBasicInfoList = cronJobService.listEnabledCronBasicInfoForUpdate(start, limit); + for (CronJobBasicInfoDTO cronJobBasicInfoDTO : cronJobBasicInfoList) { + boolean result = false; + try { + result = cronJobService.addJobToQuartz( + cronJobBasicInfoDTO.getAppId(), + cronJobBasicInfoDTO.getId() + ); + if (result) { + successNum += 1; + } else { + failedNum += 1; + failedCronList.add(cronJobBasicInfoDTO); + } + } catch (Exception e) { + failedNum += 1; + failedCronList.add(cronJobBasicInfoDTO); + String message = MessageFormatter.format( + "Fail to addJobToQuartz, cronJob={}", + cronJobBasicInfoDTO + ).getMessage(); + log.warn(message, e); + } + if (log.isDebugEnabled()) { + log.debug( + "load cronJob({},{},{}), result={}", + cronJobBasicInfoDTO.getAppId(), + cronJobBasicInfoDTO.getId(), + cronJobBasicInfoDTO.getName(), + result + ); + } + } + CronLoadResult cronLoadResult = new CronLoadResult(); + cronLoadResult.setFetchNum(cronJobBasicInfoList.size()); + cronLoadResult.setSuccessNum(successNum); + cronLoadResult.setFailedNum(failedNum); + cronLoadResult.setFailedCronList(failedCronList); + return cronLoadResult; + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java new file mode 100644 index 0000000000..7ccf386bb4 --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobLoadingServiceImpl.java @@ -0,0 +1,94 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.service.impl; + +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; +import com.tencent.bk.job.crontab.service.CronJobBatchLoadService; +import com.tencent.bk.job.crontab.service.CronJobLoadingService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Service +public class CronJobLoadingServiceImpl implements CronJobLoadingService { + + private final CronJobBatchLoadService cronJobBatchLoadService; + private volatile boolean loadingCronToQuartz = false; + + @Autowired + public CronJobLoadingServiceImpl(CronJobBatchLoadService cronJobBatchLoadService) { + this.cronJobBatchLoadService = cronJobBatchLoadService; + } + + @Override + public void loadAllCronJob() { + try { + if (loadingCronToQuartz) { + log.info("Last loading not finish, ignore"); + return; + } + loadingCronToQuartz = true; + loadAllCronJobToQuartz(); + } catch (Exception e) { + log.warn("Fail to loadAllCronJob", e); + } finally { + loadingCronToQuartz = false; + } + } + + private void loadAllCronJobToQuartz() { + int start = 0; + int limit = 100; + int fetchNum = 0; + int successNum = 0; + int failedNum = 0; + List failedCronList = new ArrayList<>(); + do { + CronJobBatchLoadService.CronLoadResult loadResult = cronJobBatchLoadService.batchLoadCronToQuartz( + start, + limit + ); + fetchNum += loadResult.getFetchNum(); + successNum += loadResult.getSuccessNum(); + failedNum += loadResult.getFailedNum(); + if (CollectionUtils.isNotEmpty(loadResult.getFailedCronList())) { + failedCronList.addAll(loadResult.getFailedCronList()); + } + start += limit; + } while (fetchNum > 0); + log.info( + "CronJobs load from db finished: fetchNum={}, successNum={}, failedNum={}, failedCronList={}", + fetchNum, + successNum, + failedNum, + failedCronList + ); + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java index 67e006d803..fbb5c3c680 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/service/impl/CronJobServiceImpl.java @@ -42,7 +42,6 @@ import com.tencent.bk.job.common.model.PageData; import com.tencent.bk.job.common.model.dto.AppResourceScope; import com.tencent.bk.job.common.model.dto.HostDTO; -import com.tencent.bk.job.common.redis.util.LockUtils; import com.tencent.bk.job.common.util.JobContextUtil; import com.tencent.bk.job.common.util.date.DateUtils; import com.tencent.bk.job.common.util.json.JsonUtils; @@ -50,7 +49,9 @@ import com.tencent.bk.job.crontab.constant.CronConstants; import com.tencent.bk.job.crontab.dao.CronJobDAO; import com.tencent.bk.job.crontab.exception.TaskExecuteAuthFailedException; +import com.tencent.bk.job.crontab.listener.event.CrontabEvent; import com.tencent.bk.job.crontab.model.BatchUpdateCronJobReq; +import com.tencent.bk.job.crontab.model.dto.CronJobBasicInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobInfoDTO; import com.tencent.bk.job.crontab.model.dto.CronJobVariableDTO; import com.tencent.bk.job.crontab.model.dto.InnerCronJobInfoDTO; @@ -58,6 +59,7 @@ import com.tencent.bk.job.crontab.model.inner.ServerDTO; import com.tencent.bk.job.crontab.model.inner.ServiceInnerCronJobInfoDTO; import com.tencent.bk.job.crontab.model.inner.request.ServiceAddInnerCronJobRequestDTO; +import com.tencent.bk.job.crontab.mq.CrontabMQEventDispatcher; import com.tencent.bk.job.crontab.service.CronJobService; import com.tencent.bk.job.crontab.service.ExecuteTaskService; import com.tencent.bk.job.crontab.service.HostService; @@ -110,6 +112,7 @@ public class CronJobServiceImpl implements CronJobService { private final CronAuthService cronAuthService; private final ExecuteTaskService executeTaskService; private final HostService hostService; + private final CrontabMQEventDispatcher crontabMQEventDispatcher; @Autowired public CronJobServiceImpl(CronJobDAO cronJobDAO, @@ -117,13 +120,15 @@ public CronJobServiceImpl(CronJobDAO cronJobDAO, TaskPlanService taskPlanService, CronAuthService cronAuthService, ExecuteTaskService executeTaskService, - HostService hostService) { + HostService hostService, + CrontabMQEventDispatcher crontabMQEventDispatcher) { this.cronJobDAO = cronJobDAO; this.quartzTaskHandler = quartzTaskHandler; this.taskPlanService = taskPlanService; this.cronAuthService = cronAuthService; this.executeTaskService = executeTaskService; this.hostService = hostService; + this.crontabMQEventDispatcher = crontabMQEventDispatcher; } private static String getJobName(long appId, long cronJobId) { @@ -255,7 +260,7 @@ public CronJobInfoDTO updateCronJobInfo(String username, CronJobInfoDTO cronJobI executeTaskService.authExecuteTask(cronJobInfo.getAppId(), cronJobInfo.getTaskPlanId(), cronJobInfo.getId(), cronJobInfo.getName(), taskVariables, cronJobInfo.getLastModifyUser()); if (cronJobDAO.updateCronJobById(cronJobInfo)) { - addJob(cronJobInfo.getAppId(), cronJobInfo.getId()); + informAllToAddJobToQuartz(cronJobInfo.getAppId(), cronJobInfo.getId()); } else { throw new InternalException(ErrorCode.UPDATE_CRON_JOB_FAILED); } @@ -265,7 +270,7 @@ public CronJobInfoDTO updateCronJobInfo(String username, CronJobInfoDTO cronJobI } } else { if (cronJobDAO.updateCronJobById(cronJobInfo)) { - deleteJob(cronJobInfo.getAppId(), cronJobInfo.getId()); + informAllToDeleteJobFromQuartz(cronJobInfo.getAppId(), cronJobInfo.getId()); } else { throw new InternalException(ErrorCode.UPDATE_CRON_JOB_FAILED); } @@ -381,7 +386,7 @@ public Boolean deleteCronJobInfo(String username, Long appId, Long cronJobId) { ActionAuditContext.current().setInstanceName(cron.getName()); if (cronJobDAO.deleteCronJobById(appId, cronJobId)) { - deleteJob(appId, cronJobId); + informAllToDeleteJobFromQuartz(appId, cronJobId); return true; } return false; @@ -428,7 +433,7 @@ public Boolean changeCronJobEnableStatus(String username, Long appId, Long cronJ executeTaskService.authExecuteTask(appId, originCronJobInfo.getTaskPlanId(), cronJobId, originCronJobInfo.getName(), taskVariables, username); if (cronJobDAO.updateCronJobById(cronJobInfo)) { - return addJob(appId, cronJobId); + return informAllToAddJobToQuartz(appId, cronJobId); } else { return false; } @@ -438,7 +443,7 @@ public Boolean changeCronJobEnableStatus(String username, Long appId, Long cronJ } } else { if (cronJobDAO.updateCronJobById(cronJobInfo)) { - return deleteJob(appId, cronJobId); + return informAllToDeleteJobFromQuartz(appId, cronJobId); } else { return false; } @@ -456,7 +461,7 @@ public Boolean disableExpiredCronJob(Long appId, Long cronJobId, String lastModi cronJobInfo.setLastModifyTime(lastModifyTime); cronJobInfo.setEnable(false); if (cronJobDAO.updateCronJobById(cronJobInfo)) { - return deleteJob(appId, cronJobId); + return informAllToDeleteJobFromQuartz(appId, cronJobId); } else { return false; } @@ -627,7 +632,7 @@ public Boolean batchUpdateCronJob(String username, Long appId, BatchUpdateCronJo cronJobInfo.getId(), originCronJobInfo.getName(), taskVariables, JobContextUtil.getUsername()); if (cronJobDAO.updateCronJobById(cronJobInfoFromReq)) { - addJob(appId, cronJobInfo.getId()); + informAllToAddJobToQuartz(appId, cronJobInfo.getId()); } } catch (TaskExecuteAuthFailedException e) { log.error("Error while pre auth cron execute!", e); @@ -635,7 +640,7 @@ public Boolean batchUpdateCronJob(String username, Long appId, BatchUpdateCronJo } } else { if (cronJobDAO.updateCronJobById(cronJobInfoFromReq)) { - deleteJob(appId, cronJobInfo.getId()); + informAllToDeleteJobFromQuartz(appId, cronJobInfo.getId()); } } CronJobInfoDTO updateCronJobInfo = getCronJobInfoById(appId, cronJobInfo.getId()); @@ -715,149 +720,164 @@ private ServiceInnerCronJobInfoDTO fromQuartzJob(QuartzJobInfoDTO jobInfo) { return innerCronJobInfoDTO; } - private boolean addJob(long appId, long cronJobId) throws ServiceException { + private boolean informAllToAddJobToQuartz(long appId, long cronJobId) throws ServiceException { + try { + crontabMQEventDispatcher.broadCastCrontabEvent(CrontabEvent.addCron(appId, cronJobId)); + return true; + } catch (Exception e) { + log.error("Fail to broadCast addCronEvent", e); + return false; + } + } + + private void checkCronRelatedPlan(Long appId, Long taskPlanId) throws ServiceException { + if (taskPlanService.getPlanBasicInfoById(appId, taskPlanId) == null) { + throw new NotFoundException(ErrorCode.TASK_PLAN_NOT_EXIST); + } + } + + private boolean informAllToDeleteJobFromQuartz(long appId, long cronJobId) { + try { + crontabMQEventDispatcher.broadCastCrontabEvent(CrontabEvent.deleteCron(appId, cronJobId)); + return true; + } catch (Exception e) { + log.error("Fail to broadCast deleteCronEvent", e); + return false; + } + } + + @Override + public boolean addJobToQuartz(long appId, long cronJobId) throws ServiceException { if (appId <= 0 || cronJobId <= 0) { return false; } - String lockKey = appId + ":" + cronJobId; - if (LockUtils.tryGetDistributedLock(lockKey, JobContextUtil.getRequestId(), 60_000)) { - try { - CronJobInfoDTO cronJobInfo = getCronJobInfoById(appId, cronJobId); - if (StringUtils.isBlank(cronJobInfo.getCronExpression()) - && cronJobInfo.getExecuteTime() < DateUtils.currentTimeSeconds()) { - throw new FailedPreconditionException(ErrorCode.CRON_JOB_TIME_PASSED); + try { + CronJobInfoDTO cronJobInfo = getCronJobInfoById(appId, cronJobId); + if (StringUtils.isBlank(cronJobInfo.getCronExpression()) + && cronJobInfo.getExecuteTime() < DateUtils.currentTimeSeconds()) { + throw new FailedPreconditionException(ErrorCode.CRON_JOB_TIME_PASSED); + } + checkCronRelatedPlan(cronJobInfo.getAppId(), cronJobInfo.getTaskPlanId()); + QuartzTrigger trigger = null; + if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { + QuartzTriggerBuilder cronTriggerBuilder = + QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.CRON) + .withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) + .withCronExpression(cronJobInfo.getCronExpression()) + .withMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING); + if (cronJobInfo.getEndTime() > 0) { + if (cronJobInfo.getEndTime() < DateUtils.currentTimeSeconds()) { + throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); + } else { + cronTriggerBuilder = + cronTriggerBuilder.endAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getEndTime()))); + } } - checkCronRelatedPlan(cronJobInfo.getAppId(), cronJobInfo.getTaskPlanId()); - QuartzTrigger trigger = null; + trigger = cronTriggerBuilder.build(); + } else if (cronJobInfo.getExecuteTime() > DateUtils.currentTimeSeconds()) { + trigger = QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.SIMPLE) + .withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) + .startAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getExecuteTime()))).withRepeatCount(0) + .withIntervalInHours(1) + .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) + .build(); + } + if (trigger == null) { + throw new InvalidParamException(ErrorCode.ILLEGAL_PARAM); + } + + QuartzJob job = + QuartzJobBuilder.newJob().withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) + .forJob(SimpleJobExecutor.class) + .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) + .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) + .withTrigger(trigger) + .build(); + + try { + quartzTaskHandler + .deleteJob(JobKey.jobKey(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); + quartzTaskHandler.addJob(job); + } catch (SchedulerException e) { + log.error("Error while add job to quartz!", e); + throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); + } + + if (cronJobInfo.getNotifyOffset() > 0) { + long notifyTime = 0L; if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { - QuartzTriggerBuilder cronTriggerBuilder = - QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.CRON) - .withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .withCronExpression(cronJobInfo.getCronExpression()) - .withMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING); if (cronJobInfo.getEndTime() > 0) { - if (cronJobInfo.getEndTime() < DateUtils.currentTimeSeconds()) { - throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); - } else { - cronTriggerBuilder = - cronTriggerBuilder.endAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getEndTime()))); - } + notifyTime = cronJobInfo.getEndTime() - cronJobInfo.getNotifyOffset(); } - trigger = cronTriggerBuilder.build(); - } else if (cronJobInfo.getExecuteTime() > DateUtils.currentTimeSeconds()) { - trigger = QuartzTriggerBuilder.newTrigger().ofType(QuartzTrigger.TriggerType.SIMPLE) - .withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .startAt(Date.from(Instant.ofEpochSecond(cronJobInfo.getExecuteTime()))).withRepeatCount(0) - .withIntervalInHours(1) - .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) - .build(); + } else { + notifyTime = cronJobInfo.getExecuteTime() - cronJobInfo.getNotifyOffset(); } - if (trigger == null) { - throw new InvalidParamException(ErrorCode.ILLEGAL_PARAM); + if (notifyTime < DateUtils.currentTimeSeconds()) { + throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); } - QuartzJob job = - QuartzJobBuilder.newJob().withIdentity(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .forJob(SimpleJobExecutor.class) - .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) - .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) - .withTrigger(trigger) - .build(); + QuartzTrigger notifyTrigger = QuartzTriggerBuilder.newTrigger() + .ofType(QuartzTrigger.TriggerType.SIMPLE) + .withIdentity(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) + .startAt(Date.from(Instant.ofEpochSecond(notifyTime))).withRepeatCount(0).withIntervalInHours(1) + .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) + .build(); + + QuartzJob notifyJob = QuartzJobBuilder.newJob() + .withIdentity(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) + .forJob(NotifyJobExecutor.class) + .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) + .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) + .withTrigger(notifyTrigger) + .build(); try { - quartzTaskHandler - .deleteJob(JobKey.jobKey(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - quartzTaskHandler.addJob(job); + quartzTaskHandler.deleteJob( + JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); + quartzTaskHandler.addJob(notifyJob); } catch (SchedulerException e) { log.error("Error while add job to quartz!", e); throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); } - - if (cronJobInfo.getNotifyOffset() > 0) { - long notifyTime = 0L; - if (StringUtils.isNotBlank(cronJobInfo.getCronExpression())) { - if (cronJobInfo.getEndTime() > 0) { - notifyTime = cronJobInfo.getEndTime() - cronJobInfo.getNotifyOffset(); - } - } else { - notifyTime = cronJobInfo.getExecuteTime() - cronJobInfo.getNotifyOffset(); - } - if (notifyTime < DateUtils.currentTimeSeconds()) { - throw new FailedPreconditionException(ErrorCode.END_TIME_OR_NOTIFY_TIME_ALREADY_PASSED); - } - - QuartzTrigger notifyTrigger = QuartzTriggerBuilder.newTrigger() - .ofType(QuartzTrigger.TriggerType.SIMPLE) - .withIdentity(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .startAt(Date.from(Instant.ofEpochSecond(notifyTime))).withRepeatCount(0).withIntervalInHours(1) - .withMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) - .build(); - - QuartzJob notifyJob = QuartzJobBuilder.newJob() - .withIdentity(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId)) - .forJob(NotifyJobExecutor.class) - .usingJobData(CronConstants.JOB_DATA_KEY_APP_ID_STR, String.valueOf(appId)) - .usingJobData(CronConstants.JOB_DATA_KEY_CRON_JOB_ID_STR, String.valueOf(cronJobId)) - .withTrigger(notifyTrigger) - .build(); - - try { - quartzTaskHandler.deleteJob( - JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - quartzTaskHandler.addJob(notifyJob); - } catch (SchedulerException e) { - log.error("Error while add job to quartz!", e); - throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); - } - } else { - try { - quartzTaskHandler.deleteJob( - JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - } catch (SchedulerException e) { - log.error("Error while add job to quartz!", e); - throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); - } + } else { + try { + quartzTaskHandler.deleteJob( + JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); + } catch (SchedulerException e) { + log.error("Error while add job to quartz!", e); + throw new InternalException("Add to quartz failed!", e, ErrorCode.INTERNAL_ERROR); } - return true; - } catch (ServiceException e) { - deleteJob(appId, cronJobId); - log.debug("Error while schedule job", e); - throw e; - } catch (Exception e) { - deleteJob(appId, cronJobId); - log.error("Unknown exception while process cron status change!", e); - throw new InternalException(ErrorCode.UPDATE_CRON_JOB_FAILED); - } finally { - LockUtils.releaseDistributedLock(lockKey, JobContextUtil.getRequestId()); } - } else { - throw new InternalException(ErrorCode.ACQUIRE_CRON_JOB_LOCK_FAILED); - } - } - - private void checkCronRelatedPlan(Long appId, Long taskPlanId) throws ServiceException { - if (taskPlanService.getPlanBasicInfoById(appId, taskPlanId) == null) { - throw new NotFoundException(ErrorCode.TASK_PLAN_NOT_EXIST); + return true; + } catch (ServiceException e) { + informAllToDeleteJobFromQuartz(appId, cronJobId); + log.debug("Error while schedule job", e); + throw e; + } catch (Exception e) { + informAllToDeleteJobFromQuartz(appId, cronJobId); + log.error("Unknown exception while process cron status change!", e); + throw new InternalException(ErrorCode.UPDATE_CRON_JOB_FAILED); } } - private boolean deleteJob(long appId, long cronJobId) { + @Override + public boolean deleteJobFromQuartz(long appId, long cronJobId) { if (appId <= 0 || cronJobId <= 0) { return false; } - String lockKey = appId + ":" + cronJobId; - if (LockUtils.tryGetDistributedLock(lockKey, JobContextUtil.getRequestId(), 60_000)) { - try { - quartzTaskHandler.deleteJob(JobKey.jobKey(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - quartzTaskHandler - .deleteJob(JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); - return true; - } catch (SchedulerException e) { - log.error("Error while delete job!", e); - } finally { - LockUtils.releaseDistributedLock(lockKey, JobContextUtil.getRequestId()); - } + try { + quartzTaskHandler.deleteJob(JobKey.jobKey(getJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); + quartzTaskHandler + .deleteJob(JobKey.jobKey(getNotifyJobName(appId, cronJobId), getJobGroup(appId, cronJobId))); + return true; + } catch (SchedulerException e) { + log.error("Error while delete job!", e); } return false; } + + @Override + public List listEnabledCronBasicInfoForUpdate(int start, int limit) { + return cronJobDAO.listEnabledCronBasicInfoForUpdate(start, limit); + } } diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/task/ScheduledTasks.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/task/ScheduledTasks.java new file mode 100644 index 0000000000..7d3442389f --- /dev/null +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/task/ScheduledTasks.java @@ -0,0 +1,61 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.crontab.task; + +import com.tencent.bk.job.crontab.service.CronJobLoadingService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Slf4j +@Component("jobCrontabScheduledTasks") +@EnableScheduling +public class ScheduledTasks { + + private final CronJobLoadingService cronJobLoadingService; + + @Autowired + public ScheduledTasks(CronJobLoadingService cronJobLoadingService) { + this.cronJobLoadingService = cronJobLoadingService; + } + + /** + * 每间隔30min更新一次定时任务数据到Quartz内存 + */ + @Scheduled(initialDelay = 5 * 1000, fixedDelay = 30 * 60 * 1000) + public void loadCronToQuartz() { + log.info("loadCronToQuartz"); + long start = System.currentTimeMillis(); + try { + cronJobLoadingService.loadAllCronJob(); + } catch (Exception e) { + log.error("loadCronToQuartz fail", e); + } finally { + log.info("loadCronToQuartz end, duration={}ms", System.currentTimeMillis() - start); + } + } +} diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/AbstractQuartzJobBean.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/AbstractQuartzJobBean.java index dcc3f51441..5016e4bfeb 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/AbstractQuartzJobBean.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/AbstractQuartzJobBean.java @@ -26,6 +26,7 @@ import com.tencent.bk.job.common.redis.util.LockUtils; import com.tencent.bk.job.common.util.JobContextUtil; +import com.tencent.bk.job.common.util.TimeUtil; import com.tencent.bk.job.crontab.metrics.CronMetricsConstants; import com.tencent.bk.job.crontab.metrics.ScheduleMeasureService; import io.micrometer.core.instrument.Tag; @@ -85,17 +86,21 @@ protected void executeInternal(@NotNull JobExecutionContext context) { if (redisLockGotten) { executeInternalInternal(context); } else { - log.warn("{}|Job {} key {} execute aborted. Acquire lock failed!", executeId, name(), - getLockKey(context)); + if (log.isDebugEnabled()) { + log.debug( + "{}|Job {} key {} execute aborted. Acquire lock failed!", + executeId, + name(), + getLockKey(context) + ); + } } - if (log.isDebugEnabled()) { log.debug("{}|Job {} key {} execute finished.", executeId, name(), getLockKey(context)); } } catch (JobExecutionException e) { log.error("fail to executeInternal", e); } finally { - LockUtils.releaseDistributedLock(getLockKey(context), executeId); recordCronTimeConsuming(context, redisLockGotten, startTimeMills); span.end(); } @@ -105,6 +110,17 @@ private void recordCronTimeConsuming(JobExecutionContext context, boolean redisLockGotten, long startTimeMills) { long timeConsumingMills = System.currentTimeMillis() - startTimeMills; + String timeFormat = "yyyy-MM-dd HH:mm:ss.SSS"; + log.info( + "CronJob finished: {}, " + + "redisLockGotten={}, scheduledFireTime={}, fireTime={}, fireDelay={}ms, executeDuration={}ms", + getLockKey(context), + redisLockGotten, + TimeUtil.formatTime(context.getScheduledFireTime().getTime(), timeFormat), + TimeUtil.formatTime(context.getFireTime().getTime(), timeFormat), + context.getFireTime().getTime() - context.getScheduledFireTime().getTime(), + timeConsumingMills + ); scheduleMeasureService.recordCronTimeConsuming( name(), context, diff --git a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/executor/SimpleJobExecutor.java b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/executor/SimpleJobExecutor.java index 174b19a964..c18208c914 100644 --- a/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/executor/SimpleJobExecutor.java +++ b/src/backend/job-crontab/service-job-crontab/src/main/java/com/tencent/bk/job/crontab/timer/executor/SimpleJobExecutor.java @@ -114,6 +114,10 @@ protected void executeInternalInternal(JobExecutionContext context) { if (log.isDebugEnabled()) { log.debug("Get cronjob info return|{}", cronJobInfo); } + if (!cronJobInfo.getEnable()) { + log.error("cronJob {} scheduled unexpectedly, do not execute", cronJobInfo); + return; + } List variables = cronJobInfo.getVariableValue(); List taskVariables = null; diff --git a/support-files/kubernetes/charts/bk-job/templates/job-assemble/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-assemble/configmap.yaml index 5c5958a22d..3302015685 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-assemble/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-assemble/configmap.yaml @@ -40,7 +40,7 @@ data: password: ${rabbitmq-password} {{- end }} virtual-host: {{ include "job.rabbitmq.vhost" . }} - source: task;step;gseTask;resultHandleTaskResume;notifyMsg;callback + source: task;step;gseTask;resultHandleTaskResume;notifyMsg;callback;crontabFanout bindings: handleJobEvent-in-0: destination: task @@ -114,6 +114,16 @@ data: binder: jobCommon consumer: concurrency: 5 + handleCrontabFanoutEvent-in-0: + destination: crontab.fanout + binder: jobCommon + consumer: + concurrency: 5 + crontabFanout-out-0: + destination: crontab.fanout + binder: jobCommon + consumer: + concurrency: 5 rabbit: bindings: handleJobEvent-in-0: @@ -152,8 +162,18 @@ data: callback-out-0: consumer: maxConcurrency: 10 + handleCrontabFanoutEvent-in-0: + consumer: + maxConcurrency: 5 + exchangeType: fanout + crontabFanout-out-0: + producer: + exchangeType: fanout + consumer: + maxConcurrency: 5 + exchangeType: fanout function: - definition: handleJobEvent;handleStepEvent;handleGseTaskEvent;handleResultHandleResumeEvent;handleNotifyMsg;handleCallbackMsg;busConsumer + definition: handleJobEvent;handleStepEvent;handleGseTaskEvent;handleResultHandleResumeEvent;handleNotifyMsg;handleCallbackMsg;busConsumer;handleCrontabFanoutEvent datasource: job-manage: driver-class-name: {{ include "job.jdbcMysqlDriverClass" . }} @@ -267,22 +287,13 @@ data: uri: {{ include "job.mongodb.connect.uri" . | quote }} database: "joblog" quartz: - # 使用数据库存储 - job-store-type: jdbc - jdbc: - # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,手动创建表结构 - initialize-schema: never + job-store-type: MEMORY properties: org: quartz: jobStore: - class: org.springframework.scheduling.quartz.LocalDataSourceJobStore - clusterCheckinInterval: 20000 - driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate - isClustered: true - selectWithLockSQL: SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE - tablePrefix: QRTZ_ - useProperties: true + class: org.quartz.simpl.RAMJobStore + misfireThreshold: 60000 plugin: shutdownhook: class: org.quartz.plugins.management.ShutdownHookPlugin diff --git a/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml index 4e885c02a3..df91a5f51a 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-crontab/configmap.yaml @@ -31,6 +31,46 @@ data: password: ${rabbitmq-password} {{- end }} virtual-host: {{ include "job.rabbitmq.vhost" . }} + jobCrontab: + type: rabbit + environment: + spring: + rabbitmq: + host: {{ include "job.rabbitmq.host" . }} + port: {{ include "job.rabbitmq.port" . }} + username: {{ include "job.rabbitmq.username" . }} + {{ if .Values.externalRabbitMQ.existingPasswordSecret }} + password: {{ .Values.externalRabbitMQ.existingPasswordKey | default "rabbitmq-password" | printf "${%s}" }} + {{- else -}} + password: ${rabbitmq-password} + {{- end }} + virtual-host: {{ include "job.rabbitmq.vhost" . }} + source: crontabFanout + bindings: + handleCrontabFanoutEvent-in-0: + destination: crontab.fanout + binder: jobCrontab + consumer: + concurrency: 5 + crontabFanout-out-0: + destination: crontab.fanout + binder: jobCrontab + consumer: + concurrency: 5 + rabbit: + bindings: + handleCrontabFanoutEvent-in-0: + consumer: + maxConcurrency: 5 + exchangeType: fanout + crontabFanout-out-0: + producer: + exchangeType: fanout + consumer: + maxConcurrency: 5 + exchangeType: fanout + function: + definition: handleCrontabFanoutEvent datasource: job-crontab: driver-class-name: {{ include "job.jdbcMysqlDriverClass" . }} @@ -52,22 +92,13 @@ data: max-wait: 1ms shutdown-timeout: 100ms quartz: - # 使用数据库存储 - job-store-type: jdbc - jdbc: - # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,手动创建表结构 - initialize-schema: never + job-store-type: MEMORY properties: org: quartz: jobStore: - class: org.springframework.scheduling.quartz.LocalDataSourceJobStore - clusterCheckinInterval: 20000 - driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate - isClustered: true - selectWithLockSQL: SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE - tablePrefix: QRTZ_ - useProperties: true + class: org.quartz.simpl.RAMJobStore + misfireThreshold: 60000 plugin: shutdownhook: class: org.quartz.plugins.management.ShutdownHookPlugin diff --git a/support-files/templates/#etc#job#job-assemble#application-assemble.yml b/support-files/templates/#etc#job#job-assemble#application-assemble.yml index 895a7daac8..c8ccc99568 100644 --- a/support-files/templates/#etc#job#job-assemble#application-assemble.yml +++ b/support-files/templates/#etc#job#job-assemble#application-assemble.yml @@ -35,7 +35,7 @@ spring: username: __BK_JOB_RABBITMQ_USERNAME__ password: __BK_JOB_RABBITMQ_PASSWORD__ virtual-host: __BK_JOB_RABBITMQ_VHOST__ - source: task;step;gseTask;resultHandleResume;notifyMsg;callback + source: task;step;gseTask;resultHandleResume;notifyMsg;callback;crontabFanout bindings: handleJobEvent-in-0: destination: task @@ -109,6 +109,16 @@ spring: binder: jobCommon consumer: concurrency: 5 + handleCrontabFanoutEvent-in-0: + destination: crontab.fanout + binder: jobCommon + consumer: + concurrency: 5 + crontabFanout-out-0: + destination: crontab.fanout + binder: jobCommon + consumer: + concurrency: 5 rabbit: bindings: handleJobEvent-in-0: @@ -147,8 +157,18 @@ spring: callback-out-0: consumer: maxConcurrency: 10 + handleCrontabFanoutEvent-in-0: + consumer: + maxConcurrency: 5 + exchangeType: fanout + crontabFanout-out-0: + producer: + exchangeType: fanout + consumer: + maxConcurrency: 5 + exchangeType: fanout function: - definition: handleJobEvent;handleStepEvent;handleGseTaskEvent;handleResultHandleResumeEvent;handleNotifyMsg;handleCallbackMsg;busConsumer + definition: handleJobEvent;handleStepEvent;handleGseTaskEvent;handleResultHandleResumeEvent;handleNotifyMsg;handleCallbackMsg;busConsumer;handleCrontabFanoutEvent datasource: job-manage: driver-class-name: io.opentelemetry.instrumentation.jdbc.OpenTelemetryDriver @@ -264,22 +284,14 @@ spring: mongodb: uri: __BK_JOB_MONGODB_URI__ quartz: - # 使用数据库存储 - job-store-type: jdbc - jdbc: - # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,手动创建表结构 - initialize-schema: never + # 使用内存存储 + job-store-type: MEMORY properties: org: quartz: jobStore: - class: org.springframework.scheduling.quartz.LocalDataSourceJobStore - clusterCheckinInterval: 20000 - driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate - isClustered: true - selectWithLockSQL: SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE - tablePrefix: QRTZ_ - useProperties: true + class: org.quartz.simpl.RAMJobStore + misfireThreshold: 60000 plugin: shutdownhook: class: org.quartz.plugins.management.ShutdownHookPlugin diff --git a/support-files/templates/#etc#job#job-crontab#job-crontab.yml b/support-files/templates/#etc#job#job-crontab#job-crontab.yml index ddd97d5125..159d66a8db 100644 --- a/support-files/templates/#etc#job#job-crontab#job-crontab.yml +++ b/support-files/templates/#etc#job#job-crontab#job-crontab.yml @@ -15,6 +15,42 @@ spring: username: __BK_JOB_RABBITMQ_USERNAME__ password: __BK_JOB_RABBITMQ_PASSWORD__ virtual-host: __BK_JOB_RABBITMQ_VHOST__ + jobCrontab: + type: rabbit + environment: + spring: + rabbitmq: + host: __BK_JOB_RABBITMQ_HOST__ + port: __BK_JOB_RABBITMQ_PORT__ + username: __BK_JOB_RABBITMQ_USERNAME__ + password: __BK_JOB_RABBITMQ_PASSWORD__ + virtual-host: __BK_JOB_RABBITMQ_VHOST__ + source: crontabFanout + bindings: + handleCrontabFanoutEvent-in-0: + destination: crontab.fanout + binder: jobCrontab + consumer: + concurrency: 5 + crontabFanout-out-0: + destination: crontab.fanout + binder: jobCrontab + consumer: + concurrency: 5 + rabbit: + bindings: + handleCrontabFanoutEvent-in-0: + consumer: + maxConcurrency: 5 + exchangeType: fanout + crontabFanout-out-0: + producer: + exchangeType: fanout + consumer: + maxConcurrency: 5 + exchangeType: fanout + function: + definition: handleCrontabFanoutEvent datasource: job-crontab: driver-class-name: io.opentelemetry.instrumentation.jdbc.OpenTelemetryDriver @@ -60,22 +96,14 @@ spring: max-wait: 1ms shutdown-timeout: 100ms quartz: - # 使用数据库存储 - job-store-type: jdbc - jdbc: - # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,手动创建表结构 - initialize-schema: never + # 使用内存存储 + job-store-type: MEMORY properties: org: quartz: jobStore: - class: org.springframework.scheduling.quartz.LocalDataSourceJobStore - clusterCheckinInterval: 20000 - driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate - isClustered: true - selectWithLockSQL: SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE - tablePrefix: QRTZ_ - useProperties: true + class: org.quartz.simpl.RAMJobStore + misfireThreshold: 60000 plugin: shutdownhook: class: org.quartz.plugins.management.ShutdownHookPlugin