Skip to content

Commit

Permalink
[ISSUE #7974] Add repeatedly read same offset log to find unexpected …
Browse files Browse the repository at this point in the history
…situations (#7975)
  • Loading branch information
lizhimins authored Mar 27, 2024
1 parent 59220d8 commit 093cb84
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
package org.apache.rocketmq.tieredstore.common;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class SelectBufferResult {

private final ByteBuffer byteBuffer;
private final long startOffset;
private final int size;
private final long tagCode;
private final AtomicLong accessCount;

public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) {
this.startOffset = startOffset;
this.byteBuffer = byteBuffer;
this.size = size;
this.tagCode = tagCode;
this.accessCount = new AtomicLong();
}

public ByteBuffer getByteBuffer() {
Expand All @@ -48,4 +51,8 @@ public int getSize() {
public long getTagCode() {
return tagCode;
}

public AtomicLong getAccessCount() {
return accessCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ private Cache<String, SelectBufferResult> initCache(MessageStoreConfig storeConf

return Caffeine.newBuilder()
.scheduler(Scheduler.systemScheduler())
.expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
// Clients may repeatedly request messages at the same offset in tiered storage,
// causing the request queue to become full. Using expire after read or write policy
// to refresh the cache expiration time.
.expireAfterAccess(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
.maximumWeight(memoryMaxSize)
// Using the buffer size of messages to calculate memory usage
.weigher((String key, SelectBufferResult buffer) -> buffer.getSize())
Expand All @@ -98,7 +101,15 @@ protected SelectBufferResult getMessageFromCache(FlatMessageFile flatFile, long
SelectBufferResult buffer = this.fetcherCache.getIfPresent(
String.format(CACHE_KEY_FORMAT, mq.getTopic(), mq.getQueueId(), offset));
// return duplicate buffer here
return buffer == null ? null : new SelectBufferResult(
if (buffer == null) {
return null;
}
long count = buffer.getAccessCount().incrementAndGet();
if (count % 1000L == 0L) {
log.warn("MessageFetcher fetch same offset message too many times, " +
"topic={}, queueId={}, offset={}, count={}", mq.getTopic(), mq.getQueueId(), offset, count);
}
return new SelectBufferResult(
buffer.getByteBuffer().asReadOnlyBuffer(), buffer.getStartOffset(), buffer.getSize(), buffer.getTagCode());
}

Expand Down

0 comments on commit 093cb84

Please sign in to comment.