From c9cfc0409b804c3f7a542eca776a2c500619da0d Mon Sep 17 00:00:00 2001 From: cnScarb Date: Thu, 31 Aug 2023 15:50:10 +0800 Subject: [PATCH] [ISSUE #7208] fix: when deleting topic also delete its pop retry topic (#7209) --- .../processor/AdminBrokerProcessor.java | 24 ++++++++++--- .../processor/AdminBrokerProcessorTest.java | 36 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a6ce03dc29f..587bd6bb367 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -51,6 +51,7 @@ import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -534,16 +535,29 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, } } - this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); - this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic()); - this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic()); - this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic()); - this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic())); + final Set groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic); + // delete pop retry topics first + for (String group : groups) { + final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group); + if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) { + deleteTopicInBroker(popRetryTopic); + } + } + // delete topic + deleteTopicInBroker(topic); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } + private void deleteTopicInBroker(String topic) { + this.brokerController.getTopicConfigManager().deleteTopicConfig(topic); + this.brokerController.getTopicQueueMappingManager().delete(topic); + this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic); + this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic); + this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic)); + } + private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index d33a217f76d..9d17011b616 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.LongAdder; import org.apache.rocketmq.broker.BrokerController; @@ -41,6 +42,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -90,8 +92,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -321,6 +326,37 @@ public void testDeleteTopicOnSlave() throws Exception { "please execute it from master broker."); } + @Test + public void testDeleteWithPopRetryTopic() throws Exception { + String topic = "topicA"; + String anotherTopic = "another_topicA"; + + topicConfigManager = mock(TopicConfigManager.class); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + final ConcurrentHashMap topicConfigTable = new ConcurrentHashMap<>(); + topicConfigTable.put(topic, new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig()); + + topicConfigTable.put(anotherTopic, new TopicConfig()); + topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig()); + when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); + when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> { + final String selectTopic = invocation.getArgument(0); + return topicConfigManager.getTopicConfigTable().get(selectTopic); + }); + + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1")); + + RemotingCommand request = buildDeleteTopicRequest(topic); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + verify(topicConfigManager).deleteTopicConfig(topic); + verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1")); + verify(messageStore, times(2)).deleteTopics(anySet()); + } + @Test public void testGetAllTopicConfigInRocksdb() throws Exception { if (notToBeExecuted()) {