Skip to content

Commit

Permalink
[fix][client] Fix pattern consumer create crash if a part of partitio…
Browse files Browse the repository at this point in the history
…ns of a topic have been deleted (apache#22854)
  • Loading branch information
poorbarcode authored Jul 9, 2024
1 parent 7924f9c commit 9626e7e
Show file tree
Hide file tree
Showing 17 changed files with 1,187 additions and 197 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.fail;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -95,4 +98,38 @@ private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
consumer.close();
}

@Test(timeOut = 30000)
public void testFailedSubscribe() throws Exception {
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
final String topicName3 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
final String subName = "s1";
admin.topics().createPartitionedTopic(topicName1, 2);
admin.topics().createPartitionedTopic(topicName2, 3);
admin.topics().createNonPartitionedTopic(topicName3);

// Register a exclusive consumer to makes the pattern consumer failed to subscribe.
Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName3).subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(subName).subscribe();

try {
PatternMultiTopicsConsumerImpl<String> consumer =
(PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topicsPattern("persistent://public/default/tp_test.*")
.subscriptionType(SubscriptionType.Failover)
.subscriptionName(subName)
.subscribe();
fail("Expected a consumer busy error.");
} catch (Exception ex) {
log.info("consumer busy", ex);
}

c1.close();
// Verify all internal consumer will be closed.
// If delete topic without "-f" work, it means the internal consumers were closed.
admin.topics().delete(topicName3);
admin.topics().deletePartitionedTopic(topicName2);
admin.topics().deletePartitionedTopic(topicName1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
Expand All @@ -35,7 +34,6 @@
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import io.netty.util.Timeout;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -53,6 +51,7 @@
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -1024,17 +1023,17 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {

// 6. remove producer 1,3; verify only consumer 2 left
// seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
List<String> topicNames = Lists.newArrayList(topicName2);
String tp2p0 = TopicName.get(topicName2).getPartition(0).toString();
String tp2p1 = TopicName.get(topicName2).getPartition(1).toString();
List<String> topicNames = Lists.newArrayList(tp2p0, tp2p1);
NamespaceService nss = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

// 7. call recheckTopics to unsubscribe topic 1,3, verify topics number: 2=6-1-3
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
Timeout recheckPatternTimeout = spy(consumer1.getRecheckPatternTimeout());
doReturn(false).when(recheckPatternTimeout).isCancelled();
consumer1.run(recheckPatternTimeout);
PatternConsumerUpdateQueue taskQueue = WhiteboxImpl.getInternalState(consumer, "updateTaskQueue");
taskQueue.appendRecheckOp();
Thread.sleep(100);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitions().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.util.Timeout;
import java.time.Duration;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
Expand Down Expand Up @@ -1321,7 +1322,6 @@ public void testPartitionsUpdatesForMultipleTopics() throws Exception {
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);

admin.topics().updatePartitionedTopic(topicName0, 5);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());

Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
Expand All @@ -1341,9 +1341,8 @@ public void testPartitionsUpdatesForMultipleTopics() throws Exception {
});

admin.topics().updatePartitionedTopic(topicName1, 5);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());

Awaitility.await().untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -1285,5 +1286,10 @@ public boolean hasBatchReceiveTimeout() {
return batchReceiveTimeout != null;
}

@VisibleForTesting
CompletableFuture<Consumer<T>> getSubscribeFuture() {
return subscribeFuture;
}

private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,14 @@ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicNam
InetSocketAddress resolveHost();

/**
* Returns all the topics name for a given namespace.
* Returns all the topics that matches {@param topicPattern} for a given namespace.
*
* Note: {@param topicPattern} it relate to the topic name(without the partition suffix). For example:
* - There is a partitioned topic "tp-a" with two partitions.
* - tp-a-partition-0
* - tp-a-partition-1
* - If {@param topicPattern} is "tp-a", the consumer will subscribe to the two partitions.
* - if {@param topicPattern} is "tp-a-partition-0", the consumer will subscribe nothing.
*
* @param namespace : namespace-name
* @return
Expand Down
Loading

0 comments on commit 9626e7e

Please sign in to comment.