Skip to content

Commit

Permalink
[ISSUE apache#6681] fix: fix pop retry message notification
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb committed Aug 9, 2023
1 parent 378b98b commit aa75ca0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,43 @@ public boolean notifyMessageArriving(final String topic, final String cid, final
return wakeUp(popRequest);
}

/**
* Notify all consumer groups that subscribe to the topic
*/
public boolean notifyRetryMessageArriving(final String topic) {
boolean result = true;
if (!topicCidMap.containsKey(topic)) {
return result;
}
for (String cid : topicCidMap.get(topic).keySet()) {
for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : pollingMap.entrySet()) {
if (!entry.getKey().startsWith(KeyBuilder.buildPollingKeyPrefix(topic, cid))) {
continue;
}
ConcurrentSkipListSet<PopRequest> remotingCommands = entry.getValue();
if (remotingCommands == null || remotingCommands.isEmpty()) {
continue;
}
PopRequest popRequest = remotingCommands.pollFirst();
//clean inactive channel
while (popRequest != null && !popRequest.getChannel().isActive()) {
totalPollingNum.decrementAndGet();
popRequest = remotingCommands.pollFirst();
}

if (popRequest == null) {
continue;
}
totalPollingNum.decrementAndGet();
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", popRequest);
}
result = result && wakeUp(popRequest);
}
}
return result;
}

public boolean wakeUp(final PopRequest request) {
if (request == null || !request.complete()) {
return false;
Expand Down Expand Up @@ -219,15 +256,8 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
return NOT_POLLING;
}
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
if (cids == null) {
cids = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
if (old != null) {
cids = old;
}
}
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
long expired = requestHeader.getBornTime() + requestHeader.getPollTime();
final PopRequest request = new PopRequest(remotingCommand, ctx, expired);
boolean isFull = totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public boolean notifyMessageArriving(final String topic, final String cid, final
return popLongPollingService.notifyMessageArriving(topic, cid, queueId);
}

public boolean notifyRetryMessageArriving(final String topic) {
return popLongPollingService.notifyRetryMessageArriving(topic);
}

@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
popCheckPoint.getCId(),
-1
);
brokerController.getPopMessageProcessor().notifyRetryMessageArriving(
KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()));
brokerController.getNotificationProcessor().notifyMessageArriving(
KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static String parseNormalTopic(String topic, String cid) {
}
}

public static String buildPollingKeyPrefix(String topic, String cid) {
return topic + PopAckConstants.SPLIT + cid;
}

public static String buildPollingKey(String topic, String cid, int queueId) {
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
}
Expand Down

0 comments on commit aa75ca0

Please sign in to comment.