diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 196b78f83c2..2a4ace09850 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -92,14 +92,7 @@ public ScheduleMessageService(final BrokerController brokerController) { this.brokerController = brokerController; this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver(); scheduledPersistService = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); - scheduledPersistService.scheduleAtFixedRate(() -> { - try { - ScheduleMessageService.this.persist(); - } catch (Throwable e) { - log.error("scheduleAtFixedRate flush exception", e); - } - }, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); + new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); } public static int queueId2DelayLevel(final int queueId) { @@ -161,15 +154,13 @@ public void start() { } } - this.deliverExecutorService.scheduleAtFixedRate(() -> { + scheduledPersistService.scheduleAtFixedRate(() -> { try { - if (started.get()) { - ScheduleMessageService.this.persist(); - } + ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } - }, 10000, this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); + }, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); } }