Skip to content

Commit

Permalink
[ISSUE apache#7208] fix: when deleting topic also delete its pop retr…
Browse files Browse the repository at this point in the history
…y topic
  • Loading branch information
HScarb committed Aug 21, 2023
1 parent 2b93e1e commit eb661db
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -542,16 +543,27 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
}
}

this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
// delete pop retry topics first
for (String topicName : brokerController.getTopicConfigManager().getTopicConfigTable().keySet()) {
if (KeyBuilder.isPopRetryTopicOfTopic(topicName, topic)) {
deleteTopicInBroker(topicName);
}
}
// delete topic
deleteTopicInBroker(requestHeader.getTopic());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

private void deleteTopicInBroker(String topic) {
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
this.brokerController.getTopicQueueMappingManager().delete(topic);
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
}

private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.broker.BrokerController;
Expand All @@ -41,6 +42,7 @@
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
Expand Down Expand Up @@ -90,8 +92,12 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -321,6 +327,24 @@ public void testDeleteTopicOnSlave() throws Exception {
"please execute it from master broker.");
}

@Test
public void testDeleteWithPopRetryTopic() throws Exception {
String topic = "TEST_DELETE_TOPIC";

topicConfigManager = mock(TopicConfigManager.class);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
topicConfigTable.put(topic, new TopicConfig());
topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid"), new TopicConfig());
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);

RemotingCommand request = buildDeleteTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
verify(topicConfigManager, times(2)).deleteTopicConfig(endsWith(topic));
verify(messageStore, times(2)).deleteTopics(anySet());
}

@Test
public void testGetAllTopicConfigInRocksdb() throws Exception {
if (notToBeExecuted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public static String parseNormalTopic(String topic, String cid) {
}
}

public static boolean isPopRetryTopicOfTopic(String retryTopic, String topic) {
if (!retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return false;
}
return retryTopic.endsWith("_" + topic);
}

public static String buildPollingKey(String topic, String cid, int queueId) {
return topic + PopAckConstants.SPLIT + cid + PopAckConstants.SPLIT + queueId;
}
Expand Down

0 comments on commit eb661db

Please sign in to comment.