From 1a15729ca962d76ffe044f6332ec711b1d7546bc Mon Sep 17 00:00:00 2001 From: Lei Zhiyuan Date: Fri, 30 Aug 2024 13:43:01 +0800 Subject: [PATCH] [ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix:when isPopShouldStop hit, unlock queueLockManager * fix:when isPopShouldStop hit, unlock queueLockManager * fix: limit rate of appending commit in case of DLedger commit-log Signed-off-by: Zhanhui Li --------- Signed-off-by: Zhanhui Li Co-authored-by: Zhanhui Li --- .../rocketmq/broker/processor/PopMessageProcessor.java | 2 +- .../rocketmq/store/dledger/MessageStoreTestBase.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 47ef8e4013b..5430fdec94d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -540,6 +540,7 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, return future; } + future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; @@ -548,7 +549,6 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, } try { - future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), true, lockKey, true); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index a21806ffcf6..c4d9f0727b9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store.dledger; +import com.google.common.util.concurrent.RateLimiter; import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import java.io.File; @@ -122,7 +123,13 @@ protected DefaultMessageStore createMessageStore(String base, boolean createAbor } protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) throws UnknownHostException { + RateLimiter rateLimiter = RateLimiter.create(100); + MessageStoreConfig storeConfig = messageStore.getMessageStoreConfig(); + boolean limitAppendRate = storeConfig.isEnableDLegerCommitLog(); for (int i = 0; i < num; i++) { + if (limitAppendRate) { + rateLimiter.acquire(); + } MessageExtBrokerInner msgInner = buildMessage(); msgInner.setTopic(topic); msgInner.setQueueId(queueId);