Skip to content

Commit

Permalink
[ISSUE #6881] Fix scheduled messages are replayed bug (#6882)
Browse files Browse the repository at this point in the history
* fix scheduled messages are replayed bug

* scheduledPersistService reset to final and constructed in the constructor
  • Loading branch information
gaoyf authored Jun 11, 2023
1 parent 6eac107 commit 0f1ff25
Showing 1 changed file with 4 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,7 @@ public ScheduleMessageService(final BrokerController brokerController) {
this.brokerController = brokerController;
this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
scheduledPersistService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
scheduledPersistService.scheduleAtFixedRate(() -> {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
}

public static int queueId2DelayLevel(final int queueId) {
Expand Down Expand Up @@ -161,15 +154,13 @@ public void start() {
}
}

this.deliverExecutorService.scheduleAtFixedRate(() -> {
scheduledPersistService.scheduleAtFixedRate(() -> {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}

Expand Down

0 comments on commit 0f1ff25

Please sign in to comment.