diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index bbddcec2d7fb..60ce75d667fb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -542,16 +543,27 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, } } - this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); - this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic()); - this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic()); - this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic()); - this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic())); + // delete pop retry topics first + for (String topicName : brokerController.getTopicConfigManager().getTopicConfigTable().keySet()) { + if (KeyBuilder.isPopRetryTopicOfTopic(topicName, topic)) { + deleteTopicInBroker(topicName); + } + } + // delete topic + deleteTopicInBroker(requestHeader.getTopic()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } + private void deleteTopicInBroker(String topic) { + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); + this.brokerController.getTopicQueueMappingManager().delete(topic); + this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); + this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); + this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic)); + } + private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index d33a217f76dd..170c7323416d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.LongAdder; import org.apache.rocketmq.broker.BrokerController; @@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -90,8 +92,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.endsWith; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -321,6 +327,24 @@ public void testDeleteTopicOnSlave() throws Exception { "please execute it from master broker."); } + @Test + public void testDeleteWithPopRetryTopic() throws Exception { + String topic = "TEST_DELETE_TOPIC"; + + topicConfigManager = mock(TopicConfigManager.class); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + final ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); + topicConfigTable.put(topic, new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid"), new TopicConfig()); + when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); + + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + verify(topicConfigManager, times(2)).deleteTopicConfig(endsWith(topic)); + verify(messageStore, times(2)).deleteTopics(anySet()); + } + @Test public void testGetAllTopicConfigInRocksdb() throws Exception { if (notToBeExecuted()) { diff --git a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java index e1532d9399be..c724b0d0571f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java +++ b/common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java @@ -31,6 +31,13 @@ public static String parseNormalTopic(String topic, String cid) { } } + public static boolean isPopRetryTopicOfTopic(String retryTopic, String topic) { + if (!retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + return false; + } + return retryTopic.endsWith("_" + topic); + } + public static String buildPollingKey(String topic, String cid, int queueId) { return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId; }