diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 60ce75d667fb..e7a325296fb7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -543,9 +543,10 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, } } + final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); // delete pop retry topics first for (String topicName : brokerController.getTopicConfigManager().getTopicConfigTable().keySet()) { - if (KeyBuilder.isPopRetryTopicOfTopic(topicName, topic)) { + if (groups.contains(KeyBuilder.getPopRetryTopicGroup(topicName, topic))) { deleteTopicInBroker(topicName); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 170c7323416d..8597e7406adc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -94,7 +94,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.endsWith; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -329,19 +328,28 @@ public void testDeleteTopicOnSlave() throws Exception { @Test public void testDeleteWithPopRetryTopic() throws Exception { - String topic = "TEST_DELETE_TOPIC"; + String topic = "topicA"; + String anotherTopic = "another_topicA"; topicConfigManager = mock(TopicConfigManager.class); when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); final ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); topicConfigTable.put(topic, new TopicConfig()); - topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid"), new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig()); + + topicConfigTable.put(anotherTopic, new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig()); when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1")); + RemotingCommand request = buildDeleteTopicRequest(topic); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); - verify(topicConfigManager, times(2)).deleteTopicConfig(endsWith(topic)); + + verify(topicConfigManager).deleteTopicConfig(topic); + verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1")); verify(messageStore, times(2)).deleteTopics(anySet()); } diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java index c724b0d0571f..0e477611f798 100644 --- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java @@ -31,11 +31,19 @@ public static String parseNormalTopic(String topic, String cid) { } } - public static boolean isPopRetryTopicOfTopic(String retryTopic, String topic) { - if (!retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - return false; + /** + * Get consumer group name by pop retry topic and real topic + * + * @param retryTopic pop retry topic + * @param topic real topic + * @return consumer group name + */ + public static String getPopRetryTopicGroup(String retryTopic, String topic) { + if (!retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || !retryTopic.endsWith("_" + topic)) { + return null; } - return retryTopic.endsWith("_" + topic); + String group = retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); + return group.substring(0, group.length() - ("_" + topic).length()); } public static String buildPollingKey(String topic, String cid, int queueId) {