diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 523b0c2cde3..bb58ea7dd52 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -82,7 +82,7 @@ private void initScheduleTask() { TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() -> tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> { if (!flatFile.getCompositeFlatFileLock().isLocked()) { - dispatchFlatFile(flatFile); + dispatchFlatFileAsync(flatFile); } }), 30, 10, TimeUnit.SECONDS); } @@ -180,10 +180,6 @@ public void dispatch(DispatchRequest request) { message.release(); flatFile.getCompositeFlatFileLock().unlock(); } - } else { - if (!flatFile.getCompositeFlatFileLock().isLocked()) { - this.dispatchFlatFileAsync(flatFile); - } } } @@ -199,6 +195,11 @@ public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile) { } public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer consumer) { + // Avoid dispatch tasks too much + if (TieredStoreExecutor.dispatchThreadPoolQueue.size() > + TieredStoreExecutor.QUEUE_CAPACITY * 0.75) { + return; + } TieredStoreExecutor.dispatchExecutor.execute(() -> { try { dispatchFlatFile(flatFile); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java index 23f1b01eacd..6eb3478b3d9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java @@ -27,7 +27,7 @@ public class TieredStoreExecutor { - private static final int QUEUE_CAPACITY = 10000; + public static final int QUEUE_CAPACITY = 10000; // Visible for monitor public static BlockingQueue dispatchThreadPoolQueue;