From a18cad7826fe4883a7676c50e16527d74f8f8394 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 11 Apr 2023 16:22:17 +0800 Subject: [PATCH 01/11] Fix the issue that expectLogicOffset is greater than currentLogicOffset in consumeQueue build when the message is illegal --- .../org/apache/rocketmq/store/CommitLog.java | 13 ++++- .../apache/rocketmq/store/ConsumeQueue.java | 47 ++++++++++++++----- .../rocketmq/store/DefaultMessageStore.java | 14 +++++- .../apache/rocketmq/store/MessageStore.java | 11 ++++- .../store/dledger/DLedgerCommitLog.java | 9 +++- .../plugin/AbstractPluginMessageStore.java | 9 +++- .../store/queue/BatchConsumeQueue.java | 45 +++++++++++------- .../store/queue/ConsumeQueueInterface.java | 10 +++- .../store/queue/ConsumeQueueStore.java | 13 +++-- ...Assigner.java => QueueOffsetOperator.java} | 37 +++++++++------ .../rocketmq/store/MultiDispatchTest.java | 2 +- 11 files changed, 150 insertions(+), 60 deletions(-) rename store/src/main/java/org/apache/rocketmq/store/queue/{QueueOffsetAssigner.java => QueueOffsetOperator.java} (73%) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d7e141d31c5..75b4042dc32 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -824,7 +824,7 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke needAssignOffset = false; } if (needAssignOffset) { - defaultMessageStore.assignOffset(msg, getMessageNum(msg)); + defaultMessageStore.assignOffset(msg); } PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); @@ -892,6 +892,10 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke } finally { putMessageLock.unlock(); } + // Increase queue offset when messages are successfully written + if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { + this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg)); + } } finally { topicQueueLock.unlock(topicQueueKey); } @@ -990,7 +994,7 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc topicQueueLock.lock(topicQueueKey); try { - defaultMessageStore.assignOffset(messageExtBatch, (short) putMessageContext.getBatchSize()); + defaultMessageStore.assignOffset(messageExtBatch); putMessageLock.lock(); try { @@ -1041,6 +1045,11 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc } finally { putMessageLock.unlock(); } + + // Increase queue offset when messages are successfully written + if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { + this.defaultMessageStore.increaseOffset(messageExtBatch, (short) putMessageContext.getBatchSize()); + } } finally { topicQueueLock.unlock(topicQueueKey); } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index d1c24ee35fb..76cf468dc2a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -38,7 +38,7 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.queue.FileQueueLifeCycle; -import org.apache.rocketmq.store.queue.QueueOffsetAssigner; +import org.apache.rocketmq.store.queue.QueueOffsetOperator; import org.apache.rocketmq.store.queue.ReferredIterator; public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { @@ -54,8 +54,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { * │ Store Unit │ * │ │ * - * ConsumeQueue's store unit. Size: - * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes + * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes */ public static final int CQ_STORE_UNIT_SIZE = 20; public static final int MSG_TAG_OFFSET_INDEX = 12; @@ -785,10 +784,9 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String } @Override - public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, - short messageNum) { + public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { String topicQueueKey = getTopic() + "-" + getQueueId(); - long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum); + long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey); msg.setQueueOffset(queueOffset); // For LMQ if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { @@ -803,7 +801,7 @@ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageEx for (int i = 0; i < queues.length; i++) { String key = queueKey(queues[i], msg); if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - queueOffsets[i] = queueOffsetAssigner.assignLmqOffset(key, (short) 1); + queueOffsets[i] = queueOffsetOperator.getLmqOffset(key); } } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, @@ -811,6 +809,29 @@ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageEx removeWaitStorePropertyString(msg); } + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, + short messageNum) { + String topicQueueKey = getTopic() + "-" + getQueueId(); + queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); + + // For LMQ + if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + return; + } + String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + if (StringUtils.isBlank(multiDispatchQueue)) { + return; + } + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msg); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + queueOffsetOperator.increaseLmqOffset(key, (short) 1); + } + } + } + public String queueKey(String queueName, MessageExtBrokerInner msgInner) { StringBuilder keyBuilder = new StringBuilder(); keyBuilder.append(queueName); @@ -968,7 +989,7 @@ private class ConsumeQueueIterator implements ReferredIterator { private int relativePos = 0; public ConsumeQueueIterator(SelectMappedBufferResult sbr) { - this.sbr = sbr; + this.sbr = sbr; if (sbr != null && sbr.getByteBuffer() != null) { relativePos = sbr.getByteBuffer().position(); } @@ -988,11 +1009,11 @@ public CqUnit next() { if (!hasNext()) { return null; } - long queueOffset = (sbr.getStartOffset() + sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE; + long queueOffset = (sbr.getStartOffset() + sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE; CqUnit cqUnit = new CqUnit(queueOffset, - sbr.getByteBuffer().getLong(), - sbr.getByteBuffer().getInt(), - sbr.getByteBuffer().getLong()); + sbr.getByteBuffer().getLong(), + sbr.getByteBuffer().getInt(), + sbr.getByteBuffer().getLong()); if (isExtAddr(cqUnit.getTagsCode())) { ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); @@ -1003,7 +1024,7 @@ public CqUnit next() { } else { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content! addr={}, offsetPy={}, sizePy={}, topic={}", - cqUnit.getTagsCode(), cqUnit.getPos(), cqUnit.getPos(), getTopic()); + cqUnit.getTagsCode(), cqUnit.getPos(), cqUnit.getPos(), getTopic()); } } return cqUnit; diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index e51132bbf23..7d9cfd865c7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -2039,11 +2039,21 @@ public boolean isSyncMaster() { } @Override - public void assignOffset(MessageExtBrokerInner msg, short messageNum) { + public void assignOffset(MessageExtBrokerInner msg) { final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - this.consumeQueueStore.assignQueueOffset(msg, messageNum); + this.consumeQueueStore.assignQueueOffset(msg); + } + } + + + @Override + public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { + final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); + + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + this.consumeQueueStore.increaseQueueOffset(msg, messageNum); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index f77739fc475..9730aef4f56 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -728,13 +728,20 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma boolean isSyncMaster(); /** - * Assign an queue offset and increase it. If there is a race condition, you need to lock/unlock this method + * Assign a message to queue offset. If there is a race condition, you need to lock/unlock this method * yourself. * * @param msg message + */ + void assignOffset(MessageExtBrokerInner msg); + + /** + * Increase queue offset in memory table. If there is a race condition, you need to lock/unlock this method + * + * @param msg message * @param messageNum message num */ - void assignOffset(MessageExtBrokerInner msg, short messageNum); + void increaseOffset(MessageExtBrokerInner msg, short messageNum); /** * get all topic config diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 39906eae094..ec5e86d704d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -435,7 +435,7 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId(); topicQueueLock.lock(topicQueueKey); try { - defaultMessageStore.assignOffset(msg, getMessageNum(msg)); + defaultMessageStore.assignOffset(msg); encodeResult = this.messageSerializer.serialize(msg); if (encodeResult.status != AppendMessageStatus.PUT_OK) { @@ -475,6 +475,8 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner if (elapsedTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); } + + defaultMessageStore.increaseOffset(msg, getMessageNum(msg)); } finally { topicQueueLock.unlock(topicQueueKey); } @@ -556,7 +558,7 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess int batchNum = encodeResult.batchData.size(); topicQueueLock.lock(encodeResult.queueOffsetKey); try { - defaultMessageStore.assignOffset(messageExtBatch, (short) batchNum); + defaultMessageStore.assignOffset(messageExtBatch); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config msgIdBuilder.setLength(0); @@ -616,6 +618,9 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); } + + defaultMessageStore.increaseOffset(messageExtBatch, (short) batchNum); + } finally { topicQueueLock.unlock(encodeResult.queueOffsetKey); } diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 3f43adc12d8..e5eeb14e3cc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -591,8 +591,13 @@ public boolean isSyncMaster() { } @Override - public void assignOffset(MessageExtBrokerInner msg, short messageNum) { - next.assignOffset(msg, messageNum); + public void assignOffset(MessageExtBrokerInner msg) { + next.assignOffset(msg); + } + + @Override + public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { + next.increaseOffset(msg, messageNum); } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index e60f09bce27..07f550294b3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -54,9 +54,8 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy * │ Store Unit │ * │ │ * - * BatchConsumeQueue's store unit. Size: - * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Store time(8) + - * msgBaseOffset(8) + batchSize(2) + compactedOffset(4) + reserved(4)= 46 Bytes + * BatchConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Store + * time(8) + msgBaseOffset(8) + batchSize(2) + compactedOffset(4) + reserved(4)= 46 Bytes */ public static final int CQ_STORE_UNIT_SIZE = 46; public static final int MSG_TAG_OFFSET_INDEX = 12; @@ -353,7 +352,7 @@ public boolean isFirstFileExist() { @Override public void truncateDirtyLogicFiles(long phyOffset) { - long oldMinOffset = minOffsetInQueue; + long oldMinOffset = minOffsetInQueue; long oldMaxOffset = maxOffsetInQueue; int logicFileSize = this.mappedFileSize; @@ -515,10 +514,10 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { } @Override - public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) { + public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { String topicQueueKey = getTopic() + "-" + getQueueId(); - long queueOffset = queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum); + long queueOffset = queueOffsetOperator.getBatchQueueOffset(topicQueueKey); if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) { MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(queueOffset)); @@ -527,7 +526,15 @@ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageEx msg.setQueueOffset(queueOffset); } - public boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime, + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, + short messageNum) { + String topicQueueKey = getTopic() + "-" + getQueueId(); + queueOffsetOperator.increaseBatchQueueOffset(topicQueueKey, messageNum); + } + + public boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, + final long storeTime, final long msgBaseOffset, final short batchSize) { if (offset <= this.maxMsgPhyOffsetInCommitLog) { @@ -611,8 +618,9 @@ private static int ceil(int pos) { } /** - * Gets SelectMappedBufferResult by batch-message offset - * Node: the caller is responsible for the release of SelectMappedBufferResult + * Gets SelectMappedBufferResult by batch-message offset Node: the caller is responsible for the release of + * SelectMappedBufferResult + * * @param msgOffset * @return SelectMappedBufferResult */ @@ -696,6 +704,7 @@ public MappedFile searchOffsetFromFiles(long msgOffset) { /** * Find the message whose timestamp is the smallest, greater than or equal to the given time. + * * @param timestamp * @return */ @@ -794,8 +803,8 @@ private MappedFile searchTimeFromFiles(long timestamp) { } } else { //The max timestamp of this file is smaller than the given timestamp, so double check the previous file - if (i + 1 <= mappedFileNum - 1) { - mappedFile = mappedFileQueue.getMappedFiles().get(i + 1); + if (i + 1 <= mappedFileNum - 1) { + mappedFile = mappedFileQueue.getMappedFiles().get(i + 1); targetBcq = mappedFile; break; } else { @@ -809,10 +818,11 @@ private MappedFile searchTimeFromFiles(long timestamp) { } /** - * Find the offset of which the value is equal or larger than the given targetValue. - * If there are many values equal to the target, then find the earliest one. + * Find the offset of which the value is equal or larger than the given targetValue. If there are many values equal + * to the target, then find the earliest one. */ - public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift, + public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, + final int unitShift, long targetValue) { int mid = -1; while (left <= right) { @@ -830,7 +840,7 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, if (tmpValue >= targetValue) { return mid; } else { - left = mid + unitSize; + left = mid + unitSize; } } else { //mid is actually in the mid @@ -845,8 +855,8 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, } /** - * Here is vulnerable, the min value of the bytebuffer must be smaller or equal then the given value. - * Otherwise it may get -1 + * Here is vulnerable, the min value of the bytebuffer must be smaller or equal then the given value. Otherwise it + * may get -1 */ protected int binarySearch(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift, long targetValue) { @@ -989,6 +999,7 @@ public long rollNextFile(long nextBeginOffset) { /** * Batch msg offset (deep logic offset) + * * @return max deep offset */ @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java index 76242a5e3b3..e09d1304e36 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java @@ -143,9 +143,17 @@ public interface ConsumeQueueInterface { * Assign queue offset. * @param queueOffsetAssigner the delegated queue offset assigner * @param msg message itself + */ + void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg); + + + /** + * increase queue offset. + * @param queueOffsetAssigner the delegated queue offset assigner + * @param msg message itself * @param messageNum message number */ - void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum); + void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum); /** * Estimate number of records matching given filter. diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 90f2e74aadb..6031b815bfa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -61,7 +61,7 @@ public class ConsumeQueueStore { protected final DefaultMessageStore messageStore; protected final MessageStoreConfig messageStoreConfig; - protected final QueueOffsetAssigner queueOffsetAssigner = new QueueOffsetAssigner(); + protected final QueueOffsetOperator queueOffsetAssigner = new QueueOffsetOperator(); protected final ConcurrentMap> consumeQueueTable; // Should be careful, do not change the topic config @@ -95,7 +95,7 @@ public void correctMinOffset(ConsumeQueueInterface consumeQueue, long minCommitL * Apply the dispatched request and build the consume queue. This function should be idempotent. * * @param consumeQueue consume queue - * @param request dispatch request + * @param request dispatch request */ public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) { consumeQueue.putMessagePositionInfoWrapper(request); @@ -386,9 +386,14 @@ public void setBatchTopicQueueTable(ConcurrentMap batchTopicQueueT this.queueOffsetAssigner.setBatchTopicQueueTable(batchTopicQueueTable); } - public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum) { + public void assignQueueOffset(MessageExtBrokerInner msg) { ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId()); - consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum); + consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg); + } + + public void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum) { + ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId()); + consumeQueue.increaseQueueOffset(this.queueOffsetAssigner, msg, messageNum); } public void updateQueueOffset(String topic, int queueId, long offset) { diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java similarity index 73% rename from store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java rename to store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java index fe8586f6ddf..128e0f1ed42 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java @@ -28,35 +28,44 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; /** - * QueueOffsetAssigner is a component for assigning offsets for queues. + * QueueOffsetOperator is a component for operating offsets for queues. */ -public class QueueOffsetAssigner { +public class QueueOffsetOperator { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private ConcurrentMap topicQueueTable = new ConcurrentHashMap<>(1024); private ConcurrentMap batchTopicQueueTable = new ConcurrentHashMap<>(1024); private ConcurrentMap lmqTopicQueueTable = new ConcurrentHashMap<>(1024); - public long assignQueueOffset(String topicQueueKey, short messageNum) { - Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L); - this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum); - return queueOffset; + public long getQueueOffset(String topicQueueKey) { + return ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L); + } + + public void increaseQueueOffset(String topicQueueKey, short messageNum) { + Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); + topicQueueTable.put(topicQueueKey, queueOffset + messageNum); } public void updateQueueOffset(String topicQueueKey, long offset) { this.topicQueueTable.put(topicQueueKey, offset); } - public long assignBatchQueueOffset(String topicQueueKey, short messageNum) { - Long topicOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); - this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum); - return topicOffset; + public long getBatchQueueOffset(String topicQueueKey) { + return ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); + } + + public void increaseBatchQueueOffset(String topicQueueKey, short messageNum) { + Long batchQueueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); + this.batchTopicQueueTable.put(topicQueueKey, batchQueueOffset + messageNum); + } + + public long getLmqOffset(String topicQueueKey) { + return ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L); } - public long assignLmqOffset(String topicQueueKey, short messageNum) { - Long topicOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L); - this.lmqTopicQueueTable.put(topicQueueKey, topicOffset + messageNum); - return topicOffset; + public void increaseLmqOffset(String topicQueueKey, short messageNum) { + Long lmqOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L); + this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum); } public long currentQueueOffset(String topicQueueKey) { diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java index daa17eef88e..6003303557a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -78,7 +78,7 @@ public void queueKey() { @Test public void wrapMultiDispatch() { MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue(); - messageStore.assignOffset(messageExtBrokerInner, (short) 1); + messageStore.assignOffset(messageExtBrokerInner); assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0"); } From b7d9bf5837fa77a9dc1416501517d9eb6f8d70d0 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 11 Apr 2023 17:20:19 +0800 Subject: [PATCH 02/11] Add new UT --- .../store/queue/QueueOffsetOperator.java | 3 +- .../store/DefaultMessageStoreTest.java | 33 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java index 128e0f1ed42..a25a4f5dc4d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java @@ -69,7 +69,8 @@ public void increaseLmqOffset(String topicQueueKey, short messageNum) { } public long currentQueueOffset(String topicQueueKey) { - return this.topicQueueTable.get(topicQueueKey); + Long currentQueueOffset = this.topicQueueTable.get(topicQueueKey); + return currentQueueOffset == null ? 0L : currentQueueOffset; } public long currentBatchQueueOffset(String topicQueueKey) { diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 46751571666..aeb3ab17f64 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -29,6 +29,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -63,6 +64,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @RunWith(MockitoJUnitRunner.class) @@ -369,6 +371,17 @@ public void testGetStoreTime_PhyOffsetIsLessThanCommitLogMinOffset() { assertThat(storeTime).isEqualTo(-1); } + @Test + public void testPutMessage_whenMessagePropertyIsTooLong() { + String topicName = "messagePropertyIsTooLongTest"; + MessageExtBrokerInner illegalMessage = buildSpecifyLengthPropertyMessage("123".getBytes(StandardCharsets.UTF_8), topicName, Short.MAX_VALUE + 1); + assertEquals(messageStore.putMessage(illegalMessage).getPutMessageStatus(), PutMessageStatus.PROPERTIES_SIZE_EXCEEDED); + assertEquals(0L, messageStore.getQueueStore().getMaxOffset(topicName, 0).longValue()); + MessageExtBrokerInner normalMessage = buildSpecifyLengthPropertyMessage("123".getBytes(StandardCharsets.UTF_8), topicName, 100); + assertEquals(messageStore.putMessage(normalMessage).getPutMessageStatus(), PutMessageStatus.PUT_OK); + assertEquals(1L, messageStore.getQueueStore().getMaxOffset(topicName, 0).longValue()); + } + private DefaultMessageStore getDefaultMessageStore() { return (DefaultMessageStore) this.messageStore; } @@ -437,6 +450,26 @@ private MessageExtBrokerInner buildMessage(byte[] messageBody, String topic) { return msg; } + private MessageExtBrokerInner buildSpecifyLengthPropertyMessage(byte[] messageBody, String topic, int length) { + StringBuilder stringBuilder = new StringBuilder(); + Random random = new Random(); + for (int i = 0; i < length; i++) { + stringBuilder.append(random.nextInt(10)); + } + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.putUserProperty("test", stringBuilder.toString()); + msg.setTopic(topic); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(messageBody); + msg.setQueueId(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(storeHost); + msg.setBornHost(bornHost); + msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); + return msg; + } + private MessageExtBrokerInner buildIPv6HostMessage(byte[] messageBody, String topic) { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic(topic); From 8fa3f393f489d429dc7ede48d2fe40c362126dd6 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 11 Apr 2023 17:40:41 +0800 Subject: [PATCH 03/11] Fix bug that UT can not pass --- .../store/queue/QueueOffsetOperator.java | 2 +- .../DefaultMessageStoreCleanFilesTest.java | 35 ++++++++++--------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java index a25a4f5dc4d..bfb56ea7f7c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java @@ -42,7 +42,7 @@ public long getQueueOffset(String topicQueueKey) { } public void increaseQueueOffset(String topicQueueKey, short messageNum) { - Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); + Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L); topicQueueTable.put(topicQueueKey, queueOffset + messageNum); } diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 601d50c0f52..7d7562bb692 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -118,8 +118,6 @@ public void testIsSpaceFullMultiCommitLogStorePath() throws Exception { assertEquals(3, paths.length); initMessageStore(config, diskSpaceCleanForciblyRatio); - - // build and put 55 messages, exactly one message per CommitLog file. buildAndPutMessagesToMessageStore(msgCount); MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog(); @@ -172,6 +170,8 @@ public void testDeleteExpiredFilesByTimeUp() throws Exception { MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + + int fileCountIndexFile = getFileCountIndexFile(); assertEquals(fileCountIndexFile, getIndexFileList().size()); @@ -273,8 +273,8 @@ public void testDeleteFilesImmediatelyBySpaceFull() throws Exception { // magic code 10 reference to MappedFileQueue#DELETE_FILES_BATCH_MAX for (int a = 1, fileCount = fileCountCommitLog; - a <= (int) Math.ceil((double) fileCountCommitLog / 10) && fileCount >= 10; - a++, fileCount -= 10) { + a <= (int) Math.ceil((double) fileCountCommitLog / 10) && fileCount >= 10; + a++, fileCount -= 10) { cleanCommitLogService.run(); cleanConsumeQueueService.run(); @@ -339,28 +339,28 @@ public void testDeleteExpiredFilesManually() throws Exception { } private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService() - throws Exception { + throws Exception { Field serviceField = messageStore.getClass().getDeclaredField("cleanCommitLogService"); serviceField.setAccessible(true); DefaultMessageStore.CleanCommitLogService cleanCommitLogService = - (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore); + (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore); serviceField.setAccessible(false); return cleanCommitLogService; } private DefaultMessageStore.CleanConsumeQueueService getCleanConsumeQueueService() - throws Exception { + throws Exception { Field serviceField = messageStore.getClass().getDeclaredField("cleanConsumeQueueService"); serviceField.setAccessible(true); DefaultMessageStore.CleanConsumeQueueService cleanConsumeQueueService = - (DefaultMessageStore.CleanConsumeQueueService) serviceField.get(messageStore); + (DefaultMessageStore.CleanConsumeQueueService) serviceField.get(messageStore); serviceField.setAccessible(false); return cleanConsumeQueueService; } private MappedFileQueue getMappedFileQueueConsumeQueue() - throws Exception { + throws Exception { ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueueTable().get(topic).get(queueId); Field queueField = consumeQueue.getClass().getDeclaredField("mappedFileQueue"); queueField.setAccessible(true); @@ -452,8 +452,9 @@ private void expireFiles(MappedFileQueue commitLogQueue, int expireCount) { } } - private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception { - initMessageStore(genMessageStoreConfig(deleteWhen,diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio); + private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, + double diskSpaceCleanForciblyRatio) throws Exception { + initMessageStore(genMessageStoreConfig(deleteWhen, diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio); } private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxUsedSpaceRatio) { @@ -474,16 +475,17 @@ private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxU messageStoreConfig.setDiskMaxUsedSpaceRatio(diskMaxUsedSpaceRatio); String storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator - + "DefaultMessageStoreCleanFilesTest-" + UUID.randomUUID(); + + "DefaultMessageStoreCleanFilesTest-" + UUID.randomUUID(); String storePathCommitLog = storePathRootDir + File.separator + "commitlog"; messageStoreConfig.setStorePathRootDir(storePathRootDir); messageStoreConfig.setStorePathCommitLog(storePathCommitLog); return messageStoreConfig; } - private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception { + private void initMessageStore(MessageStoreConfig messageStoreConfig, + double diskSpaceCleanForciblyRatio) throws Exception { messageStore = new DefaultMessageStore(messageStoreConfig, - new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig()); + new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig()); cleanCommitLogService = getCleanCommitLogService(); cleanConsumeQueueService = getCleanConsumeQueueService(); @@ -499,7 +501,8 @@ private void initMessageStore(MessageStoreConfig messageStoreConfig, double disk putFiledBackToMessageStore(cleanCommitLogService); } - private void putFiledBackToMessageStore(DefaultMessageStore.CleanCommitLogService cleanCommitLogService) throws Exception { + private void putFiledBackToMessageStore( + DefaultMessageStore.CleanCommitLogService cleanCommitLogService) throws Exception { Field cleanCommitLogServiceField = DefaultMessageStore.class.getDeclaredField("cleanCommitLogService"); cleanCommitLogServiceField.setAccessible(true); cleanCommitLogServiceField.set(messageStore, cleanCommitLogService); @@ -509,7 +512,7 @@ private void putFiledBackToMessageStore(DefaultMessageStore.CleanCommitLogServic private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map properties) { + byte[] filterBitMap, Map properties) { } } From 3c865e27279f6d101d7ed0c7f087d4833e70dbfa Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Wed, 12 Apr 2023 19:40:02 +0800 Subject: [PATCH 04/11] Fix the comment --- .../org/apache/rocketmq/store/queue/BatchConsumeQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 07f550294b3..d6145bb82fe 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -818,8 +818,8 @@ private MappedFile searchTimeFromFiles(long timestamp) { } /** - * Find the offset of which the value is equal or larger than the given targetValue. If there are many values equal - * to the target, then find the earliest one. + * Find the offset of which the value is equal or larger than the given targetValue. + * If there are multiple values equal to the target, return the earliest one. */ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift, From 783c62b1e3377a9ccb1af484c46119336b8bc47c Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 17 Apr 2023 15:07:27 +0800 Subject: [PATCH 05/11] Refactor the relevant logic so that encode message is outside the topicQueue lock --- .../org/apache/rocketmq/store/CommitLog.java | 373 ++++++++++-------- .../apache/rocketmq/store/ConsumeQueue.java | 93 +---- .../rocketmq/store/DefaultMessageStore.java | 19 - .../rocketmq/store/MessageExtEncoder.java | 124 +++++- .../apache/rocketmq/store/MessageStore.java | 16 - .../apache/rocketmq/store/MultiDispatch.java | 73 ++++ .../store/dledger/DLedgerCommitLog.java | 205 +++++----- .../plugin/AbstractPluginMessageStore.java | 10 - .../store/queue/BatchConsumeQueue.java | 39 +- .../store/queue/ConsumeQueueInterface.java | 17 +- .../store/queue/ConsumeQueueStore.java | 45 ++- .../store/queue/QueueOffsetOperator.java | 8 - .../rocketmq/store/AppendCallbackTest.java | 6 +- .../rocketmq/store/MultiDispatchTest.java | 14 +- .../rocketmq/store/kv/CompactionLogTest.java | 27 +- .../store/queue/SparseConsumeQueueTest.java | 2 +- 16 files changed, 575 insertions(+), 496 deletions(-) create mode 100644 store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 75b4042dc32..68a4487bb98 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -31,12 +31,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -51,6 +53,7 @@ import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.logfile.MappedFile; @@ -77,12 +80,12 @@ public class CommitLog implements Swappable { protected final PutMessageLock putMessageLock; - protected final TopicQueueLock topicQueueLock; - private volatile Set fullStorePaths = Collections.emptySet(); private final FlushDiskWatcher flushDiskWatcher; + protected final MultiDispatch multiDispatch; + protected int commitLogSize; public CommitLog(final DefaultMessageStore messageStore) { @@ -101,22 +104,26 @@ public CommitLog(final DefaultMessageStore messageStore) { this.flushManager = new DefaultFlushManager(); - this.appendMessageCallback = new DefaultAppendMessageCallback(); + this.appendMessageCallback = new DefaultAppendMessageCallback(messageStore.getMessageStoreConfig()); putMessageThreadLocal = new ThreadLocal() { @Override protected PutMessageThreadLocal initialValue() { - return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + return new PutMessageThreadLocal(messageStore.getMessageStoreConfig()); } }; this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); this.flushDiskWatcher = new FlushDiskWatcher(); - this.topicQueueLock = new TopicQueueLock(); + this.multiDispatch = new MultiDispatch(defaultMessageStore); this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); } + public int getCommitLogSize() { + return commitLogSize; + } + public void setFullStorePaths(Set fullStorePaths) { this.fullStorePaths = fullStorePaths; } @@ -744,7 +751,7 @@ public void updateMaxMessageSize(PutMessageThreadLocal putMessageThreadLocal) { // dynamically adjust maxMessageSize, but not support DLedger mode temporarily int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(); if (newMaxMessageSize >= 10 && - putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) { + putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) { putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize); } } @@ -780,11 +787,6 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke msg.setStoreHostAddressV6Flag(); } - PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); - updateMaxMessageSize(putMessageThreadLocal); - String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg); - long elapsedTimeInLock = 0; - MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); long currOffset; @@ -815,89 +817,72 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke } } - topicQueueLock.lock(topicQueueKey); - try { + PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); + updateMaxMessageSize(putMessageThreadLocal); - boolean needAssignOffset = true; - if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() - && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { - needAssignOffset = false; - } - if (needAssignOffset) { - defaultMessageStore.assignOffset(msg); - } + PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); + msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer()); - PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); - if (encodeResult != null) { - return CompletableFuture.completedFuture(encodeResult); - } - msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer()); - PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey); + if (encodeResult != null) { + return CompletableFuture.completedFuture(encodeResult); + } - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; + String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg); + PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey); - // Here settings are stored timestamp, in order to ensure an orderly - // global - if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { - msg.setStoreTimestamp(beginLockTimestamp); - } + long elapsedTimeInLock = 0; + MappedFile unlockMappedFile = null; - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); - } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; - result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); - switch (result.getStatus()) { - case PUT_OK: - onCommitLogAppend(msg, result, mappedFile); - break; - case END_OF_FILE: - onCommitLogAppend(msg, result, mappedFile); - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); - } - result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); - if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { - onCommitLogAppend(msg, result, mappedFile); - } - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); - case UNKNOWN_ERROR: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - default: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - } + // Here settings are stored timestamp, in order to ensure an orderly + if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { + msg.setStoreTimestamp(beginLockTimestamp); + } - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } - // Increase queue offset when messages are successfully written - if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { - this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg)); + if (null == mappedFile) { + log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } + + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); + switch (result.getStatus()) { + case PUT_OK: + onCommitLogAppend(msg, result, mappedFile); + break; + case END_OF_FILE: + onCommitLogAppend(msg, result, mappedFile); + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); + } + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); + if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { + onCommitLogAppend(msg, result, mappedFile); + } + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + default: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + } finally { - topicQueueLock.unlock(topicQueueKey); + beginTimeInLock = 0; + putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { @@ -956,7 +941,6 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas(); boolean needHandleHA = needHandleHA(messageExtBatch); - if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) { if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null)); @@ -992,66 +976,50 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey); messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); - topicQueueLock.lock(topicQueueKey); + putMessageLock.lock(); try { - defaultMessageStore.assignOffset(messageExtBatch); + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; - putMessageLock.lock(); - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; - - // Here settings are stored timestamp, in order to ensure an orderly - // global - messageExtBatch.setStoreTimestamp(beginLockTimestamp); - - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); - } - - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); - switch (result.getStatus()) { - case PUT_OK: - break; - case END_OF_FILE: - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); - } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); - case UNKNOWN_ERROR: - default: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - } + // Here settings are stored timestamp, in order to ensure an orderly + // global + messageExtBatch.setStoreTimestamp(beginLockTimestamp); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } - // Increase queue offset when messages are successfully written - if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { - this.defaultMessageStore.increaseOffset(messageExtBatch, (short) putMessageContext.getBatchSize()); + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); + } + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + case UNKNOWN_ERROR: + default: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; } finally { - topicQueueLock.unlock(topicQueueKey); + beginTimeInLock = 0; + putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { @@ -1656,14 +1624,88 @@ class DefaultAppendMessageCallback implements AppendMessageCallback { // Store the message content private final ByteBuffer msgStoreItemMemory; - DefaultAppendMessageCallback() { + private final MessageStoreConfig messageStoreConfig; + + DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) { this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); + this.messageStoreConfig = messageStoreConfig; + } + + public AppendMessageResult handlePropertiesForSpecialMsg(ByteBuffer preEncodeBuffer, + final MessageExtBrokerInner msgInner, boolean isInnerBatchMsg, boolean isMultiDispatchMsg) { + + if (!isInnerBatchMsg && !isMultiDispatchMsg) { + return null; + } + + if (isInnerBatchMsg) { + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_INNER_BASE, String.valueOf(msgInner.getQueueOffset())); + } + + if (isMultiDispatchMsg) { + multiDispatch.wrapMultiDispatch(msgInner); + } + + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); + } + + int msgLenWithoutProperties = preEncodeBuffer.getInt(0); + + int msgLen = msgLenWithoutProperties + 2 + propertiesLength; + + // Exceeds the maximum message + if (msgLen > this.messageStoreConfig.getMaxMessageSize()) { + log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize()); + return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); + } + + // Back filling total message length + preEncodeBuffer.putInt(0, msgLen); + + // Modify position to msgLenWithoutProperties + preEncodeBuffer.position(msgLenWithoutProperties); + + preEncodeBuffer.putShort((short) propertiesLength); + + if (propertiesLength > 0) { + preEncodeBuffer.put(propertiesData); + } + + return null; } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
+ final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); + long queueOffset = 0L; + // Transaction messages that require special handling + queueOffset = (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) ? + CommitLog.this.defaultMessageStore.getQueueStore().getQueueOffset(msgInner.getTopic(), msgInner.getQueueId()) : 0L; + + ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); + msgInner.setQueueOffset(queueOffset); + boolean isInnerBatchMsg = isInnerBatchMsg(msgInner); + boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); + AppendMessageResult appendMessageResult = handlePropertiesForSpecialMsg(preEncodeBuffer, msgInner, isInnerBatchMsg, isMultiDispatchMsg); + if (appendMessageResult != null) { + return appendMessageResult; + } + + final int msgLen = preEncodeBuffer.getInt(0); + preEncodeBuffer.position(0); + preEncodeBuffer.limit(msgLen); + // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); @@ -1677,29 +1719,6 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer return UtilAll.bytes2string(msgIdBuffer.array()); }; - // Record ConsumeQueue information - Long queueOffset = msgInner.getQueueOffset(); - - // this msg maybe a inner-batch msg. - short messageNum = getMessageNum(msgInner); - - // Transaction messages that require special handling - final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); - switch (tranType) { - // Prepared and Rollback message is not consumed, will not enter the consume queue - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - queueOffset = 0L; - break; - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - default: - break; - } - - ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); - final int msgLen = preEncodeBuffer.getInt(0); - // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); @@ -1735,6 +1754,19 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer.put(preEncodeBuffer); CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS"); msgInner.setEncodedBuff(null); + + // this msg maybe an inner-batch msg. + short messageNum = getMessageNum(msgInner); + + // transaction messages that require special handling + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + CommitLog.this.defaultMessageStore.getQueueStore().increaseQueueOffset(msgInner.getTopic(), msgInner.getQueueId(), messageNum); + // for lmq + if (isMultiDispatchMsg) { + CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); + } + } + return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum); } @@ -1745,7 +1777,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); // Record ConsumeQueue information - Long queueOffset = messageExtBatch.getQueueOffset(); + long queueOffset = CommitLog.this.defaultMessageStore.getQueueStore().getQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId()); long beginQueueOffset = queueOffset; int totalMsgLen = 0; int msgNum = 0; @@ -1824,12 +1856,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); result.setMsgNum(msgNum); + CommitLog.this.defaultMessageStore.getQueueStore().increaseQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), (short) msgNum); + return result; } } - class DefaultFlushManager implements FlushManager { private final FlushCommitLogService flushCommitLogService; @@ -1937,10 +1970,6 @@ public void shutdown() { } - public int getCommitLogSize() { - return commitLogSize; - } - public MappedFileQueue getMappedFileQueue() { return mappedFileQueue; } @@ -1966,4 +1995,12 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) { public FlushManager getFlushManager() { return flushManager; } + + public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { + return !StringUtils.isBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + } + + public static boolean isInnerBatchMsg(MessageExtBrokerInner msg) { + return MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 76cf468dc2a..667af64e348 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -26,10 +26,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -731,10 +728,7 @@ private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) { } String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); - if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) { - return false; - } - return true; + return !StringUtils.isBlank(multiDispatchQueue) && !StringUtils.isBlank(multiQueueOffset); } private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) { @@ -757,7 +751,6 @@ private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) { doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId); } - return; } private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset, @@ -783,80 +776,6 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String } } - @Override - public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey); - msg.setQueueOffset(queueOffset); - // For LMQ - if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - Long[] queueOffsets = new Long[queues.length]; - for (int i = 0; i < queues.length; i++) { - String key = queueKey(queues[i], msg); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - queueOffsets[i] = queueOffsetOperator.getLmqOffset(key); - } - } - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, - StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); - removeWaitStorePropertyString(msg); - } - - @Override - public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, - short messageNum) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); - - // For LMQ - if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - for (int i = 0; i < queues.length; i++) { - String key = queueKey(queues[i], msg); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - queueOffsetOperator.increaseLmqOffset(key, (short) 1); - } - } - } - - public String queueKey(String queueName, MessageExtBrokerInner msgInner) { - StringBuilder keyBuilder = new StringBuilder(); - keyBuilder.append(queueName); - keyBuilder.append('-'); - int queueId = msgInner.getQueueId(); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { - queueId = 0; - } - keyBuilder.append(queueId); - return keyBuilder.toString(); - } - - private void removeWaitStorePropertyString(MessageExtBrokerInner msgInner) { - if (msgInner.getProperties().containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { - // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message. - // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it. - String waitStoreMsgOKValue = msgInner.getProperties().remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later - msgInner.getProperties().put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); - } else { - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - } - } - private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { @@ -1127,6 +1046,16 @@ public long getMaxOffsetInQueue() { return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; } + @Override + public long getQueueOffset(QueueOffsetOperator queueOffsetOperator) { + return queueOffsetOperator.getQueueOffset(topic + "-" + queueId); + } + + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, short messageNum) { + queueOffsetOperator.increaseQueueOffset(topic + "-" + queueId, messageNum); + } + @Override public void checkSelf() { mappedFileQueue.checkSelf(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7d9cfd865c7..213f62a3b08 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -2038,25 +2038,6 @@ public boolean isSyncMaster() { return BrokerRole.SYNC_MASTER == this.getMessageStoreConfig().getBrokerRole(); } - @Override - public void assignOffset(MessageExtBrokerInner msg) { - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - this.consumeQueueStore.assignQueueOffset(msg); - } - } - - - @Override - public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - this.consumeQueueStore.increaseQueueOffset(msg, messageNum); - } - } - @Override public ConcurrentMap getTopicConfigs() { return this.consumeQueueStore.getTopicConfigs(); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index ee609a337bc..9fff8eb14c3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import org.apache.rocketmq.store.config.MessageStoreConfig; public class MessageExtEncoder { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -38,13 +39,17 @@ public class MessageExtEncoder { private int maxMessageBodySize; // The maximum length of the full message. private int maxMessageSize; - public MessageExtEncoder(final int maxMessageBodySize) { + + private MessageStoreConfig messageStoreConfig; + + public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) { ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + this.messageStoreConfig = messageStoreConfig; + this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize(); //Reserve 64kb for encoding buffer outside body int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE; byteBuf = alloc.directBuffer(maxMessageSize); - this.maxMessageBodySize = maxMessageBodySize; this.maxMessageSize = maxMessageSize; } @@ -73,8 +78,103 @@ public static int calMsgLength(MessageVersion messageVersion, + 2 + (Math.max(propertiesLength, 0)); //propertiesLength } + public static int calMsgLengthNoProperties(MessageVersion messageVersion, + int sysFlag, int bodyLength, int topicLength) { + + int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; + int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20; + + return 4 //TOTALSIZE + + 4 //MAGICCODE + + 4 //BODYCRC + + 4 //QUEUEID + + 4 //FLAG + + 8 //QUEUEOFFSET + + 8 //PHYSICALOFFSET + + 4 //SYSFLAG + + 8 //BORNTIMESTAMP + + bornhostLength //BORNHOST + + 8 //STORETIMESTAMP + + storehostAddressLength //STOREHOSTADDRESS + + 4 //RECONSUMETIMES + + 8 //Prepared Transaction Offset + + 4 + (Math.max(bodyLength, 0)) //BODY + + messageVersion.getTopicLengthSize() + topicLength; //TOPIC + } + + public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) { + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + // Exceeds the maximum message body + if (bodyLength > this.maxMessageBodySize) { + CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageBodySize); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength); + + // 1 TOTALSIZE + this.byteBuf.writeInt(msgLenNoProperties); + // 2 MAGICCODE + this.byteBuf.writeInt(msgInner.getVersion().getMagicCode()); + // 3 BODYCRC + this.byteBuf.writeInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.byteBuf.writeInt(msgInner.getQueueId()); + // 5 FLAG + this.byteBuf.writeInt(msgInner.getFlag()); + // 6 QUEUEOFFSET, need update later + this.byteBuf.writeLong(0); + // 7 PHYSICALOFFSET, need update later + this.byteBuf.writeLong(0); + // 8 SYSFLAG + this.byteBuf.writeInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.byteBuf.writeLong(msgInner.getBornTimestamp()); + + // 10 BORNHOST + ByteBuffer bornHostBytes = msgInner.getBornHostBytes(); + this.byteBuf.writeBytes(bornHostBytes.array()); + + // 11 STORETIMESTAMP + this.byteBuf.writeLong(msgInner.getStoreTimestamp()); + + // 12 STOREHOSTADDRESS + ByteBuffer storeHostBytes = msgInner.getStoreHostBytes(); + this.byteBuf.writeBytes(storeHostBytes.array()); + + // 13 RECONSUMETIMES + this.byteBuf.writeInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.byteBuf.writeInt(bodyLength); + if (bodyLength > 0) + this.byteBuf.writeBytes(msgInner.getBody()); + + // 16 TOPIC + if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) { + this.byteBuf.writeShort((short) topicLength); + } else { + this.byteBuf.writeByte((byte) topicLength); + } + this.byteBuf.writeBytes(topicData); + + return null; + } + public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.clear(); + + if (CommitLog.isInnerBatchMsg(msgInner) || messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { + return encodeWithoutProperties(msgInner); + } + /** * Serialize message */ @@ -102,8 +202,6 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } - final long queueOffset = msgInner.getQueueOffset(); - // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength @@ -121,8 +219,8 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.writeInt(msgInner.getQueueId()); // 5 FLAG this.byteBuf.writeInt(msgInner.getFlag()); - // 6 QUEUEOFFSET - this.byteBuf.writeLong(queueOffset); + // 6 QUEUEOFFSET, need update later + this.byteBuf.writeLong(0); // 7 PHYSICALOFFSET, need update later this.byteBuf.writeLong(0); // 8 SYSFLAG @@ -160,8 +258,9 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { // 17 PROPERTIES this.byteBuf.writeShort((short) propertiesLength); - if (propertiesLength > 0) + if (propertiesLength > 0) { this.byteBuf.writeBytes(propertiesData); + } return null; } @@ -230,9 +329,9 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex this.byteBuf.writeInt(messageExtBatch.getQueueId()); // 5 FLAG this.byteBuf.writeInt(flag); - // 6 QUEUEOFFSET + // 6 QUEUEOFFSET, place-holder, need update later this.byteBuf.writeLong(0); - // 7 PHYSICALOFFSET + // 7 PHYSICALOFFSET, place-holder, need update later this.byteBuf.writeLong(0); // 8 SYSFLAG this.byteBuf.writeInt(messageExtBatch.getSysFlag()); @@ -286,7 +385,7 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex } public ByteBuffer getEncoderBuffer() { - return this.byteBuf.nioBuffer(); + return this.byteBuf.nioBuffer(0, this.byteBuf.capacity()); } public int getMaxMessageBodySize() { @@ -304,8 +403,9 @@ public void updateEncoderBufferCapacity(int newMaxMessageBodySize) { static class PutMessageThreadLocal { private final MessageExtEncoder encoder; private final StringBuilder keyBuilder; - PutMessageThreadLocal(int size) { - encoder = new MessageExtEncoder(size); + + PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) { + encoder = new MessageExtEncoder(messageStoreConfig); keyBuilder = new StringBuilder(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 9730aef4f56..8cebf554e5d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -727,22 +727,6 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma */ boolean isSyncMaster(); - /** - * Assign a message to queue offset. If there is a race condition, you need to lock/unlock this method - * yourself. - * - * @param msg message - */ - void assignOffset(MessageExtBrokerInner msg); - - /** - * Increase queue offset in memory table. If there is a race condition, you need to lock/unlock this method - * - * @param msg message - * @param messageNum message num - */ - void increaseOffset(MessageExtBrokerInner msg, short messageNum); - /** * get all topic config * diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java new file mode 100644 index 00000000000..e7d646c12cd --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; + +/** + * MultiDispatch for lmq, not-thread-safe + */ +public class MultiDispatch { + private final StringBuilder keyBuilder = new StringBuilder(); + private final DefaultMessageStore messageStore; + + public MultiDispatch(DefaultMessageStore messageStore) { + this.messageStore = messageStore; + } + + public String queueKey(String queueName, MessageExtBrokerInner msgInner) { + keyBuilder.setLength(0); + keyBuilder.append(queueName); + keyBuilder.append('-'); + int queueId = msgInner.getQueueId(); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + keyBuilder.append(queueId); + return keyBuilder.toString(); + } + + public void wrapMultiDispatch(final MessageExtBrokerInner msg) { + String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + Long[] queueOffsets = new Long[queues.length]; + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msg); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key); + } + } + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, + StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); + } + + + public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) { + String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + for (String queue : queues) { + String key = queueKey(queue, msgInner); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + messageStore.getQueueStore().increaseLmqOffset(key, (short) 1); + } + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index ec5e86d704d..000303afcfa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -264,7 +264,7 @@ public SelectMappedBufferResult getData(final long offset, final boolean returnF return null; } - + @Override public boolean getData(final long offset, final int size, final ByteBuffer byteBuffer) { if (offset < dividedCommitlogOffset) { @@ -432,53 +432,55 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner AppendFuture dledgerFuture; EncodeResult encodeResult; - String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId(); - topicQueueLock.lock(topicQueueKey); + encodeResult = this.messageSerializer.serialize(msg); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); + } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + long elapsedTimeInLock; + long queueOffset; try { - defaultMessageStore.assignOffset(msg); - - encodeResult = this.messageSerializer.serialize(msg); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); - } - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - long elapsedTimeInLock; - long queueOffset; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(msg, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); - AppendEntryRequest request = new AppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.getData()); - dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (dledgerFuture.getPos() == -1) { - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } - long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; - - int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - } catch (Exception e) { - log.error("Put message error", e); - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + queueOffset = getQueueOffsetByKey(msg, tranType); + encodeResult.setQueueOffsetKey(queueOffset, false); + AppendEntryRequest request = new AppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBody(encodeResult.getData()); + dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } + long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); - } + int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - defaultMessageStore.increaseOffset(msg, getMessageNum(msg)); + String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + switch (tranType) { + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + // The next update ConsumeQueue information + defaultMessageStore.getQueueStore().increaseQueueOffset(msg.getTopic(), msg.getQueueId(), (short) 1); + break; + default: + break; + } + } catch (Exception e) { + log.error("Put message error", e); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { - topicQueueLock.unlock(topicQueueKey); + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); } return dledgerFuture.thenApply(appendEntryResponse -> { @@ -556,73 +558,64 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess } int batchNum = encodeResult.batchData.size(); - topicQueueLock.lock(encodeResult.queueOffsetKey); + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + msgIdBuilder.setLength(0); + long elapsedTimeInLock; + long queueOffset; + int msgNum = 0; try { - defaultMessageStore.assignOffset(messageExtBatch); - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - msgIdBuilder.setLength(0); - long elapsedTimeInLock; - long queueOffset; - int msgNum = 0; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(messageExtBatch, tranType); - encodeResult.setQueueOffsetKey(queueOffset, true); - BatchAppendEntryRequest request = new BatchAppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBatchMsgs(encodeResult.batchData); - AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (appendFuture.getPos() == -1) { - log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } - dledgerFuture = (BatchAppendFuture) appendFuture; - - long wroteOffset = 0; - - int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - boolean isFirstOffset = true; - long firstWroteOffset = 0; - for (long pos : dledgerFuture.getPositions()) { - wroteOffset = pos + DLedgerEntry.BODY_OFFSET; - if (isFirstOffset) { - firstWroteOffset = wroteOffset; - isFirstOffset = false; - } - String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); - if (msgIdBuilder.length() > 0) { - msgIdBuilder.append(',').append(msgId); - } else { - msgIdBuilder.append(msgId); - } - msgNum++; - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, - msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - appendResult.setMsgNum(msgNum); - } catch (Exception e) { - log.error("Put message error", e); - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + queueOffset = getQueueOffsetByKey(messageExtBatch, tranType); + encodeResult.setQueueOffsetKey(queueOffset, true); + BatchAppendEntryRequest request = new BatchAppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBatchMsgs(encodeResult.batchData); + AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (appendFuture.getPos() == -1) { + log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } + dledgerFuture = (BatchAppendFuture) appendFuture; - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", - elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); - } + long wroteOffset = 0; - defaultMessageStore.increaseOffset(messageExtBatch, (short) batchNum); + int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); + boolean isFirstOffset = true; + long firstWroteOffset = 0; + for (long pos : dledgerFuture.getPositions()) { + wroteOffset = pos + DLedgerEntry.BODY_OFFSET; + if (isFirstOffset) { + firstWroteOffset = wroteOffset; + isFirstOffset = false; + } + String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); + if (msgIdBuilder.length() > 0) { + msgIdBuilder.append(',').append(msgId); + } else { + msgIdBuilder.append(msgId); + } + msgNum++; + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, + msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + appendResult.setMsgNum(msgNum); + defaultMessageStore.getQueueStore().increaseQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), (short) batchNum); + } catch (Exception e) { + log.error("Put message error", e); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { - topicQueueLock.unlock(encodeResult.queueOffsetKey); + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", + elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); } return dledgerFuture.thenApply(appendEntryResponse -> { @@ -708,12 +701,12 @@ public long lockTimeMills() { } private long getQueueOffsetByKey(MessageExtBrokerInner msg, int tranType) { - Long queueOffset = msg.getQueueOffset(); + Long queueOffset = defaultMessageStore.getQueueStore().getQueueOffset(msg.getTopic(), msg.getQueueId()); // Transaction messages that require special handling switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the - // consumer queuec + // consumer queue case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index e5eeb14e3cc..36542f092fa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -590,16 +590,6 @@ public boolean isSyncMaster() { return next.isSyncMaster(); } - @Override - public void assignOffset(MessageExtBrokerInner msg) { - next.assignOffset(msg); - } - - @Override - public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { - next.increaseOffset(msg, messageNum); - } - @Override public Map getTopicConfigs() { return next.getTopicConfigs(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index d6145bb82fe..20b7360279a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -26,11 +26,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DispatchRequest; @@ -421,8 +416,7 @@ public void truncateDirtyLogicFiles(long phyOffset) { @Override public boolean flush(final int flushLeastPages) { - boolean result = this.mappedFileQueue.flush(flushLeastPages); - return result; + return this.mappedFileQueue.flush(flushLeastPages); } @Override @@ -513,26 +507,6 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { this.messageStore.getRunningFlags().makeLogicsQueueError(); } - @Override - public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - - long queueOffset = queueOffsetOperator.getBatchQueueOffset(topicQueueKey); - - if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(queueOffset)); - msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); - } - msg.setQueueOffset(queueOffset); - } - - @Override - public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, - short messageNum) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - queueOffsetOperator.increaseBatchQueueOffset(topicQueueKey, messageNum); - } - public boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime, final long msgBaseOffset, final short batchSize) { @@ -1012,6 +986,17 @@ public long getMinOffsetInQueue() { return minOffsetInQueue; } + @Override + public long getQueueOffset(QueueOffsetOperator queueOffsetOperator) { + return queueOffsetOperator.getBatchQueueOffset(topic + "-" + queueId); + } + + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, + short messageNum) { + queueOffsetOperator.increaseBatchQueueOffset(topic + "-" + queueId, messageNum); + } + @Override public void checkSelf() { mappedFileQueue.checkSelf(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java index e09d1304e36..00ccdc9e809 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.store.queue; import org.apache.rocketmq.common.attribute.CQType; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.MessageFilter; @@ -140,20 +139,20 @@ public interface ConsumeQueueInterface { void putMessagePositionInfoWrapper(DispatchRequest request); /** - * Assign queue offset. - * @param queueOffsetAssigner the delegated queue offset assigner - * @param msg message itself + * Get queue offset. + * @param queueOffsetOperator the delegated queue offset operator + * + * @return queue offset */ - void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg); + long getQueueOffset(QueueOffsetOperator queueOffsetOperator); /** - * increase queue offset. - * @param queueOffsetAssigner the delegated queue offset assigner - * @param msg message itself + * Increase queue offset. + * @param queueOffsetOperator the delegated queue offset operator * @param messageNum message number */ - void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum); + void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, short messageNum); /** * Estimate number of records matching given filter. diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 6031b815bfa..bedb7e265f8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -40,7 +40,6 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -61,7 +60,7 @@ public class ConsumeQueueStore { protected final DefaultMessageStore messageStore; protected final MessageStoreConfig messageStoreConfig; - protected final QueueOffsetOperator queueOffsetAssigner = new QueueOffsetOperator(); + protected final QueueOffsetOperator queueOffsetOperator = new QueueOffsetOperator(); protected final ConcurrentMap> consumeQueueTable; // Should be careful, do not change the topic config @@ -168,7 +167,7 @@ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String top this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(), this.messageStore); } else { - throw new RuntimeException(format("queue type %s is not supported.", cqType.toString())); + throw new RuntimeException(format("queue type %s is not supported.", cqType)); } } @@ -182,14 +181,14 @@ private void queueTypeShouldBe(String topic, CQType cqTypeExpected) { } } - private ExecutorService buildExecutorService(BlockingQueue blockingQueue, String threadNamePrefix) { + private ExecutorService buildExecutorService(BlockingQueue blockingQueue) { return new ThreadPoolExecutor( this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, blockingQueue, - new ThreadFactoryImpl(threadNamePrefix)); + new ThreadFactoryImpl("RecoverConsumeQueueThread_")); } public void recover(ConsumeQueueInterface consumeQueue) { @@ -212,7 +211,7 @@ public boolean recoverConcurrently() { } final CountDownLatch countDownLatch = new CountDownLatch(count); BlockingQueue recoverQueue = new LinkedBlockingQueue<>(); - final ExecutorService executor = buildExecutorService(recoverQueue, "RecoverConsumeQueueThread_"); + final ExecutorService executor = buildExecutorService(recoverQueue); List> result = new ArrayList<>(count); try { for (ConcurrentMap maps : this.consumeQueueTable.values()) { @@ -370,39 +369,47 @@ private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queue } public Long getMaxOffset(String topic, int queueId) { - return this.queueOffsetAssigner.currentQueueOffset(topic + "-" + queueId); + return this.queueOffsetOperator.currentQueueOffset(topic + "-" + queueId); } public void setTopicQueueTable(ConcurrentMap topicQueueTable) { - this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable); - this.queueOffsetAssigner.setLmqTopicQueueTable(topicQueueTable); + this.queueOffsetOperator.setTopicQueueTable(topicQueueTable); + this.queueOffsetOperator.setLmqTopicQueueTable(topicQueueTable); } public ConcurrentMap getTopicQueueTable() { - return this.queueOffsetAssigner.getTopicQueueTable(); + return this.queueOffsetOperator.getTopicQueueTable(); } public void setBatchTopicQueueTable(ConcurrentMap batchTopicQueueTable) { - this.queueOffsetAssigner.setBatchTopicQueueTable(batchTopicQueueTable); + this.queueOffsetOperator.setBatchTopicQueueTable(batchTopicQueueTable); } - public void assignQueueOffset(MessageExtBrokerInner msg) { - ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId()); - consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg); + public long getQueueOffset(String topic, int queueId) { + ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(topic, queueId); + return consumeQueue.getQueueOffset(queueOffsetOperator); } - public void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum) { - ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId()); - consumeQueue.increaseQueueOffset(this.queueOffsetAssigner, msg, messageNum); + public long getLmqQueueOffset(String topicQueueKey) { + return queueOffsetOperator.getLmqOffset(topicQueueKey); + } + + public void increaseQueueOffset(String topic, int queueId, short messageNum) { + ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(topic, queueId); + consumeQueue.increaseQueueOffset(queueOffsetOperator, messageNum); + } + + public void increaseLmqOffset(String queueKey, short messageNum) { + queueOffsetOperator.increaseLmqOffset(queueKey, messageNum); } public void updateQueueOffset(String topic, int queueId, long offset) { String topicQueueKey = topic + "-" + queueId; - this.queueOffsetAssigner.updateQueueOffset(topicQueueKey, offset); + this.queueOffsetOperator.updateQueueOffset(topicQueueKey, offset); } public void removeTopicQueueTable(String topic, Integer queueId) { - this.queueOffsetAssigner.remove(topic, queueId); + this.queueOffsetOperator.remove(topic, queueId); } public ConcurrentMap> getConsumeQueueTable() { diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java index bfb56ea7f7c..2545bbf523d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java @@ -73,14 +73,6 @@ public long currentQueueOffset(String topicQueueKey) { return currentQueueOffset == null ? 0L : currentQueueOffset; } - public long currentBatchQueueOffset(String topicQueueKey) { - return this.batchTopicQueueTable.get(topicQueueKey); - } - - public long currentLmqOffset(String topicQueueKey) { - return this.lmqTopicQueueTable.get(topicQueueKey); - } - public synchronized void remove(String topic, Integer queueId) { String topicQueueKey = topic + "-" + queueId; // Beware of thread-safety diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index dc1af78b3fd..81c9ca096b6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -43,7 +43,7 @@ public class AppendCallbackTest { AppendMessageCallback callback; - MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024); + MessageExtEncoder batchEncoder; @Before public void init() throws Exception { @@ -52,12 +52,14 @@ public void init() throws Exception { messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024); messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); //too much reference DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig()); CommitLog commitLog = new CommitLog(messageStore); - callback = commitLog.new DefaultAppendMessageCallback(); + callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig); + batchEncoder = new MessageExtEncoder(messageStoreConfig); } @After diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java index 6003303557a..54cae0b0135 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -19,8 +19,8 @@ import java.io.File; import java.net.InetSocketAddress; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageConst; @@ -31,14 +31,13 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MultiDispatchTest { - private ConsumeQueue consumeQueue; + private MultiDispatch multiDispatch; private DefaultMessageStore messageStore; @@ -58,8 +57,7 @@ public void init() throws Exception { BrokerConfig brokerConfig = new BrokerConfig(); //too much reference messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig); - consumeQueue = new ConsumeQueue("xxx", 0, - getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore); + multiDispatch = new MultiDispatch(messageStore); } @After @@ -71,14 +69,14 @@ public void destroy() { public void queueKey() { MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); when(messageExtBrokerInner.getQueueId()).thenReturn(2); - String ret = consumeQueue.queueKey("%LMQ%lmq123", messageExtBrokerInner); + String ret = multiDispatch.queueKey("%LMQ%lmq123", messageExtBrokerInner); assertEquals(ret, "%LMQ%lmq123-0"); } @Test public void wrapMultiDispatch() { MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue(); - messageStore.assignOffset(messageExtBrokerInner); + multiDispatch.wrapMultiDispatch(messageExtBrokerInner); assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0"); } @@ -87,7 +85,7 @@ private MessageExtBrokerInner buildMessageMultiQueue() { msg.setTopic("test"); msg.setTags("TAG1"); msg.setKeys("Hello"); - msg.setBody("aaa".getBytes(Charset.forName("UTF-8"))); + msg.setBody("aaa".getBytes(StandardCharsets.UTF_8)); msg.setKeys(String.valueOf(System.currentTimeMillis())); msg.setQueueId(0); msg.setSysFlag(0); diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java index df3c31c6edf..6cb56509294 100644 --- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java @@ -85,8 +85,7 @@ public class CompactionLogTest { int compactionFileSize = 10240; int compactionCqFileSize = 1024; - - private static MessageExtEncoder encoder = new MessageExtEncoder(1024); + private static MessageExtEncoder encoder = new MessageExtEncoder(new MessageStoreConfig()); private static SocketAddress storeHost; private static SocketAddress bornHost; @@ -123,6 +122,7 @@ public void setUp() throws IOException { static int queueOffset = 0; static int keyCount = 10; + public static ByteBuffer buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic("ctopic"); @@ -134,17 +134,18 @@ public static ByteBuffer buildMessage() { msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(storeHost); msg.setBornHost(bornHost); - msg.setQueueOffset(queueOffset); - queueOffset++; for (int i = 1; i < 3; i++) { msg.putUserProperty(String.valueOf(i), "xxx" + i); } msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); encoder.encode(msg); + ByteBuffer preEncodeBuffer = encoder.getEncoderBuffer(); + int pos = 4 + 4 + 4 + 4 + 4; + preEncodeBuffer.putLong(pos, queueOffset); + queueOffset++; return encoder.getEncoderBuffer(); } - @Test public void testCheck() throws IllegalAccessException { MappedFileQueue mfq = mock(MappedFileQueue.class); @@ -189,15 +190,23 @@ public void testCheckWithException() throws IllegalAccessException, IOException @Test public void testCompaction() throws DigestException, NoSuchAlgorithmException, IllegalAccessException { Iterator iterator = mock(Iterator.class); - SelectMappedBufferResult smb = mock(SelectMappedBufferResult.class); - when(iterator.hasNext()).thenAnswer((Answer)invocationOnMock -> queueOffset < 1024); - when(iterator.next()).thenAnswer((Answer)invocation -> + when(iterator.hasNext()).thenAnswer((Answer) invocationOnMock -> queueOffset < 1024); + when(iterator.next()).thenAnswer((Answer) invocation -> new SelectMappedBufferResult(0, buildMessage(), 0, null)); MappedFile mf = mock(MappedFile.class); List mappedFileList = Lists.newArrayList(mf); doReturn(iterator).when(mf).iterator(0); +// MessageStore messageStore = mock(DefaultMessageStore.class); +// MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); +// when(messageStoreConfig.getMappedFileSizeCommitLog()).thenReturn(1024 * 1024); +// when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig); +// CompactionLog clog = mock(CompactionLog.class); +// FieldUtils.writeField(clog, "defaultMessageStore", messageStore, true); +// doCallRealMethod().when(clog).getOffsetMap(any()); +// FieldUtils.writeField(clog, "positionMgr", positionMgr, true); + MessageStore messageStore = mock(DefaultMessageStore.class); CommitLog commitLog = mock(CommitLog.class); when(messageStore.getCommitLog()).thenReturn(commitLog); @@ -219,7 +228,7 @@ public void testCompaction() throws DigestException, NoSuchAlgorithmException, I List compactResult = Lists.newArrayList(); when(clog.asyncPutMessage(any(ByteBuffer.class), any(MessageExt.class), any(CompactionLog.TopicPartitionLog.class))) - .thenAnswer((Answer>)invocation -> { + .thenAnswer((Answer>) invocation -> { compactResult.add(invocation.getArgument(1)); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java index c9e290b5db5..0e7ede18737 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java @@ -58,10 +58,10 @@ public void setUp() throws IOException { defaultMessageStore = mock(DefaultMessageStore.class); CommitLog commitLog = mock(CommitLog.class); when(defaultMessageStore.getCommitLog()).thenReturn(commitLog); - when(commitLog.getCommitLogSize()).thenReturn(10 * 1024 * 1024); MessageStoreConfig config = mock(MessageStoreConfig.class); doReturn(config).when(defaultMessageStore).getMessageStoreConfig(); doReturn(true).when(config).isSearchBcqByCacheEnable(); + doReturn(10 * 1024 * 1024).when(config).getMappedFileSizeCommitLog(); } private void fillByteBuf(ByteBuffer bb, long phyOffset, long queueOffset) { From 2172daffcfaa0f542399d8ea70e75464e47043d5 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 17 Apr 2023 15:39:55 +0800 Subject: [PATCH 06/11] Pass the checkstyle --- .../org/apache/rocketmq/store/CommitLog.java | 8 ------- .../rocketmq/store/DefaultMessageStore.java | 3 ++- .../store/queue/BatchConsumeQueue.java | 16 +++++++------- .../rocketmq/store/MultiDispatchTest.java | 1 + .../rocketmq/store/kv/CompactionLogTest.java | 22 +++++-------------- 5 files changed, 16 insertions(+), 34 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 68a4487bb98..c3262ed58a7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -86,8 +86,6 @@ public class CommitLog implements Swappable { protected final MultiDispatch multiDispatch; - protected int commitLogSize; - public CommitLog(final DefaultMessageStore messageStore) { String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) { @@ -116,12 +114,6 @@ protected PutMessageThreadLocal initialValue() { this.flushDiskWatcher = new FlushDiskWatcher(); this.multiDispatch = new MultiDispatch(defaultMessageStore); - - this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); - } - - public int getCommitLogSize() { - return commitLogSize; } public void setFullStorePaths(Set fullStorePaths) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index eb6a33f0505..c62514a1659 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -2042,7 +2042,6 @@ public boolean isSyncMaster() { return BrokerRole.SYNC_MASTER == this.getMessageStoreConfig().getBrokerRole(); } - @Override public ConcurrentMap getTopicConfigs() { return this.topicConfigTable; } @@ -2067,6 +2066,8 @@ public BrokerIdentity getBrokerIdentity() { } } + + class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index a4cdd0a97f8..3ee9b354a87 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -49,8 +49,9 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { * │ Store Unit │ * │ │ * - * BatchConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Store - * time(8) + msgBaseOffset(8) + batchSize(2) + compactedOffset(4) + reserved(4)= 46 Bytes + * BatchConsumeQueue's store unit. Size: + * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Store time(8) + + * msgBaseOffset(8) + batchSize(2) + compactedOffset(4) + reserved(4)= 46 Bytes */ public static final int CQ_STORE_UNIT_SIZE = 46; public static final int MSG_TAG_OFFSET_INDEX = 12; @@ -89,7 +90,7 @@ public BatchConsumeQueue( this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.messageStore = messageStore; - this.commitLogSize = messageStore.getCommitLog().getCommitLogSize(); + this.commitLogSize = messageStore.getMessageStoreConfig().getMaxMessageSize(); this.topic = topic; this.queueId = queueId; @@ -592,9 +593,8 @@ private static int ceil(int pos) { } /** - * Gets SelectMappedBufferResult by batch-message offset Node: the caller is responsible for the release of - * SelectMappedBufferResult - * + * Gets SelectMappedBufferResult by batch-message offset + * Node: the caller is responsible for the release of SelectMappedBufferResult * @param msgOffset * @return SelectMappedBufferResult */ @@ -829,8 +829,8 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, } /** - * Here is vulnerable, the min value of the bytebuffer must be smaller or equal then the given value. Otherwise it - * may get -1 + * Here is vulnerable, the min value of the bytebuffer must be smaller or equal then the given value. + * Otherwise it may get -1 */ protected int binarySearch(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift, long targetValue) { diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java index 79fcd2eafcb..c1ca64c4f74 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java index 6cb56509294..d57ecf94c29 100644 --- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; -import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MappedFileQueue; import org.apache.rocketmq.store.MessageExtEncoder; @@ -75,19 +74,18 @@ import static org.mockito.Mockito.when; public class CompactionLogTest { - CompactionLog clog; MessageStoreConfig storeConfig; MessageStore defaultMessageStore; CompactionPositionMgr positionMgr; String topic = "ctopic"; int queueId = 0; - int offsetMemorySize = 1024; int compactionFileSize = 10240; int compactionCqFileSize = 1024; private static MessageExtEncoder encoder = new MessageExtEncoder(new MessageStoreConfig()); private static SocketAddress storeHost; private static SocketAddress bornHost; + private static final int QUEUE_OFFSET_POS = 4 + 4 + 4 + 4 + 4; @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -140,8 +138,7 @@ public static ByteBuffer buildMessage() { msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); encoder.encode(msg); ByteBuffer preEncodeBuffer = encoder.getEncoderBuffer(); - int pos = 4 + 4 + 4 + 4 + 4; - preEncodeBuffer.putLong(pos, queueOffset); + preEncodeBuffer.putLong(QUEUE_OFFSET_POS, queueOffset); queueOffset++; return encoder.getEncoderBuffer(); } @@ -198,19 +195,10 @@ public void testCompaction() throws DigestException, NoSuchAlgorithmException, I List mappedFileList = Lists.newArrayList(mf); doReturn(iterator).when(mf).iterator(0); -// MessageStore messageStore = mock(DefaultMessageStore.class); -// MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); -// when(messageStoreConfig.getMappedFileSizeCommitLog()).thenReturn(1024 * 1024); -// when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig); -// CompactionLog clog = mock(CompactionLog.class); -// FieldUtils.writeField(clog, "defaultMessageStore", messageStore, true); -// doCallRealMethod().when(clog).getOffsetMap(any()); -// FieldUtils.writeField(clog, "positionMgr", positionMgr, true); - MessageStore messageStore = mock(DefaultMessageStore.class); - CommitLog commitLog = mock(CommitLog.class); - when(messageStore.getCommitLog()).thenReturn(commitLog); - when(commitLog.getCommitLogSize()).thenReturn(1024 * 1024); + MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); + when(messageStoreConfig.getMappedFileSizeCommitLog()).thenReturn(1024 * 1024); + when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig); CompactionLog clog = mock(CompactionLog.class); FieldUtils.writeField(clog, "defaultMessageStore", messageStore, true); doCallRealMethod().when(clog).getOffsetMap(any()); From 18f2dcc83771b746d82e630b0d5a2e5d2da9e00b Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 17 Apr 2023 16:38:06 +0800 Subject: [PATCH 07/11] Pass the BCQ UTs --- .../common/message/MessageExtBrokerInner.java | 10 ++++++++++ .../java/org/apache/rocketmq/store/CommitLog.java | 11 +++++++++-- .../org/apache/rocketmq/store/MessageExtEncoder.java | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 0c72ebb7bbd..17c1e8c439a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -27,6 +27,8 @@ public class MessageExtBrokerInner extends MessageExt { private ByteBuffer encodedBuff; + private volatile boolean isEncodeCompleted; + private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1; public ByteBuffer getEncodedBuff() { @@ -70,4 +72,12 @@ public MessageVersion getVersion() { public void setVersion(MessageVersion version) { this.version = version; } + + public boolean isEncodeCompleted() { + return isEncodeCompleted; + } + + public void setEncodeCompleted(boolean encodeCompleted) { + isEncodeCompleted = encodeCompleted; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index c3262ed58a7..236839624ff 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -842,8 +842,8 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } - result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); + switch (result.getStatus()) { case PUT_OK: onCommitLogAppend(msg, result, mappedFile); @@ -1627,6 +1627,11 @@ public AppendMessageResult handlePropertiesForSpecialMsg(ByteBuffer preEncodeBuf final MessageExtBrokerInner msgInner, boolean isInnerBatchMsg, boolean isMultiDispatchMsg) { if (!isInnerBatchMsg && !isMultiDispatchMsg) { + msgInner.setEncodeCompleted(true); + return null; + } + + if (msgInner.isEncodeCompleted()){ return null; } @@ -1662,7 +1667,6 @@ public AppendMessageResult handlePropertiesForSpecialMsg(ByteBuffer preEncodeBuf // Back filling total message length preEncodeBuffer.putInt(0, msgLen); - // Modify position to msgLenWithoutProperties preEncodeBuffer.position(msgLenWithoutProperties); @@ -1672,6 +1676,8 @@ public AppendMessageResult handlePropertiesForSpecialMsg(ByteBuffer preEncodeBuf preEncodeBuffer.put(propertiesData); } + msgInner.setEncodeCompleted(true); + return null; } @@ -1722,6 +1728,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */ msgIdSupplier, msgInner.getStoreTimestamp(), diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index 9fff8eb14c3..f4650af213e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -381,7 +381,7 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex putMessageContext.setBatchSize(batchSize); putMessageContext.setPhyPos(new long[batchSize]); - return this.byteBuf.nioBuffer(); + return this.byteBuf.nioBuffer(0, this.byteBuf.capacity()); } public ByteBuffer getEncoderBuffer() { From a01b961394299420b9023df3807aeefe2f9a8558 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 17 Apr 2023 16:49:07 +0800 Subject: [PATCH 08/11] Pass the checkstyle --- store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 236839624ff..a1b1fe8f2ec 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1631,7 +1631,7 @@ public AppendMessageResult handlePropertiesForSpecialMsg(ByteBuffer preEncodeBuf return null; } - if (msgInner.isEncodeCompleted()){ + if (msgInner.isEncodeCompleted()) { return null; } From 3509fbbccd0222f0ff52f46fb325c09d1429529c Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 17 Apr 2023 17:38:05 +0800 Subject: [PATCH 09/11] Pass BatchPutMessageTest.testPutMessages UT --- .../main/java/org/apache/rocketmq/store/MessageExtEncoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index f4650af213e..9fff8eb14c3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -381,7 +381,7 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex putMessageContext.setBatchSize(batchSize); putMessageContext.setPhyPos(new long[batchSize]); - return this.byteBuf.nioBuffer(0, this.byteBuf.capacity()); + return this.byteBuf.nioBuffer(); } public ByteBuffer getEncoderBuffer() { From 6c7484d97a901a9520f0180ccd4241f04a443af7 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Sat, 29 Apr 2023 21:38:24 +0800 Subject: [PATCH 10/11] Polish the code by review --- .../common/message/MessageExtBrokerInner.java | 6 +++--- .../org/apache/rocketmq/store/CommitLog.java | 2 +- .../apache/rocketmq/store/ConsumeQueue.java | 2 +- .../apache/rocketmq/store/MultiDispatch.java | 19 +++++++++++-------- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 17c1e8c439a..7cc8a4314b9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -27,7 +27,7 @@ public class MessageExtBrokerInner extends MessageExt { private ByteBuffer encodedBuff; - private volatile boolean isEncodeCompleted; + private volatile boolean encodeCompleted; private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1; @@ -74,10 +74,10 @@ public void setVersion(MessageVersion version) { } public boolean isEncodeCompleted() { - return isEncodeCompleted; + return encodeCompleted; } public void setEncodeCompleted(boolean encodeCompleted) { - isEncodeCompleted = encodeCompleted; + this.encodeCompleted = encodeCompleted; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index a1b1fe8f2ec..ff7dbfdf5b8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1996,7 +1996,7 @@ public FlushManager getFlushManager() { } public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { - return !StringUtils.isBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); } public static boolean isInnerBatchMsg(MessageExtBrokerInner msg) { diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 792fba89cf2..3836917c4e4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -729,7 +729,7 @@ private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) { } String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); - return !StringUtils.isBlank(multiDispatchQueue) && !StringUtils.isBlank(multiQueueOffset); + return StringUtils.isNotBlank(multiDispatchQueue) && StringUtils.isNotBlank(multiQueueOffset); } private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java index e7d646c12cd..b60c1a25c84 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java @@ -28,13 +28,14 @@ public class MultiDispatch { private final StringBuilder keyBuilder = new StringBuilder(); private final DefaultMessageStore messageStore; + private static final short VALUE_OF_EACH_INCREMENT = 1; public MultiDispatch(DefaultMessageStore messageStore) { this.messageStore = messageStore; } public String queueKey(String queueName, MessageExtBrokerInner msgInner) { - keyBuilder.setLength(0); + keyBuilder.delete(0, keyBuilder.length()); keyBuilder.append(queueName); keyBuilder.append('-'); int queueId = msgInner.getQueueId(); @@ -46,27 +47,29 @@ public String queueKey(String queueName, MessageExtBrokerInner msgInner) { } public void wrapMultiDispatch(final MessageExtBrokerInner msg) { + String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - Long[] queueOffsets = new Long[queues.length]; - for (int i = 0; i < queues.length; i++) { - String key = queueKey(queues[i], msg); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key); + long[] queueOffsets = new long[queues.length]; + if (messageStore.getMessageStoreConfig().isEnableLmq()) { + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msg); + if (MixAll.isLmq(key)) { + queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key); + } } } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); } - public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) { String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); for (String queue : queues) { String key = queueKey(queue, msgInner); if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - messageStore.getQueueStore().increaseLmqOffset(key, (short) 1); + messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT); } } } From 5c6d0bc10235ee050441aa91ef59bb124daee802 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Thu, 4 May 2023 14:21:53 +0800 Subject: [PATCH 11/11] Pass the UTs --- .../src/main/java/org/apache/rocketmq/store/MultiDispatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java index b60c1a25c84..c634cf819c5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java @@ -50,7 +50,7 @@ public void wrapMultiDispatch(final MessageExtBrokerInner msg) { String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - long[] queueOffsets = new long[queues.length]; + Long[] queueOffsets = new Long[queues.length]; if (messageStore.getMessageStoreConfig().isEnableLmq()) { for (int i = 0; i < queues.length; i++) { String key = queueKey(queues[i], msg);