Skip to content

Commit

Permalink
[ISSUE apache#6691] Support reentrant pop orderly for broker
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed May 4, 2023
1 parent d9a7315 commit 2549542
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void updateLockFreeTimestamp(String topic, String group, int queueId, Or
* @param msgQueueOffsetList the queue offsets of messages
* @param orderInfoBuilder will append order info to this builder
*/
public void update(boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime,
public void update(String attemptId, boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime,
List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
Expand All @@ -106,12 +106,12 @@ public void update(boolean isRetry, String topic, String group, int queueId, lon
OrderInfo orderInfo = qs.get(queueId);

if (orderInfo != null) {
OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList, orderInfo.offsetConsumedCount);
OrderInfo newOrderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
newOrderInfo.mergeOffsetConsumedCount(orderInfo.attemptId, orderInfo.offsetList, orderInfo.offsetConsumedCount);

orderInfo = newOrderInfo;
} else {
orderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
orderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0);
}
qs.put(queueId, orderInfo);

Expand Down Expand Up @@ -140,7 +140,7 @@ public void update(boolean isRetry, String topic, String group, int queueId, lon
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
}

public boolean checkBlock(String topic, String group, int queueId, long invisibleTime) {
public boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
Expand All @@ -156,7 +156,7 @@ public boolean checkBlock(String topic, String group, int queueId, long invisibl
if (orderInfo == null) {
return false;
}
return orderInfo.needBlock(invisibleTime);
return orderInfo.needBlock(attemptId, invisibleTime);
}

public void clearBlock(String topic, String group, int queueId) {
Expand Down Expand Up @@ -391,17 +391,20 @@ public static class OrderInfo {
*/
@JSONField(name = "cm")
private long commitOffsetBit;
@JSONField(name = "a")
private String attemptId;

public OrderInfo() {
}

public OrderInfo(long popTime, long invisibleTime, List<Long> queueOffsetList, long lastConsumeTimestamp,
public OrderInfo(String attemptId, long popTime, long invisibleTime, List<Long> queueOffsetList, long lastConsumeTimestamp,
long commitOffsetBit) {
this.popTime = popTime;
this.invisibleTime = invisibleTime;
this.offsetList = buildOffsetList(queueOffsetList);
this.lastConsumeTimestamp = lastConsumeTimestamp;
this.commitOffsetBit = commitOffsetBit;
this.attemptId = attemptId;
}

public List<Long> getOffsetList() {
Expand Down Expand Up @@ -460,6 +463,14 @@ public void setOffsetConsumedCount(Map<Long, Integer> offsetConsumedCount) {
this.offsetConsumedCount = offsetConsumedCount;
}

public String getAttemptId() {
return attemptId;
}

public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}

public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
List<Long> simple = new ArrayList<>();
if (queueOffsetList.size() == 1) {
Expand All @@ -475,10 +486,13 @@ public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
}

@JSONField(serialize = false, deserialize = false)
public boolean needBlock(long currentInvisibleTime) {
public boolean needBlock(String attemptId, long currentInvisibleTime) {
if (offsetList == null || offsetList.isEmpty()) {
return false;
}
if (this.attemptId != null && this.attemptId.equals(attemptId)) {
return false;
}
int num = offsetList.size();
int i = 0;
if (this.invisibleTime == null || this.invisibleTime <= 0) {
Expand Down Expand Up @@ -586,11 +600,15 @@ public boolean isNotAck(int offsetIndex) {
* @param prevOffsetConsumedCount the offset list of message
*/
@JSONField(serialize = false, deserialize = false)
public void mergeOffsetConsumedCount(List<Long> preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
public void mergeOffsetConsumedCount(String preAttemptId, List<Long> preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
Map<Long, Integer> offsetConsumedCount = new HashMap<>();
if (prevOffsetConsumedCount == null) {
prevOffsetConsumedCount = new HashMap<>();
}
if (preAttemptId != null && preAttemptId.equals(this.attemptId)) {
this.offsetConsumedCount = prevOffsetConsumedCount;
return;
}
Set<Long> preQueueOffsetSet = new HashSet<>();
for (int i = 0; i < preOffsetList.size(); i++) {
preQueueOffsetSet.add(getQueueOffset(preOffsetList, i));
Expand Down Expand Up @@ -619,6 +637,7 @@ public String toString() {
.add("offsetConsumedCount", offsetConsumedCount)
.add("lastConsumeTimestamp", lastConsumeTimestamp)
.add("commitOffsetBit", commitOffsetBit)
.add("attemptId", attemptId)
.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
}
if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) {
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
}

private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
if (this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
return false;
}
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
Expand All @@ -425,12 +425,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
// read all queue
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
} else {
int queueId = requestHeader.getQueueId();
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
// if not full , fetch retry again
Expand All @@ -440,7 +440,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
if (retryTopicConfig != null) {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
}
}
Expand Down Expand Up @@ -523,7 +523,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return null;
}

private CompletableFuture<Long> popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
private CompletableFuture<Long> popMsgFromQueue(String attemptId, boolean isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
Expand All @@ -545,7 +545,7 @@ private CompletableFuture<Long> popMsgFromQueue(boolean isRetry, GetMessageResul
future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
true, lockKey, true);
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic,
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
return future;
Expand Down Expand Up @@ -618,7 +618,7 @@ private CompletableFuture<Long> popMsgFromQueue(boolean isRetry, GetMessageResul
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes);

if (isOrder) {
this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic,
requestHeader.getConsumerGroup(),
queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
orderCountInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void before() {
@Test
public void testConsumeMessageThenNoAck() {
consumerOrderInfoManager.update(
null,
false,
TOPIC,
GROUP,
Expand All @@ -83,6 +84,7 @@ public void testConsumeMessageThenNoAck() {
@Test
public void testConsumeMessageThenAck() {
consumerOrderInfoManager.update(
null,
false,
TOPIC,
GROUP,
Expand All @@ -106,6 +108,7 @@ public void testConsumeMessageThenAck() {
@Test
public void testConsumeTheChangeInvisibleLonger() {
consumerOrderInfoManager.update(
null,
false,
TOPIC,
GROUP,
Expand All @@ -130,6 +133,7 @@ public void testConsumeTheChangeInvisibleLonger() {
@Test
public void testConsumeTheChangeInvisibleShorter() {
consumerOrderInfoManager.update(
null,
false,
TOPIC,
GROUP,
Expand All @@ -155,6 +159,7 @@ public void testConsumeTheChangeInvisibleShorter() {
public void testRecover() {
ConsumerOrderInfoManager savedConsumerOrderInfoManager = new ConsumerOrderInfoManager();
savedConsumerOrderInfoManager.update(
null,
false,
TOPIC,
GROUP,
Expand Down
Loading

0 comments on commit 2549542

Please sign in to comment.