diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index 0c72ebb7bbd..7cc8a4314b9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -27,6 +27,8 @@ public class MessageExtBrokerInner extends MessageExt { private ByteBuffer encodedBuff; + private volatile boolean encodeCompleted; + private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1; public ByteBuffer getEncodedBuff() { @@ -70,4 +72,12 @@ public MessageVersion getVersion() { public void setVersion(MessageVersion version) { this.version = version; } + + public boolean isEncodeCompleted() { + return encodeCompleted; + } + + public void setEncodeCompleted(boolean encodeCompleted) { + this.encodeCompleted = encodeCompleted; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 75b4042dc32..ff7dbfdf5b8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -31,12 +31,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -51,6 +53,7 @@ import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.logfile.MappedFile; @@ -77,13 +80,11 @@ public class CommitLog implements Swappable { protected final PutMessageLock putMessageLock; - protected final TopicQueueLock topicQueueLock; - private volatile Set fullStorePaths = Collections.emptySet(); private final FlushDiskWatcher flushDiskWatcher; - protected int commitLogSize; + protected final MultiDispatch multiDispatch; public CommitLog(final DefaultMessageStore messageStore) { String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); @@ -101,20 +102,18 @@ public CommitLog(final DefaultMessageStore messageStore) { this.flushManager = new DefaultFlushManager(); - this.appendMessageCallback = new DefaultAppendMessageCallback(); + this.appendMessageCallback = new DefaultAppendMessageCallback(messageStore.getMessageStoreConfig()); putMessageThreadLocal = new ThreadLocal() { @Override protected PutMessageThreadLocal initialValue() { - return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + return new PutMessageThreadLocal(messageStore.getMessageStoreConfig()); } }; this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); this.flushDiskWatcher = new FlushDiskWatcher(); - this.topicQueueLock = new TopicQueueLock(); - - this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); + this.multiDispatch = new MultiDispatch(defaultMessageStore); } public void setFullStorePaths(Set fullStorePaths) { @@ -744,7 +743,7 @@ public void updateMaxMessageSize(PutMessageThreadLocal putMessageThreadLocal) { // dynamically adjust maxMessageSize, but not support DLedger mode temporarily int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(); if (newMaxMessageSize >= 10 && - putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) { + putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) { putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize); } } @@ -780,11 +779,6 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke msg.setStoreHostAddressV6Flag(); } - PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); - updateMaxMessageSize(putMessageThreadLocal); - String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg); - long elapsedTimeInLock = 0; - MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); long currOffset; @@ -815,89 +809,72 @@ public CompletableFuture asyncPutMessage(final MessageExtBroke } } - topicQueueLock.lock(topicQueueKey); - try { + PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); + updateMaxMessageSize(putMessageThreadLocal); - boolean needAssignOffset = true; - if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() - && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { - needAssignOffset = false; - } - if (needAssignOffset) { - defaultMessageStore.assignOffset(msg); - } + PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); + msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer()); - PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); - if (encodeResult != null) { - return CompletableFuture.completedFuture(encodeResult); - } - msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer()); - PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey); + if (encodeResult != null) { + return CompletableFuture.completedFuture(encodeResult); + } - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; + String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg); + PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey); - // Here settings are stored timestamp, in order to ensure an orderly - // global - if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { - msg.setStoreTimestamp(beginLockTimestamp); - } + long elapsedTimeInLock = 0; + MappedFile unlockMappedFile = null; - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); - } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + try { + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; - result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); - switch (result.getStatus()) { - case PUT_OK: - onCommitLogAppend(msg, result, mappedFile); - break; - case END_OF_FILE: - onCommitLogAppend(msg, result, mappedFile); - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); - } - result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); - if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { - onCommitLogAppend(msg, result, mappedFile); - } - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); - case UNKNOWN_ERROR: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - default: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - } + // Here settings are stored timestamp, in order to ensure an orderly + if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { + msg.setStoreTimestamp(beginLockTimestamp); + } - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } - // Increase queue offset when messages are successfully written - if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { - this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg)); + if (null == mappedFile) { + log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); + + switch (result.getStatus()) { + case PUT_OK: + onCommitLogAppend(msg, result, mappedFile); + break; + case END_OF_FILE: + onCommitLogAppend(msg, result, mappedFile); + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); + } + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); + if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { + onCommitLogAppend(msg, result, mappedFile); + } + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + default: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + } finally { - topicQueueLock.unlock(topicQueueKey); + beginTimeInLock = 0; + putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { @@ -956,7 +933,6 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas(); boolean needHandleHA = needHandleHA(messageExtBatch); - if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) { if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null)); @@ -992,66 +968,50 @@ public CompletableFuture asyncPutMessages(final MessageExtBatc PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey); messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); - topicQueueLock.lock(topicQueueKey); + putMessageLock.lock(); try { - defaultMessageStore.assignOffset(messageExtBatch); - - putMessageLock.lock(); - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; - - // Here settings are stored timestamp, in order to ensure an orderly - // global - messageExtBatch.setStoreTimestamp(beginLockTimestamp); + long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); + this.beginTimeInLock = beginLockTimestamp; - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); - } - - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); - switch (result.getStatus()) { - case PUT_OK: - break; - case END_OF_FILE: - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); - } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); - case UNKNOWN_ERROR: - default: - beginTimeInLock = 0; - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); - } + // Here settings are stored timestamp, in order to ensure an orderly + // global + messageExtBatch.setStoreTimestamp(beginLockTimestamp); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); + if (null == mappedFile || mappedFile.isFull()) { + mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise + } + if (null == mappedFile) { + log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null)); } - // Increase queue offset when messages are successfully written - if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) { - this.defaultMessageStore.increaseOffset(messageExtBatch, (short) putMessageContext.getBatchSize()); + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); + switch (result.getStatus()) { + case PUT_OK: + break; + case END_OF_FILE: + unlockMappedFile = mappedFile; + // Create a new file, re-write the message + mappedFile = this.mappedFileQueue.getLastMappedFile(0); + if (null == mappedFile) { + // XXX: warn and notify me + log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result)); + } + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); + break; + case MESSAGE_SIZE_EXCEEDED: + case PROPERTIES_SIZE_EXCEEDED: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); + case UNKNOWN_ERROR: + default: + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; } finally { - topicQueueLock.unlock(topicQueueKey); + beginTimeInLock = 0; + putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { @@ -1656,14 +1616,94 @@ class DefaultAppendMessageCallback implements AppendMessageCallback { // Store the message content private final ByteBuffer msgStoreItemMemory; - DefaultAppendMessageCallback() { + private final MessageStoreConfig messageStoreConfig; + + DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) { this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); + this.messageStoreConfig = messageStoreConfig; + } + + public AppendMessageResult handlePropertiesForSpecialMsg(ByteBuffer preEncodeBuffer, + final MessageExtBrokerInner msgInner, boolean isInnerBatchMsg, boolean isMultiDispatchMsg) { + + if (!isInnerBatchMsg && !isMultiDispatchMsg) { + msgInner.setEncodeCompleted(true); + return null; + } + + if (msgInner.isEncodeCompleted()) { + return null; + } + + if (isInnerBatchMsg) { + MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_INNER_BASE, String.valueOf(msgInner.getQueueOffset())); + } + + if (isMultiDispatchMsg) { + multiDispatch.wrapMultiDispatch(msgInner); + } + + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); + } + + int msgLenWithoutProperties = preEncodeBuffer.getInt(0); + + int msgLen = msgLenWithoutProperties + 2 + propertiesLength; + + // Exceeds the maximum message + if (msgLen > this.messageStoreConfig.getMaxMessageSize()) { + log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize()); + return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); + } + + // Back filling total message length + preEncodeBuffer.putInt(0, msgLen); + // Modify position to msgLenWithoutProperties + preEncodeBuffer.position(msgLenWithoutProperties); + + preEncodeBuffer.putShort((short) propertiesLength); + + if (propertiesLength > 0) { + preEncodeBuffer.put(propertiesData); + } + + msgInner.setEncodeCompleted(true); + + return null; } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
+ final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); + long queueOffset = 0L; + // Transaction messages that require special handling + queueOffset = (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) ? + CommitLog.this.defaultMessageStore.getQueueStore().getQueueOffset(msgInner.getTopic(), msgInner.getQueueId()) : 0L; + + ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); + msgInner.setQueueOffset(queueOffset); + boolean isInnerBatchMsg = isInnerBatchMsg(msgInner); + boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); + AppendMessageResult appendMessageResult = handlePropertiesForSpecialMsg(preEncodeBuffer, msgInner, isInnerBatchMsg, isMultiDispatchMsg); + if (appendMessageResult != null) { + return appendMessageResult; + } + + final int msgLen = preEncodeBuffer.getInt(0); + preEncodeBuffer.position(0); + preEncodeBuffer.limit(msgLen); + // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); @@ -1677,29 +1717,6 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer return UtilAll.bytes2string(msgIdBuffer.array()); }; - // Record ConsumeQueue information - Long queueOffset = msgInner.getQueueOffset(); - - // this msg maybe a inner-batch msg. - short messageNum = getMessageNum(msgInner); - - // Transaction messages that require special handling - final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); - switch (tranType) { - // Prepared and Rollback message is not consumed, will not enter the consume queue - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - queueOffset = 0L; - break; - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - default: - break; - } - - ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); - final int msgLen = preEncodeBuffer.getInt(0); - // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); @@ -1711,6 +1728,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */ msgIdSupplier, msgInner.getStoreTimestamp(), @@ -1735,6 +1753,19 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer.put(preEncodeBuffer); CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS"); msgInner.setEncodedBuff(null); + + // this msg maybe an inner-batch msg. + short messageNum = getMessageNum(msgInner); + + // transaction messages that require special handling + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { + CommitLog.this.defaultMessageStore.getQueueStore().increaseQueueOffset(msgInner.getTopic(), msgInner.getQueueId(), messageNum); + // for lmq + if (isMultiDispatchMsg) { + CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); + } + } + return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum); } @@ -1745,7 +1776,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); // Record ConsumeQueue information - Long queueOffset = messageExtBatch.getQueueOffset(); + long queueOffset = CommitLog.this.defaultMessageStore.getQueueStore().getQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId()); long beginQueueOffset = queueOffset; int totalMsgLen = 0; int msgNum = 0; @@ -1824,12 +1855,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); result.setMsgNum(msgNum); + CommitLog.this.defaultMessageStore.getQueueStore().increaseQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), (short) msgNum); + return result; } } - class DefaultFlushManager implements FlushManager { private final FlushCommitLogService flushCommitLogService; @@ -1937,10 +1969,6 @@ public void shutdown() { } - public int getCommitLogSize() { - return commitLogSize; - } - public MappedFileQueue getMappedFileQueue() { return mappedFileQueue; } @@ -1966,4 +1994,12 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) { public FlushManager getFlushManager() { return flushManager; } + + public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { + return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + } + + public static boolean isInnerBatchMsg(MessageExtBrokerInner msg) { + return MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 78d083e2cb0..3836917c4e4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -26,10 +26,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; @@ -54,7 +51,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { * │ Store Unit │ * │ │ * - * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes + * ConsumeQueue's store unit. Size: + * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes */ public static final int CQ_STORE_UNIT_SIZE = 20; public static final int MSG_TAG_OFFSET_INDEX = 12; @@ -731,10 +729,7 @@ private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) { } String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); - if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) { - return false; - } - return true; + return StringUtils.isNotBlank(multiDispatchQueue) && StringUtils.isNotBlank(multiQueueOffset); } private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) { @@ -757,7 +752,6 @@ private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) { doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId); } - return; } private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset, @@ -783,88 +777,6 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String } } - @Override - public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey); - msg.setQueueOffset(queueOffset); - - - // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), - // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. - if (!isNeedHandleMultiDispatch(msg)) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - Long[] queueOffsets = new Long[queues.length]; - for (int i = 0; i < queues.length; i++) { - String key = queueKey(queues[i], msg); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - queueOffsets[i] = queueOffsetOperator.getLmqOffset(key); - } - } - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, - StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); - removeWaitStorePropertyString(msg); - } - - @Override - public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, - short messageNum) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum); - - // Handling the multi dispatch message. In the context of a light message queue (as defined in RIP-28), - // light message queues are constructed based on message properties, which requires special handling of queue offset of the light message queue. - if (!isNeedHandleMultiDispatch(msg)) { - return; - } - String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - if (StringUtils.isBlank(multiDispatchQueue)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - for (int i = 0; i < queues.length; i++) { - String key = queueKey(queues[i], msg); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { - queueOffsetOperator.increaseLmqOffset(key, (short) 1); - } - } - } - - public boolean isNeedHandleMultiDispatch(MessageExtBrokerInner msg) { - return messageStore.getMessageStoreConfig().isEnableMultiDispatch() && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); - } - - public String queueKey(String queueName, MessageExtBrokerInner msgInner) { - StringBuilder keyBuilder = new StringBuilder(); - keyBuilder.append(queueName); - keyBuilder.append('-'); - int queueId = msgInner.getQueueId(); - if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { - queueId = 0; - } - keyBuilder.append(queueId); - return keyBuilder.toString(); - } - - private void removeWaitStorePropertyString(MessageExtBrokerInner msgInner) { - if (msgInner.getProperties().containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { - // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message. - // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it. - String waitStoreMsgOKValue = msgInner.getProperties().remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later - msgInner.getProperties().put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); - } else { - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - } - } - private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { @@ -1135,6 +1047,16 @@ public long getMaxOffsetInQueue() { return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; } + @Override + public long getQueueOffset(QueueOffsetOperator queueOffsetOperator) { + return queueOffsetOperator.getQueueOffset(topic + "-" + queueId); + } + + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, short messageNum) { + queueOffsetOperator.increaseQueueOffset(topic + "-" + queueId, messageNum); + } + @Override public void checkSelf() { mappedFileQueue.checkSelf(); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index e1bdc6e7111..43553899fd4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -2052,25 +2052,6 @@ public boolean isSyncMaster() { return BrokerRole.SYNC_MASTER == this.getMessageStoreConfig().getBrokerRole(); } - @Override - public void assignOffset(MessageExtBrokerInner msg) { - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - this.consumeQueueStore.assignQueueOffset(msg); - } - } - - - @Override - public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - this.consumeQueueStore.increaseQueueOffset(msg, messageNum); - } - } - public ConcurrentMap getTopicConfigs() { return this.topicConfigTable; } @@ -2095,6 +2076,8 @@ public BrokerIdentity getBrokerIdentity() { } } + + class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index ee609a337bc..9fff8eb14c3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import org.apache.rocketmq.store.config.MessageStoreConfig; public class MessageExtEncoder { protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -38,13 +39,17 @@ public class MessageExtEncoder { private int maxMessageBodySize; // The maximum length of the full message. private int maxMessageSize; - public MessageExtEncoder(final int maxMessageBodySize) { + + private MessageStoreConfig messageStoreConfig; + + public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) { ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + this.messageStoreConfig = messageStoreConfig; + this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize(); //Reserve 64kb for encoding buffer outside body int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ? maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE; byteBuf = alloc.directBuffer(maxMessageSize); - this.maxMessageBodySize = maxMessageBodySize; this.maxMessageSize = maxMessageSize; } @@ -73,8 +78,103 @@ public static int calMsgLength(MessageVersion messageVersion, + 2 + (Math.max(propertiesLength, 0)); //propertiesLength } + public static int calMsgLengthNoProperties(MessageVersion messageVersion, + int sysFlag, int bodyLength, int topicLength) { + + int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; + int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20; + + return 4 //TOTALSIZE + + 4 //MAGICCODE + + 4 //BODYCRC + + 4 //QUEUEID + + 4 //FLAG + + 8 //QUEUEOFFSET + + 8 //PHYSICALOFFSET + + 4 //SYSFLAG + + 8 //BORNTIMESTAMP + + bornhostLength //BORNHOST + + 8 //STORETIMESTAMP + + storehostAddressLength //STOREHOSTADDRESS + + 4 //RECONSUMETIMES + + 8 //Prepared Transaction Offset + + 4 + (Math.max(bodyLength, 0)) //BODY + + messageVersion.getTopicLengthSize() + topicLength; //TOPIC + } + + public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) { + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + // Exceeds the maximum message body + if (bodyLength > this.maxMessageBodySize) { + CommitLog.log.warn("message body size exceeded, msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageBodySize); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + final int msgLenNoProperties = calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), bodyLength, topicLength); + + // 1 TOTALSIZE + this.byteBuf.writeInt(msgLenNoProperties); + // 2 MAGICCODE + this.byteBuf.writeInt(msgInner.getVersion().getMagicCode()); + // 3 BODYCRC + this.byteBuf.writeInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.byteBuf.writeInt(msgInner.getQueueId()); + // 5 FLAG + this.byteBuf.writeInt(msgInner.getFlag()); + // 6 QUEUEOFFSET, need update later + this.byteBuf.writeLong(0); + // 7 PHYSICALOFFSET, need update later + this.byteBuf.writeLong(0); + // 8 SYSFLAG + this.byteBuf.writeInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.byteBuf.writeLong(msgInner.getBornTimestamp()); + + // 10 BORNHOST + ByteBuffer bornHostBytes = msgInner.getBornHostBytes(); + this.byteBuf.writeBytes(bornHostBytes.array()); + + // 11 STORETIMESTAMP + this.byteBuf.writeLong(msgInner.getStoreTimestamp()); + + // 12 STOREHOSTADDRESS + ByteBuffer storeHostBytes = msgInner.getStoreHostBytes(); + this.byteBuf.writeBytes(storeHostBytes.array()); + + // 13 RECONSUMETIMES + this.byteBuf.writeInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.byteBuf.writeInt(bodyLength); + if (bodyLength > 0) + this.byteBuf.writeBytes(msgInner.getBody()); + + // 16 TOPIC + if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) { + this.byteBuf.writeShort((short) topicLength); + } else { + this.byteBuf.writeByte((byte) topicLength); + } + this.byteBuf.writeBytes(topicData); + + return null; + } + public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.clear(); + + if (CommitLog.isInnerBatchMsg(msgInner) || messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { + return encodeWithoutProperties(msgInner); + } + /** * Serialize message */ @@ -102,8 +202,6 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } - final long queueOffset = msgInner.getQueueOffset(); - // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength @@ -121,8 +219,8 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.writeInt(msgInner.getQueueId()); // 5 FLAG this.byteBuf.writeInt(msgInner.getFlag()); - // 6 QUEUEOFFSET - this.byteBuf.writeLong(queueOffset); + // 6 QUEUEOFFSET, need update later + this.byteBuf.writeLong(0); // 7 PHYSICALOFFSET, need update later this.byteBuf.writeLong(0); // 8 SYSFLAG @@ -160,8 +258,9 @@ public PutMessageResult encode(MessageExtBrokerInner msgInner) { // 17 PROPERTIES this.byteBuf.writeShort((short) propertiesLength); - if (propertiesLength > 0) + if (propertiesLength > 0) { this.byteBuf.writeBytes(propertiesData); + } return null; } @@ -230,9 +329,9 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex this.byteBuf.writeInt(messageExtBatch.getQueueId()); // 5 FLAG this.byteBuf.writeInt(flag); - // 6 QUEUEOFFSET + // 6 QUEUEOFFSET, place-holder, need update later this.byteBuf.writeLong(0); - // 7 PHYSICALOFFSET + // 7 PHYSICALOFFSET, place-holder, need update later this.byteBuf.writeLong(0); // 8 SYSFLAG this.byteBuf.writeInt(messageExtBatch.getSysFlag()); @@ -286,7 +385,7 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContex } public ByteBuffer getEncoderBuffer() { - return this.byteBuf.nioBuffer(); + return this.byteBuf.nioBuffer(0, this.byteBuf.capacity()); } public int getMaxMessageBodySize() { @@ -304,8 +403,9 @@ public void updateEncoderBufferCapacity(int newMaxMessageBodySize) { static class PutMessageThreadLocal { private final MessageExtEncoder encoder; private final StringBuilder keyBuilder; - PutMessageThreadLocal(int size) { - encoder = new MessageExtEncoder(size); + + PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) { + encoder = new MessageExtEncoder(messageStoreConfig); keyBuilder = new StringBuilder(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 3db0c18f7f8..7ca9627df1e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -724,22 +724,6 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma */ boolean isSyncMaster(); - /** - * Assign a message to queue offset. If there is a race condition, you need to lock/unlock this method - * yourself. - * - * @param msg message - */ - void assignOffset(MessageExtBrokerInner msg); - - /** - * Increase queue offset in memory table. If there is a race condition, you need to lock/unlock this method - * - * @param msg message - * @param messageNum message num - */ - void increaseOffset(MessageExtBrokerInner msg, short messageNum); - /** * Get master broker message store in process in broker container * diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java new file mode 100644 index 00000000000..c634cf819c5 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; + +/** + * MultiDispatch for lmq, not-thread-safe + */ +public class MultiDispatch { + private final StringBuilder keyBuilder = new StringBuilder(); + private final DefaultMessageStore messageStore; + private static final short VALUE_OF_EACH_INCREMENT = 1; + + public MultiDispatch(DefaultMessageStore messageStore) { + this.messageStore = messageStore; + } + + public String queueKey(String queueName, MessageExtBrokerInner msgInner) { + keyBuilder.delete(0, keyBuilder.length()); + keyBuilder.append(queueName); + keyBuilder.append('-'); + int queueId = msgInner.getQueueId(); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + keyBuilder.append(queueId); + return keyBuilder.toString(); + } + + public void wrapMultiDispatch(final MessageExtBrokerInner msg) { + + String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + Long[] queueOffsets = new Long[queues.length]; + if (messageStore.getMessageStoreConfig().isEnableLmq()) { + for (int i = 0; i < queues.length; i++) { + String key = queueKey(queues[i], msg); + if (MixAll.isLmq(key)) { + queueOffsets[i] = messageStore.getQueueStore().getLmqQueueOffset(key); + } + } + } + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, + StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER)); + } + + public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) { + String multiDispatchQueue = msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + for (String queue : queues) { + String key = queueKey(queue, msgInner); + if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) { + messageStore.getQueueStore().increaseLmqOffset(key, VALUE_OF_EACH_INCREMENT); + } + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index ec5e86d704d..000303afcfa 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -264,7 +264,7 @@ public SelectMappedBufferResult getData(final long offset, final boolean returnF return null; } - + @Override public boolean getData(final long offset, final int size, final ByteBuffer byteBuffer) { if (offset < dividedCommitlogOffset) { @@ -432,53 +432,55 @@ public CompletableFuture asyncPutMessage(MessageExtBrokerInner AppendFuture dledgerFuture; EncodeResult encodeResult; - String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId(); - topicQueueLock.lock(topicQueueKey); + encodeResult = this.messageSerializer.serialize(msg); + if (encodeResult.status != AppendMessageStatus.PUT_OK) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); + } + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + long elapsedTimeInLock; + long queueOffset; try { - defaultMessageStore.assignOffset(msg); - - encodeResult = this.messageSerializer.serialize(msg); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status))); - } - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - long elapsedTimeInLock; - long queueOffset; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(msg, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); - AppendEntryRequest request = new AppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.getData()); - dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (dledgerFuture.getPos() == -1) { - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } - long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; - - int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - } catch (Exception e) { - log.error("Put message error", e); - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + queueOffset = getQueueOffsetByKey(msg, tranType); + encodeResult.setQueueOffsetKey(queueOffset, false); + AppendEntryRequest request = new AppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBody(encodeResult.getData()); + dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (dledgerFuture.getPos() == -1) { + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } + long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); - } + int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - defaultMessageStore.increaseOffset(msg, getMessageNum(msg)); + String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + switch (tranType) { + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + // The next update ConsumeQueue information + defaultMessageStore.getQueueStore().increaseQueueOffset(msg.getTopic(), msg.getQueueId(), (short) 1); + break; + default: + break; + } + } catch (Exception e) { + log.error("Put message error", e); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { - topicQueueLock.unlock(topicQueueKey); + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); } return dledgerFuture.thenApply(appendEntryResponse -> { @@ -556,73 +558,64 @@ public CompletableFuture asyncPutMessages(MessageExtBatch mess } int batchNum = encodeResult.batchData.size(); - topicQueueLock.lock(encodeResult.queueOffsetKey); + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config + msgIdBuilder.setLength(0); + long elapsedTimeInLock; + long queueOffset; + int msgNum = 0; try { - defaultMessageStore.assignOffset(messageExtBatch); - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - msgIdBuilder.setLength(0); - long elapsedTimeInLock; - long queueOffset; - int msgNum = 0; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(messageExtBatch, tranType); - encodeResult.setQueueOffsetKey(queueOffset, true); - BatchAppendEntryRequest request = new BatchAppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBatchMsgs(encodeResult.batchData); - AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (appendFuture.getPos() == -1) { - log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } - dledgerFuture = (BatchAppendFuture) appendFuture; - - long wroteOffset = 0; - - int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - boolean isFirstOffset = true; - long firstWroteOffset = 0; - for (long pos : dledgerFuture.getPositions()) { - wroteOffset = pos + DLedgerEntry.BODY_OFFSET; - if (isFirstOffset) { - firstWroteOffset = wroteOffset; - isFirstOffset = false; - } - String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); - if (msgIdBuilder.length() > 0) { - msgIdBuilder.append(',').append(msgId); - } else { - msgIdBuilder.append(msgId); - } - msgNum++; - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, - msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - appendResult.setMsgNum(msgNum); - } catch (Exception e) { - log.error("Put message error", e); - return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); + beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); + queueOffset = getQueueOffsetByKey(messageExtBatch, tranType); + encodeResult.setQueueOffsetKey(queueOffset, true); + BatchAppendEntryRequest request = new BatchAppendEntryRequest(); + request.setGroup(dLedgerConfig.getGroup()); + request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); + request.setBatchMsgs(encodeResult.batchData); + AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); + if (appendFuture.getPos() == -1) { + log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } + dledgerFuture = (BatchAppendFuture) appendFuture; - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", - elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); - } + long wroteOffset = 0; - defaultMessageStore.increaseOffset(messageExtBatch, (short) batchNum); + int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); + boolean isFirstOffset = true; + long firstWroteOffset = 0; + for (long pos : dledgerFuture.getPositions()) { + wroteOffset = pos + DLedgerEntry.BODY_OFFSET; + if (isFirstOffset) { + firstWroteOffset = wroteOffset; + isFirstOffset = false; + } + String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); + if (msgIdBuilder.length() > 0) { + msgIdBuilder.append(',').append(msgId); + } else { + msgIdBuilder.append(msgId); + } + msgNum++; + } + + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; + appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, + msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); + appendResult.setMsgNum(msgNum); + defaultMessageStore.getQueueStore().increaseQueueOffset(messageExtBatch.getTopic(), messageExtBatch.getQueueId(), (short) batchNum); + } catch (Exception e) { + log.error("Put message error", e); + return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); } finally { - topicQueueLock.unlock(encodeResult.queueOffsetKey); + beginTimeInDledgerLock = 0; + putMessageLock.unlock(); + } + + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", + elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); } return dledgerFuture.thenApply(appendEntryResponse -> { @@ -708,12 +701,12 @@ public long lockTimeMills() { } private long getQueueOffsetByKey(MessageExtBrokerInner msg, int tranType) { - Long queueOffset = msg.getQueueOffset(); + Long queueOffset = defaultMessageStore.getQueueStore().getQueueOffset(msg.getTopic(), msg.getQueueId()); // Transaction messages that require special handling switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the - // consumer queuec + // consumer queue case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 25e947512ff..b2b8979e2b4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -587,16 +587,6 @@ public boolean isSyncMaster() { return next.isSyncMaster(); } - @Override - public void assignOffset(MessageExtBrokerInner msg) { - next.assignOffset(msg); - } - - @Override - public void increaseOffset(MessageExtBrokerInner msg, short messageNum) { - next.increaseOffset(msg, messageNum); - } - @Override public List getPutMessageHookList() { return next.getPutMessageHookList(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 8fec1bf7b01..503aa0861e6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -26,11 +26,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DispatchRequest; @@ -95,7 +90,7 @@ public BatchConsumeQueue( this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.messageStore = messageStore; - this.commitLogSize = messageStore.getCommitLog().getCommitLogSize(); + this.commitLogSize = messageStore.getMessageStoreConfig().getMaxMessageSize(); this.topic = topic; this.queueId = queueId; @@ -422,8 +417,7 @@ public void truncateDirtyLogicFiles(long phyOffset) { @Override public boolean flush(final int flushLeastPages) { - boolean result = this.mappedFileQueue.flush(flushLeastPages); - return result; + return this.mappedFileQueue.flush(flushLeastPages); } @Override @@ -514,26 +508,6 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { this.messageStore.getRunningFlags().makeLogicsQueueError(); } - @Override - public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - - long queueOffset = queueOffsetOperator.getBatchQueueOffset(topicQueueKey); - - if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(queueOffset)); - msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); - } - msg.setQueueOffset(queueOffset); - } - - @Override - public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg, - short messageNum) { - String topicQueueKey = getTopic() + "-" + getQueueId(); - queueOffsetOperator.increaseBatchQueueOffset(topicQueueKey, messageNum); - } - public boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime, final long msgBaseOffset, final short batchSize) { @@ -819,7 +793,7 @@ private MappedFile searchTimeFromFiles(long timestamp) { /** * Find the offset of which the value is equal or larger than the given targetValue. - * If there are many values equal to the target, then find the earliest one. + * If there are multiple values equal to the target, return the earliest one. */ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift, @@ -999,7 +973,6 @@ public long rollNextFile(long nextBeginOffset) { /** * Batch msg offset (deep logic offset) - * * @return max deep offset */ @Override @@ -1012,6 +985,17 @@ public long getMinOffsetInQueue() { return minOffsetInQueue; } + @Override + public long getQueueOffset(QueueOffsetOperator queueOffsetOperator) { + return queueOffsetOperator.getBatchQueueOffset(topic + "-" + queueId); + } + + @Override + public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, + short messageNum) { + queueOffsetOperator.increaseBatchQueueOffset(topic + "-" + queueId, messageNum); + } + @Override public void checkSelf() { mappedFileQueue.checkSelf(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java index d7213fa37a1..f41750ef94b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.store.queue; import org.apache.rocketmq.common.attribute.CQType; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.MessageFilter; @@ -140,20 +139,20 @@ public interface ConsumeQueueInterface extends FileQueueLifeCycle { void putMessagePositionInfoWrapper(DispatchRequest request); /** - * Assign queue offset. - * @param queueOffsetAssigner the delegated queue offset assigner - * @param msg message itself + * Get queue offset. + * @param queueOffsetOperator the delegated queue offset operator + * + * @return queue offset */ - void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg); + long getQueueOffset(QueueOffsetOperator queueOffsetOperator); /** * Increase queue offset. - * @param queueOffsetAssigner the delegated queue offset assigner - * @param msg message itself + * @param queueOffsetOperator the delegated queue offset operator * @param messageNum message number */ - void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum); + void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, short messageNum); /** * Estimate number of records matching given filter. diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 8d38503b371..f2d7a8b3224 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -40,7 +40,6 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; @@ -160,7 +159,7 @@ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String top this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(), this.messageStore); } else { - throw new RuntimeException(format("queue type %s is not supported.", cqType.toString())); + throw new RuntimeException(format("queue type %s is not supported.", cqType)); } } @@ -174,14 +173,14 @@ private void queueTypeShouldBe(String topic, CQType cqTypeExpected) { } } - private ExecutorService buildExecutorService(BlockingQueue blockingQueue, String threadNamePrefix) { + private ExecutorService buildExecutorService(BlockingQueue blockingQueue) { return new ThreadPoolExecutor( this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, blockingQueue, - new ThreadFactoryImpl(threadNamePrefix)); + new ThreadFactoryImpl("RecoverConsumeQueueThread_")); } public void recover(ConsumeQueueInterface consumeQueue) { @@ -204,7 +203,7 @@ public boolean recoverConcurrently() { } final CountDownLatch countDownLatch = new CountDownLatch(count); BlockingQueue recoverQueue = new LinkedBlockingQueue<>(); - final ExecutorService executor = buildExecutorService(recoverQueue, "RecoverConsumeQueueThread_"); + final ExecutorService executor = buildExecutorService(recoverQueue); List> result = new ArrayList<>(count); try { for (ConcurrentMap maps : this.consumeQueueTable.values()) { @@ -378,14 +377,22 @@ public void setBatchTopicQueueTable(ConcurrentMap batchTopicQueueT this.queueOffsetOperator.setBatchTopicQueueTable(batchTopicQueueTable); } - public void assignQueueOffset(MessageExtBrokerInner msg) { - ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId()); - consumeQueue.assignQueueOffset(this.queueOffsetOperator, msg); + public long getQueueOffset(String topic, int queueId) { + ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(topic, queueId); + return consumeQueue.getQueueOffset(queueOffsetOperator); } - public void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum) { - ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId()); - consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum); + public long getLmqQueueOffset(String topicQueueKey) { + return queueOffsetOperator.getLmqOffset(topicQueueKey); + } + + public void increaseQueueOffset(String topic, int queueId, short messageNum) { + ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(topic, queueId); + consumeQueue.increaseQueueOffset(queueOffsetOperator, messageNum); + } + + public void increaseLmqOffset(String queueKey, short messageNum) { + queueOffsetOperator.increaseLmqOffset(queueKey, messageNum); } public void updateQueueOffset(String topic, int queueId, long offset) { diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index 87bfe85da23..3748571496d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -44,7 +44,7 @@ public class AppendCallbackTest { AppendMessageCallback callback; - MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024); + MessageExtEncoder batchEncoder; @Before public void init() throws Exception { @@ -53,12 +53,14 @@ public void init() throws Exception { messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4); messageStoreConfig.setMaxHashSlotNum(100); messageStoreConfig.setMaxIndexNum(100 * 10); + messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024); messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore"); messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + File.separator + "unitteststore" + File.separator + "commitlog"); //too much reference DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>()); CommitLog commitLog = new CommitLog(messageStore); - callback = commitLog.new DefaultAppendMessageCallback(); + callback = commitLog.new DefaultAppendMessageCallback(messageStoreConfig); + batchEncoder = new MessageExtEncoder(messageStoreConfig); } @After diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 083aabc48b3..6e880169a31 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -119,8 +119,6 @@ public void testIsSpaceFullMultiCommitLogStorePath() throws Exception { assertEquals(3, paths.length); initMessageStore(config, diskSpaceCleanForciblyRatio); - - // build and put 55 messages, exactly one message per CommitLog file. buildAndPutMessagesToMessageStore(msgCount); MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog(); diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java index 85626a332e9..c1ca64c4f74 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java @@ -19,8 +19,8 @@ import java.io.File; import java.net.InetSocketAddress; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; @@ -32,14 +32,13 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MultiDispatchTest { - private ConsumeQueue consumeQueue; + private MultiDispatch multiDispatch; private DefaultMessageStore messageStore; @@ -59,8 +58,7 @@ public void init() throws Exception { BrokerConfig brokerConfig = new BrokerConfig(); //too much reference messageStore = new DefaultMessageStore(messageStoreConfig, null, null, brokerConfig, new ConcurrentHashMap<>()); - consumeQueue = new ConsumeQueue("xxx", 0, - getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore); + multiDispatch = new MultiDispatch(messageStore); } @After @@ -72,14 +70,14 @@ public void destroy() { public void queueKey() { MessageExtBrokerInner messageExtBrokerInner = mock(MessageExtBrokerInner.class); when(messageExtBrokerInner.getQueueId()).thenReturn(2); - String ret = consumeQueue.queueKey("%LMQ%lmq123", messageExtBrokerInner); + String ret = multiDispatch.queueKey("%LMQ%lmq123", messageExtBrokerInner); assertEquals(ret, "%LMQ%lmq123-0"); } @Test public void wrapMultiDispatch() { MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue(); - messageStore.assignOffset(messageExtBrokerInner); + multiDispatch.wrapMultiDispatch(messageExtBrokerInner); assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0"); } @@ -88,7 +86,7 @@ private MessageExtBrokerInner buildMessageMultiQueue() { msg.setTopic("test"); msg.setTags("TAG1"); msg.setKeys("Hello"); - msg.setBody("aaa".getBytes(Charset.forName("UTF-8"))); + msg.setBody("aaa".getBytes(StandardCharsets.UTF_8)); msg.setKeys(String.valueOf(System.currentTimeMillis())); msg.setQueueId(0); msg.setSysFlag(0); diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java index df3c31c6edf..d57ecf94c29 100644 --- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; -import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MappedFileQueue; import org.apache.rocketmq.store.MessageExtEncoder; @@ -75,20 +74,18 @@ import static org.mockito.Mockito.when; public class CompactionLogTest { - CompactionLog clog; MessageStoreConfig storeConfig; MessageStore defaultMessageStore; CompactionPositionMgr positionMgr; String topic = "ctopic"; int queueId = 0; - int offsetMemorySize = 1024; int compactionFileSize = 10240; int compactionCqFileSize = 1024; - - private static MessageExtEncoder encoder = new MessageExtEncoder(1024); + private static MessageExtEncoder encoder = new MessageExtEncoder(new MessageStoreConfig()); private static SocketAddress storeHost; private static SocketAddress bornHost; + private static final int QUEUE_OFFSET_POS = 4 + 4 + 4 + 4 + 4; @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -123,6 +120,7 @@ public void setUp() throws IOException { static int queueOffset = 0; static int keyCount = 10; + public static ByteBuffer buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic("ctopic"); @@ -134,17 +132,17 @@ public static ByteBuffer buildMessage() { msg.setBornTimestamp(System.currentTimeMillis()); msg.setStoreHost(storeHost); msg.setBornHost(bornHost); - msg.setQueueOffset(queueOffset); - queueOffset++; for (int i = 1; i < 3; i++) { msg.putUserProperty(String.valueOf(i), "xxx" + i); } msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); encoder.encode(msg); + ByteBuffer preEncodeBuffer = encoder.getEncoderBuffer(); + preEncodeBuffer.putLong(QUEUE_OFFSET_POS, queueOffset); + queueOffset++; return encoder.getEncoderBuffer(); } - @Test public void testCheck() throws IllegalAccessException { MappedFileQueue mfq = mock(MappedFileQueue.class); @@ -189,9 +187,8 @@ public void testCheckWithException() throws IllegalAccessException, IOException @Test public void testCompaction() throws DigestException, NoSuchAlgorithmException, IllegalAccessException { Iterator iterator = mock(Iterator.class); - SelectMappedBufferResult smb = mock(SelectMappedBufferResult.class); - when(iterator.hasNext()).thenAnswer((Answer)invocationOnMock -> queueOffset < 1024); - when(iterator.next()).thenAnswer((Answer)invocation -> + when(iterator.hasNext()).thenAnswer((Answer) invocationOnMock -> queueOffset < 1024); + when(iterator.next()).thenAnswer((Answer) invocation -> new SelectMappedBufferResult(0, buildMessage(), 0, null)); MappedFile mf = mock(MappedFile.class); @@ -199,9 +196,9 @@ public void testCompaction() throws DigestException, NoSuchAlgorithmException, I doReturn(iterator).when(mf).iterator(0); MessageStore messageStore = mock(DefaultMessageStore.class); - CommitLog commitLog = mock(CommitLog.class); - when(messageStore.getCommitLog()).thenReturn(commitLog); - when(commitLog.getCommitLogSize()).thenReturn(1024 * 1024); + MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class); + when(messageStoreConfig.getMappedFileSizeCommitLog()).thenReturn(1024 * 1024); + when(messageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig); CompactionLog clog = mock(CompactionLog.class); FieldUtils.writeField(clog, "defaultMessageStore", messageStore, true); doCallRealMethod().when(clog).getOffsetMap(any()); @@ -219,7 +216,7 @@ public void testCompaction() throws DigestException, NoSuchAlgorithmException, I List compactResult = Lists.newArrayList(); when(clog.asyncPutMessage(any(ByteBuffer.class), any(MessageExt.class), any(CompactionLog.TopicPartitionLog.class))) - .thenAnswer((Answer>)invocation -> { + .thenAnswer((Answer>) invocation -> { compactResult.add(invocation.getArgument(1)); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java index c9e290b5db5..0e7ede18737 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/SparseConsumeQueueTest.java @@ -58,10 +58,10 @@ public void setUp() throws IOException { defaultMessageStore = mock(DefaultMessageStore.class); CommitLog commitLog = mock(CommitLog.class); when(defaultMessageStore.getCommitLog()).thenReturn(commitLog); - when(commitLog.getCommitLogSize()).thenReturn(10 * 1024 * 1024); MessageStoreConfig config = mock(MessageStoreConfig.class); doReturn(config).when(defaultMessageStore).getMessageStoreConfig(); doReturn(true).when(config).isSearchBcqByCacheEnable(); + doReturn(10 * 1024 * 1024).when(config).getMappedFileSizeCommitLog(); } private void fillByteBuf(ByteBuffer bb, long phyOffset, long queueOffset) {