Skip to content

Commit

Permalink
[ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602)
Browse files Browse the repository at this point in the history
* fix:when isPopShouldStop hit, unlock queueLockManager

* fix:when isPopShouldStop hit, unlock queueLockManager

* fix: limit rate of appending commit in case of DLedger commit-log

Signed-off-by: Zhanhui Li <[email protected]>

---------

Signed-off-by: Zhanhui Li <[email protected]>
Co-authored-by: Zhanhui Li <[email protected]>
  • Loading branch information
leizhiyuan and lizhanhui committed Aug 30, 2024
1 parent b9d9b3f commit 1a15729
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
return future;
}

future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) {
POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId);
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
Expand All @@ -548,7 +549,6 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
}

try {
future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
true, lockKey, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.dledger;

import com.google.common.util.concurrent.RateLimiter;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
Expand Down Expand Up @@ -122,7 +123,13 @@ protected DefaultMessageStore createMessageStore(String base, boolean createAbor
}

protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) throws UnknownHostException {
RateLimiter rateLimiter = RateLimiter.create(100);
MessageStoreConfig storeConfig = messageStore.getMessageStoreConfig();
boolean limitAppendRate = storeConfig.isEnableDLegerCommitLog();
for (int i = 0; i < num; i++) {
if (limitAppendRate) {
rateLimiter.acquire();
}
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(queueId);
Expand Down

0 comments on commit 1a15729

Please sign in to comment.