Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6570] Optimizing the process of putting messages to improve performance #6571

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
13 changes: 11 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
needAssignOffset = false;
}
if (needAssignOffset) {
defaultMessageStore.assignOffset(msg, getMessageNum(msg));
defaultMessageStore.assignOffset(msg);
}

PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
Expand Down Expand Up @@ -892,6 +892,10 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
} finally {
putMessageLock.unlock();
}
// Increase queue offset when messages are successfully written
if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
lizhanhui marked this conversation as resolved.
Show resolved Hide resolved
this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
}
} finally {
topicQueueLock.unlock(topicQueueKey);
}
Expand Down Expand Up @@ -990,7 +994,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc

topicQueueLock.lock(topicQueueKey);
try {
defaultMessageStore.assignOffset(messageExtBatch, (short) putMessageContext.getBatchSize());
defaultMessageStore.assignOffset(messageExtBatch);

putMessageLock.lock();
try {
Expand Down Expand Up @@ -1041,6 +1045,11 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
} finally {
putMessageLock.unlock();
}

// Increase queue offset when messages are successfully written
if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
this.defaultMessageStore.increaseOffset(messageExtBatch, (short) putMessageContext.getBatchSize());
}
} finally {
topicQueueLock.unlock(topicQueueKey);
}
Expand Down
47 changes: 34 additions & 13 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
import org.apache.rocketmq.store.queue.QueueOffsetAssigner;
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;

public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
Expand All @@ -54,8 +54,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
* │ Store Unit │
* │ │
* </pre>
* ConsumeQueue's store unit. Size:
* CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
* ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
*/
public static final int CQ_STORE_UNIT_SIZE = 20;
public static final int MSG_TAG_OFFSET_INDEX = 12;
Expand Down Expand Up @@ -785,10 +784,9 @@ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String
}

@Override
public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg,
short messageNum) {
public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) {
String topicQueueKey = getTopic() + "-" + getQueueId();
long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
msg.setQueueOffset(queueOffset);
// For LMQ
if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
Expand All @@ -803,14 +801,37 @@ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageEx
for (int i = 0; i < queues.length; i++) {
String key = queueKey(queues[i], msg);
if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
queueOffsets[i] = queueOffsetAssigner.assignLmqOffset(key, (short) 1);
queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
}
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
StringUtils.join(queueOffsets, MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
removeWaitStorePropertyString(msg);
}

@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg,
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);

// For LMQ
if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return;
}
String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
if (StringUtils.isBlank(multiDispatchQueue)) {
return;
}
String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
for (int i = 0; i < queues.length; i++) {
String key = queueKey(queues[i], msg);
if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
queueOffsetOperator.increaseLmqOffset(key, (short) 1);
}
}
}

public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
StringBuilder keyBuilder = new StringBuilder();
keyBuilder.append(queueName);
Expand Down Expand Up @@ -968,7 +989,7 @@ private class ConsumeQueueIterator implements ReferredIterator<CqUnit> {
private int relativePos = 0;

public ConsumeQueueIterator(SelectMappedBufferResult sbr) {
this.sbr = sbr;
this.sbr = sbr;
if (sbr != null && sbr.getByteBuffer() != null) {
relativePos = sbr.getByteBuffer().position();
}
Expand All @@ -988,11 +1009,11 @@ public CqUnit next() {
if (!hasNext()) {
return null;
}
long queueOffset = (sbr.getStartOffset() + sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
long queueOffset = (sbr.getStartOffset() + sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
CqUnit cqUnit = new CqUnit(queueOffset,
sbr.getByteBuffer().getLong(),
sbr.getByteBuffer().getInt(),
sbr.getByteBuffer().getLong());
sbr.getByteBuffer().getLong(),
sbr.getByteBuffer().getInt(),
sbr.getByteBuffer().getLong());

if (isExtAddr(cqUnit.getTagsCode())) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
Expand All @@ -1003,7 +1024,7 @@ public CqUnit next() {
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content! addr={}, offsetPy={}, sizePy={}, topic={}",
cqUnit.getTagsCode(), cqUnit.getPos(), cqUnit.getPos(), getTopic());
cqUnit.getTagsCode(), cqUnit.getPos(), cqUnit.getPos(), getTopic());
}
}
return cqUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2039,11 +2039,21 @@ public boolean isSyncMaster() {
}

@Override
public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
public void assignOffset(MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
RongtongJin marked this conversation as resolved.
Show resolved Hide resolved
this.consumeQueueStore.assignQueueOffset(msg, messageNum);
this.consumeQueueStore.assignQueueOffset(msg);
}
}


@Override
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
this.consumeQueueStore.increaseQueueOffset(msg, messageNum);
}
}

Expand Down
11 changes: 9 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -728,13 +728,20 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma
boolean isSyncMaster();

/**
* Assign an queue offset and increase it. If there is a race condition, you need to lock/unlock this method
* Assign a message to queue offset. If there is a race condition, you need to lock/unlock this method
* yourself.
*
* @param msg message
*/
void assignOffset(MessageExtBrokerInner msg);

/**
* Increase queue offset in memory table. If there is a race condition, you need to lock/unlock this method
*
* @param msg message
* @param messageNum message num
*/
void assignOffset(MessageExtBrokerInner msg, short messageNum);
void increaseOffset(MessageExtBrokerInner msg, short messageNum);

