diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index 113c91297e4..7ccdda7952d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -179,6 +179,43 @@ public boolean notifyMessageArriving(final String topic, final String cid, final return wakeUp(popRequest); } + /** + * Notify all consumer groups that subscribe to the topic + */ + public boolean notifyRetryMessageArriving(final String topic) { + boolean result = true; + if (!topicCidMap.containsKey(topic)) { + return result; + } + for (String cid : topicCidMap.get(topic).keySet()) { + for (Map.Entry> entry : pollingMap.entrySet()) { + if (!entry.getKey().startsWith(KeyBuilder.buildPollingKeyPrefix(topic, cid))) { + continue; + } + ConcurrentSkipListSet remotingCommands = entry.getValue(); + if (remotingCommands == null || remotingCommands.isEmpty()) { + continue; + } + PopRequest popRequest = remotingCommands.pollFirst(); + //clean inactive channel + while (popRequest != null && !popRequest.getChannel().isActive()) { + totalPollingNum.decrementAndGet(); + popRequest = remotingCommands.pollFirst(); + } + + if (popRequest == null) { + continue; + } + totalPollingNum.decrementAndGet(); + if (brokerController.getBrokerConfig().isEnablePopLog()) { + POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", popRequest); + } + result = result && wakeUp(popRequest); + } + } + return result; + } + public boolean wakeUp(final PopRequest request) { if (request == null || !request.complete()) { return false; @@ -219,15 +256,8 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re if (requestHeader.getPollTime() <= 0 || this.isStopped()) { return NOT_POLLING; } - ConcurrentHashMap cids = topicCidMap.get(requestHeader.getTopic()); - if (cids == null) { - cids = new ConcurrentHashMap<>(); - ConcurrentHashMap old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids); - if (old != null) { - cids = old; - } - } - cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE); + topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>()) + .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE); long expired = requestHeader.getBornTime() + requestHeader.getPollTime(); final PopRequest request = new PopRequest(remotingCommand, ctx, expired); boolean isFull = totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 53e17256140..15129e00e62 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -192,6 +192,10 @@ public boolean notifyMessageArriving(final String topic, final String cid, final return popLongPollingService.notifyMessageArriving(topic, cid, queueId); } + public boolean notifyRetryMessageArriving(final String topic) { + return popLongPollingService.notifyRetryMessageArriving(topic); + } + @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 93167db373a..8ecdae0af93 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -148,6 +148,8 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) popCheckPoint.getCId(), -1 ); + brokerController.getPopMessageProcessor().notifyRetryMessageArriving( + KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId())); brokerController.getNotificationProcessor().notifyMessageArriving( KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1); } diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java index e1532d9399b..f5d1ce80212 100644 --- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java @@ -31,6 +31,10 @@ public static String parseNormalTopic(String topic, String cid) { } } + public static String buildPollingKeyPrefix(String topic, String cid) { + return topic + PopAckConstants.SPLIT + cid; + } + public static String buildPollingKey(String topic, String cid, int queueId) { return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId; }