Skip to content

Commit

Permalink
[ISSUE apache#7208] fix: when deleting topic also delete its pop retr…
Browse files Browse the repository at this point in the history
…y topic
  • Loading branch information
HScarb committed Aug 21, 2023
1 parent eb661db commit 2732865
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,10 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
}
}

final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
// delete pop retry topics first
for (String topicName : brokerController.getTopicConfigManager().getTopicConfigTable().keySet()) {
if (KeyBuilder.isPopRetryTopicOfTopic(topicName, topic)) {
if (groups.contains(KeyBuilder.getPopRetryTopicGroup(topicName, topic))) {
deleteTopicInBroker(topicName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -329,19 +328,29 @@ public void testDeleteTopicOnSlave() throws Exception {

@Test
public void testDeleteWithPopRetryTopic() throws Exception {
String topic = "TEST_DELETE_TOPIC";
String topic = "topicA";
String anotherTopic = "another_topicA";

topicConfigManager = mock(TopicConfigManager.class);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
topicConfigTable.put(topic, new TopicConfig());
topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid"), new TopicConfig());
topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig());

topicConfigTable.put(anotherTopic, new TopicConfig());
topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig());
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);

when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1"));
when(consumerOffsetManager.whichGroupByTopic(anotherTopic)).thenReturn(Sets.newHashSet("cid2"));

RemotingCommand request = buildDeleteTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
verify(topicConfigManager, times(2)).deleteTopicConfig(endsWith(topic));

verify(topicConfigManager).deleteTopicConfig(topic);
verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1"));
verify(messageStore, times(2)).deleteTopics(anySet());
}

Expand Down
16 changes: 12 additions & 4 deletions common/src/main/java/org/apache/rocketmq/common/KeyBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,19 @@ public static String parseNormalTopic(String topic, String cid) {
}
}

public static boolean isPopRetryTopicOfTopic(String retryTopic, String topic) {
if (!retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
return false;
/**
* Get consumer group name by pop retry topic and real topic
*
* @param retryTopic pop retry topic
* @param topic real topic
* @return consumer group name
*/
public static String getPopRetryTopicGroup(String retryTopic, String topic) {
if (!retryTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || !retryTopic.endsWith("_" + topic)) {
return null;
}
return retryTopic.endsWith("_" + topic);
String group = retryTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
return group.substring(0, group.length() - ("_" + topic).length());
}

public static String buildPollingKey(String topic, String cid, int queueId) {
Expand Down

0 comments on commit 2732865

Please sign in to comment.