/**
* get all topic config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
topicQueueLock.lock(topicQueueKey);
try {
defaultMessageStore.assignOffset(msg, getMessageNum(msg));
defaultMessageStore.assignOffset(msg);

encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
Expand Down Expand Up @@ -475,6 +475,8 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
}

defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
} finally {
topicQueueLock.unlock(topicQueueKey);
}
Expand Down Expand Up @@ -556,7 +558,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
int batchNum = encodeResult.batchData.size();
topicQueueLock.lock(encodeResult.queueOffsetKey);
try {
defaultMessageStore.assignOffset(messageExtBatch, (short) batchNum);
defaultMessageStore.assignOffset(messageExtBatch);

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
Expand Down Expand Up @@ -616,6 +618,9 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
}

defaultMessageStore.increaseOffset(messageExtBatch, (short) batchNum);

} finally {
topicQueueLock.unlock(encodeResult.queueOffsetKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,13 @@ public boolean isSyncMaster() {
}

@Override
public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
next.assignOffset(msg, messageNum);
public void assignOffset(MessageExtBrokerInner msg) {
next.assignOffset(msg);
}

@Override
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
next.increaseOffset(msg, messageNum);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
* │ Store Unit │
* │ │
* </pre>
* BatchConsumeQueue's store unit. Size:
* CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Store time(8) +
* msgBaseOffset(8) + batchSize(2) + compactedOffset(4) + reserved(4)= 46 Bytes
* BatchConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Store
* time(8) + msgBaseOffset(8) + batchSize(2) + compactedOffset(4) + reserved(4)= 46 Bytes
*/
public static final int CQ_STORE_UNIT_SIZE = 46;
public static final int MSG_TAG_OFFSET_INDEX = 12;
Expand Down Expand Up @@ -353,7 +352,7 @@ public boolean isFirstFileExist() {
@Override
public void truncateDirtyLogicFiles(long phyOffset) {

long oldMinOffset = minOffsetInQueue;
long oldMinOffset = minOffsetInQueue;
long oldMaxOffset = maxOffsetInQueue;

int logicFileSize = this.mappedFileSize;
Expand Down Expand Up @@ -515,10 +514,10 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
}

@Override
public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg) {
String topicQueueKey = getTopic() + "-" + getQueueId();

long queueOffset = queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum);
long queueOffset = queueOffsetOperator.getBatchQueueOffset(topicQueueKey);

if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(queueOffset));
Expand All @@ -527,7 +526,15 @@ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageEx
msg.setQueueOffset(queueOffset);
}

public boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime,
@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, MessageExtBrokerInner msg,
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseBatchQueueOffset(topicQueueKey, messageNum);
}

public boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long storeTime,
final long msgBaseOffset, final short batchSize) {

if (offset <= this.maxMsgPhyOffsetInCommitLog) {
Expand Down Expand Up @@ -611,8 +618,9 @@ private static int ceil(int pos) {
}

/**
* Gets SelectMappedBufferResult by batch-message offset
* Node: the caller is responsible for the release of SelectMappedBufferResult
* Gets SelectMappedBufferResult by batch-message offset Node: the caller is responsible for the release of
* SelectMappedBufferResult
*
* @param msgOffset
* @return SelectMappedBufferResult
*/
Expand Down Expand Up @@ -696,6 +704,7 @@ public MappedFile searchOffsetFromFiles(long msgOffset) {

/**
* Find the message whose timestamp is the smallest, greater than or equal to the given time.
*
* @param timestamp
* @return
*/
Expand Down Expand Up @@ -794,8 +803,8 @@ private MappedFile searchTimeFromFiles(long timestamp) {
}
} else {
//The max timestamp of this file is smaller than the given timestamp, so double check the previous file
if (i + 1 <= mappedFileNum - 1) {
mappedFile = mappedFileQueue.getMappedFiles().get(i + 1);
if (i + 1 <= mappedFileNum - 1) {
mappedFile = mappedFileQueue.getMappedFiles().get(i + 1);
targetBcq = mappedFile;
break;
} else {
Expand All @@ -809,10 +818,11 @@ private MappedFile searchTimeFromFiles(long timestamp) {
}

/**
* Find the offset of which the value is equal or larger than the given targetValue.
* If there are many values equal to the target, then find the earliest one.
* Find the offset of which the value is equal or larger than the given targetValue. If there are many values equal
lizhanhui marked this conversation as resolved.
Show resolved Hide resolved
* to the target, then find the earliest one.
*/
public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift,
public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right, final int unitSize,
final int unitShift,
long targetValue) {
int mid = -1;
while (left <= right) {
Expand All @@ -830,7 +840,7 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right,
if (tmpValue >= targetValue) {
return mid;
} else {
left = mid + unitSize;
left = mid + unitSize;
}
} else {
//mid is actually in the mid
Expand All @@ -845,8 +855,8 @@ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int right,
}

/**
* Here is vulnerable, the min value of the bytebuffer must be smaller or equal then the given value.
* Otherwise it may get -1
* Here is vulnerable, the min value of the bytebuffer must be smaller or equal then the given value. Otherwise it
* may get -1
*/
protected int binarySearch(ByteBuffer byteBuffer, int left, int right, final int unitSize, final int unitShift,
long targetValue) {
Expand Down Expand Up @@ -989,6 +999,7 @@ public long rollNextFile(long nextBeginOffset) {

/**
* Batch msg offset (deep logic offset)
*
* @return max deep offset
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,17 @@ public interface ConsumeQueueInterface {
* Assign queue offset.
* @param queueOffsetAssigner the delegated queue offset assigner
* @param msg message itself
*/
void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg);


/**
* increase queue offset.
* @param queueOffsetAssigner the delegated queue offset assigner
* @param msg message itself
* @param messageNum message number
*/
void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);

/**
* Estimate number of records matching given filter.
Expand Down
Loading