diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 4c66696e3c8..3c6b91ec018 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -375,7 +375,8 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo if (high == null || high == -1) { return 0; } - return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(topic, queueId, high, low, timestamp, minPhysicOffset); + return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(topic, queueId, high, low, timestamp, + minPhysicOffset, boundaryType); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java index 0a735ea27c1..c7d35fa8c0c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTable.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -180,10 +181,10 @@ public void destroyCQ(final String topic, final int queueId, WriteBatch writeBat } public long binarySearchInCQByTime(String topic, int queueId, long high, long low, long timestamp, - long minPhysicOffset) throws RocksDBException { - long result = 0; + long minPhysicOffset, BoundaryType boundaryType) throws RocksDBException { + long result = -1L; long targetOffset = -1L, leftOffset = -1L, rightOffset = -1L; - long leftValue = -1L, rightValue = -1L; + long ceiling = high, floor = low; while (high >= low) { long midOffset = low + ((high - low) >>> 1); ByteBuffer byteBuffer = getCQInKV(topic, queueId, midOffset); @@ -209,22 +210,64 @@ public long binarySearchInCQByTime(String topic, int queueId, long high, long lo } else if (storeTime > timestamp) { high = midOffset - 1; rightOffset = midOffset; - rightValue = storeTime; } else { low = midOffset + 1; leftOffset = midOffset; - leftValue = storeTime; } } if (targetOffset != -1) { + // offset next to it might also share the same store-timestamp. + switch (boundaryType) { + case LOWER: { + while (true) { + long nextOffset = targetOffset - 1; + if (nextOffset < floor) { + break; + } + ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset); + long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET); + if (storeTime != timestamp) { + break; + } + targetOffset = nextOffset; + } + break; + } + case UPPER: { + while (true) { + long nextOffset = targetOffset + 1; + if (nextOffset > ceiling) { + break; + } + ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset); + long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET); + if (storeTime != timestamp) { + break; + } + targetOffset = nextOffset; + } + break; + } + default: { + log.warn("Unknown boundary type"); + break; + } + } result = targetOffset; } else { - if (leftValue == -1) { - result = rightOffset; - } else if (rightValue == -1) { - result = leftOffset; - } else { - result = Math.abs(timestamp - leftValue) > Math.abs(timestamp - rightValue) ? rightOffset : leftOffset; + switch (boundaryType) { + case LOWER: { + result = rightOffset; + break; + } + case UPPER: { + result = leftOffset; + break; + } + default: { + log.warn("Unknown boundary type"); + break; + } } } return result;