From 9626e7e090e9481e12441a47cf7e89f209aadd03 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 Jul 2024 08:53:54 +0800 Subject: [PATCH] [fix][client] Fix pattern consumer create crash if a part of partitions of a topic have been deleted (#22854) --- .../pulsar/broker/service/TopicGCTest.java | 291 +++++++++++++++++- .../api/PatternMultiTopicsConsumerTest.java | 37 +++ .../impl/PatternTopicsConsumerImplTest.java | 13 +- .../client/impl/TopicsConsumerImplTest.java | 5 +- .../pulsar/client/impl/ConsumerBase.java | 6 + .../pulsar/client/impl/LookupService.java | 9 +- .../client/impl/MultiTopicsConsumerImpl.java | 188 ++++++----- .../impl/PatternConsumerUpdateQueue.java | 254 +++++++++++++++ .../impl/PatternMultiTopicsConsumerImpl.java | 249 ++++++++++----- .../pulsar/client/impl/PulsarClientImpl.java | 15 +- .../pulsar/client/impl/TopicListWatcher.java | 16 +- .../impl/PatternConsumerUpdateQueueTest.java | 247 +++++++++++++++ .../PatternMultiTopicsConsumerImplTest.java | 6 +- .../client/impl/TopicListWatcherTest.java | 12 +- .../pulsar/common/lookup/GetTopicsResult.java | 24 ++ .../pulsar/common/naming/TopicName.java | 9 + .../apache/pulsar/common/util/FutureUtil.java | 3 + 17 files changed, 1187 insertions(+), 197 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java index 8fdf0723ea8d1..172bd3702e129 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java @@ -18,24 +18,34 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -58,14 +68,38 @@ protected void cleanup() throws Exception { @EqualsAndHashCode.Include protected void doInitConf() throws Exception { super.doInitConf(); - this.conf.setBrokerDeleteInactiveTopicsEnabled(true); - this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); - this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10); + conf.setBrokerDeleteInactiveTopicsEnabled(true); + conf.setBrokerDeleteInactiveTopicsMode( + InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10); } - @Test - public void testCreateConsumerAfterOnePartDeleted() throws Exception { + private enum SubscribeTopicType { + MULTI_PARTITIONED_TOPIC, + REGEX_TOPIC; + } + + @DataProvider(name = "subscribeTopicTypes") + public Object[][] subTopicTypes() { + return new Object[][]{ + {SubscribeTopicType.MULTI_PARTITIONED_TOPIC}, + {SubscribeTopicType.REGEX_TOPIC} + }; + } + + private void setSubscribeTopic(ConsumerBuilder consumerBuilder, SubscribeTopicType subscribeTopicType, + String topicName, String topicPattern) { + if (subscribeTopicType.equals(SubscribeTopicType.MULTI_PARTITIONED_TOPIC)) { + consumerBuilder.topic(topicName); + } else { + consumerBuilder.topicsPattern(Pattern.compile(topicPattern)); + } + } + + @Test(dataProvider = "subscribeTopicTypes", timeOut = 300 * 1000) + public void testRecreateConsumerAfterOnePartGc(SubscribeTopicType subscribeTopicType) throws Exception { final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topicPattern = "persistent://public/default/tp.*"; final String partition0 = topic + "-partition-0"; final String partition1 = topic + "-partition-1"; final String subscription = "s1"; @@ -77,8 +111,12 @@ public void testCreateConsumerAfterOnePartDeleted() throws Exception { .enableBatching(false).create(); Producer producer1 = pulsarClient.newProducer(Schema.STRING).topic(partition1) .enableBatching(false).create(); - org.apache.pulsar.client.api.Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topic) - .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe(); + ConsumerBuilder consumerBuilder1 = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(subscription) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared); + setSubscribeTopic(consumerBuilder1, subscribeTopicType, topic, topicPattern); + Consumer consumer1 = consumerBuilder1.subscribe(); // Make consume all messages for one topic, do not consume any messages for another one. producer0.send("1"); @@ -97,18 +135,247 @@ public void testCreateConsumerAfterOnePartDeleted() throws Exception { }); // Verify that the consumer subscribed with partitioned topic can be created successful. - Consumer consumerAllPartition = pulsarClient.newConsumer(Schema.STRING).topic(topic) - .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe(); - Message msg = consumerAllPartition.receive(2, TimeUnit.SECONDS); + ConsumerBuilder consumerBuilder2 = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(subscription) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared); + setSubscribeTopic(consumerBuilder2, subscribeTopicType, topic, topicPattern); + Consumer consumer2 = consumerBuilder2.subscribe(); + Message msg = consumer2.receive(2, TimeUnit.SECONDS); + String receivedMsgValue = msg.getValue(); + log.info("received msg: {}", receivedMsgValue); + consumer2.acknowledge(msg); + + // cleanup. + consumer2.close(); + producer0.close(); + producer1.close(); + admin.topics().deletePartitionedTopic(topic); + } + + @Test(dataProvider = "subscribeTopicTypes", timeOut = 300 * 1000) + public void testAppendCreateConsumerAfterOnePartGc(SubscribeTopicType subscribeTopicType) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topicPattern = "persistent://public/default/tp.*"; + final String partition0 = topic + "-partition-0"; + final String partition1 = topic + "-partition-1"; + final String subscription = "s1"; + admin.topics().createPartitionedTopic(topic, 2); + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + + // create consumers and producers. + Producer producer0 = pulsarClient.newProducer(Schema.STRING).topic(partition0) + .enableBatching(false).create(); + Producer producer1 = pulsarClient.newProducer(Schema.STRING).topic(partition1) + .enableBatching(false).create(); + ConsumerBuilder consumerBuilder1 = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(subscription) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared); + setSubscribeTopic(consumerBuilder1, subscribeTopicType, topic, topicPattern); + Consumer consumer1 = consumerBuilder1.subscribe(); + + // Make consume all messages for one topic, do not consume any messages for another one. + producer0.send("partition-0-1"); + producer1.send("partition-1-1"); + producer1.send("partition-1-2"); + producer1.send("partition-1-4"); + admin.topics().skipAllMessages(partition0, subscription); + + // Wait for topic GC. + // Partition 0 will be deleted about 20s later, left 2min to avoid flaky. + producer0.close(); + Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> { + CompletableFuture> tp1 = pulsar.getBrokerService().getTopic(partition0, false); + CompletableFuture> tp2 = pulsar.getBrokerService().getTopic(partition1, false); + assertTrue(tp1 == null || !tp1.get().isPresent()); + assertTrue(tp2 != null && tp2.get().isPresent()); + }); + + // Verify that the messages under "partition-1" still can be ack. + for (int i = 0; i < 2; i++) { + Message msg = consumer1.receive(2, TimeUnit.SECONDS); + assertNotNull(msg, "Expected at least received 2 messages."); + log.info("received msg[{}]: {}", i, msg.getValue()); + TopicMessageId messageId = (TopicMessageId) msg.getMessageId(); + if (messageId.getOwnerTopic().equals(partition1)) { + consumer1.acknowledgeAsync(msg); + } + } + consumer1.close(); + + // Verify that the consumer subscribed with partitioned topic can be created successful. + ConsumerBuilder consumerBuilder2 = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(subscription) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared); + setSubscribeTopic(consumerBuilder2, subscribeTopicType, topic, topicPattern); + Consumer consumer2 = consumerBuilder2.subscribe(); + producer1.send("partition-1-5"); + Message msg = consumer2.receive(2, TimeUnit.SECONDS); assertNotNull(msg); String receivedMsgValue = msg.getValue(); log.info("received msg: {}", receivedMsgValue); - consumerAllPartition.acknowledge(msg); + consumer2.acknowledge(msg); // cleanup. - consumerAllPartition.close(); + consumer2.close(); producer0.close(); producer1.close(); admin.topics().deletePartitionedTopic(topic); } + + @Test(timeOut = 180 * 1000) + public void testPhasePartDeletion() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topicPattern = "persistent://public/default/tp.*"; + final String partition0 = topic + "-partition-0"; + final String partition1 = topic + "-partition-1"; + final String partition2 = topic + "-partition-2"; + final String subscription = "s1"; + admin.topics().createPartitionedTopic(topic, 3); + // Create consumer. + PatternMultiTopicsConsumerImpl c1 = (PatternMultiTopicsConsumerImpl) pulsarClient + .newConsumer(Schema.STRING) + .subscriptionName(subscription) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared) + .topicsPattern(Pattern.compile(topicPattern)).subscribe(); + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 1); + assertEquals(partitionedTopics.get(topic), 3); + assertEquals(consumers.size(), 3); + assertTrue(consumers.containsKey(partition0)); + assertTrue(consumers.containsKey(partition1)); + assertTrue(consumers.containsKey(partition2)); + }); + // Delete partitions the first time. + admin.topics().delete(partition0, true); + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 1); + assertEquals(partitionedTopics.get(topic), 3); + assertEquals(consumers.size(), 2); + assertTrue(consumers.containsKey(partition1)); + assertTrue(consumers.containsKey(partition2)); + }); + // Delete partitions the second time. + admin.topics().delete(partition1, true); + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 1); + assertEquals(partitionedTopics.get(topic), 3); + assertEquals(consumers.size(), 1); + assertTrue(consumers.containsKey(partition2)); + }); + // Delete partitions the third time. + admin.topics().delete(partition2, true); + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 0); + assertEquals(consumers.size(), 0); + }); + + // cleanup. + c1.close(); + admin.topics().deletePartitionedTopic(topic); + } + + @Test(timeOut = 180 * 1000) + public void testExpandPartitions() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String topicPattern = "persistent://public/default/tp.*"; + final String partition0 = topic + "-partition-0"; + final String partition1 = topic + "-partition-1"; + final String subscription = "s1"; + admin.topics().createPartitionedTopic(topic, 2); + // Delete partitions. + admin.topics().delete(partition0, true); + admin.topics().delete(partition1, true); + // Create consumer. + PatternMultiTopicsConsumerImpl c1 = (PatternMultiTopicsConsumerImpl) pulsarClient + .newConsumer(Schema.STRING) + .subscriptionName(subscription) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared) + .topicsPattern(Pattern.compile(topicPattern)).subscribe(); + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 0); + assertEquals(consumers.size(), 0); + }); + // Trigger partitions creation. + pulsarClient.newConsumer(Schema.STRING).subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared).topic(topic).subscribe().close(); + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 1); + assertEquals(partitionedTopics.get(topic), 2); + assertEquals(consumers.size(), 2); + assertTrue(consumers.containsKey(partition0)); + assertTrue(consumers.containsKey(partition1)); + }); + // Expand partitions the first time. + admin.topics().updatePartitionedTopic(topic, 3); + final String partition2 = topic + "-partition-2"; + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 1); + assertEquals(partitionedTopics.get(topic), 3); + assertEquals(consumers.size(), 3); + assertTrue(consumers.containsKey(partition0)); + assertTrue(consumers.containsKey(partition1)); + assertTrue(consumers.containsKey(partition2)); + }); + // Expand partitions the second time. + admin.topics().updatePartitionedTopic(topic, 4); + final String partition3 = topic + "-partition-3"; + // Check subscriptions. + Awaitility.await().untilAsserted(() -> { + ConcurrentHashMap> consumers + = WhiteboxImpl.getInternalState(c1, "consumers"); + ConcurrentHashMap partitionedTopics + = WhiteboxImpl.getInternalState(c1, "partitionedTopics"); + assertEquals(partitionedTopics.size(), 1); + assertEquals(partitionedTopics.get(topic), 4); + assertEquals(consumers.size(), 4); + assertTrue(consumers.containsKey(partition0)); + assertTrue(consumers.containsKey(partition1)); + assertTrue(consumers.containsKey(partition2)); + assertTrue(consumers.containsKey(partition3)); + }); + + // cleanup. + c1.close(); + admin.topics().deletePartitionedTopic(topic); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java index 00a47c3957150..475477ac52149 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java @@ -18,11 +18,14 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.fail; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -95,4 +98,38 @@ private void testWithConsumer(Consumer consumer) throws Exception { consumer.close(); } + @Test(timeOut = 30000) + public void testFailedSubscribe() throws Exception { + final String topicName1 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test"); + final String topicName2 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test"); + final String topicName3 = BrokerTestUtil.newUniqueName("persistent://public/default/tp_test"); + final String subName = "s1"; + admin.topics().createPartitionedTopic(topicName1, 2); + admin.topics().createPartitionedTopic(topicName2, 3); + admin.topics().createNonPartitionedTopic(topicName3); + + // Register a exclusive consumer to makes the pattern consumer failed to subscribe. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName3).subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(subName).subscribe(); + + try { + PatternMultiTopicsConsumerImpl consumer = + (PatternMultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topicsPattern("persistent://public/default/tp_test.*") + .subscriptionType(SubscriptionType.Failover) + .subscriptionName(subName) + .subscribe(); + fail("Expected a consumer busy error."); + } catch (Exception ex) { + log.info("consumer busy", ex); + } + + c1.close(); + // Verify all internal consumer will be closed. + // If delete topic without "-f" work, it means the internal consumers were closed. + admin.topics().delete(topicName3); + admin.topics().deletePartitionedTopic(topicName2); + admin.topics().deletePartitionedTopic(topicName1); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index c5504a0c02a0c..9c19fadffb137 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -35,7 +34,6 @@ import java.util.regex.Pattern; import java.util.stream.IntStream; -import io.netty.util.Timeout; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; @@ -53,6 +51,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -1024,17 +1023,17 @@ public void testAutoUnsubscribePatternConsumer() throws Exception { // 6. remove producer 1,3; verify only consumer 2 left // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic. - List topicNames = Lists.newArrayList(topicName2); + String tp2p0 = TopicName.get(topicName2).getPartition(0).toString(); + String tp2p1 = TopicName.get(topicName2).getPartition(1).toString(); + List topicNames = Lists.newArrayList(tp2p0, tp2p1); NamespaceService nss = pulsar.getNamespaceService(); doReturn(CompletableFuture.completedFuture(topicNames)).when(nss) .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); // 7. call recheckTopics to unsubscribe topic 1,3, verify topics number: 2=6-1-3 log.debug("recheck topics change"); - PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); - Timeout recheckPatternTimeout = spy(consumer1.getRecheckPatternTimeout()); - doReturn(false).when(recheckPatternTimeout).isCancelled(); - consumer1.run(recheckPatternTimeout); + PatternConsumerUpdateQueue taskQueue = WhiteboxImpl.getInternalState(consumer, "updateTaskQueue"); + taskQueue.appendRecheckOp(); Thread.sleep(100); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 2); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index c343ab0d6e294..83cb5f2a4400b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.util.Timeout; +import java.time.Duration; import lombok.Cleanup; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; @@ -1321,7 +1322,6 @@ public void testPartitionsUpdatesForMultipleTopics() throws Exception { Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2); admin.topics().updatePartitionedTopic(topicName0, 5); - consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); Awaitility.await().untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5); @@ -1341,9 +1341,8 @@ public void testPartitionsUpdatesForMultipleTopics() throws Exception { }); admin.topics().updatePartitionedTopic(topicName1, 5); - consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10); Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10); }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 05081dcaa07ea..74abb82bfe809 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.util.Timeout; import java.nio.charset.StandardCharsets; @@ -1285,5 +1286,10 @@ public boolean hasBatchReceiveTimeout() { return batchReceiveTimeout != null; } + @VisibleForTesting + CompletableFuture> getSubscribeFuture() { + return subscribeFuture; + } + private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index ccd1f6b23f2f3..2fe457059c1e9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -117,7 +117,14 @@ CompletableFuture getPartitionedTopicMetadata(TopicNam InetSocketAddress resolveHost(); /** - * Returns all the topics name for a given namespace. + * Returns all the topics that matches {@param topicPattern} for a given namespace. + * + * Note: {@param topicPattern} it relate to the topic name(without the partition suffix). For example: + * - There is a partitioned topic "tp-a" with two partitions. + * - tp-a-partition-0 + * - tp-a-partition-1 + * - If {@param topicPattern} is "tp-a", the consumer will subscribe to the two partitions. + * - if {@param topicPattern} is "tp-a-partition-0", the consumer will subscribe nothing. * * @param namespace : namespace-name * @return diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8047e05351ac1..e8cbf71e500c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.annotation.Nullable; @@ -68,6 +70,7 @@ import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.CompletableFutureCancellationHandler; @@ -81,14 +84,14 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-"; // Map , when get do ACK, consumer will by find by topic name - private final ConcurrentHashMap> consumers; + protected final ConcurrentHashMap> consumers; // Map , store partition number for each topic protected final ConcurrentHashMap partitionedTopics; // Queue of partition consumers on which we have stopped calling receiveAsync() because the // shared incoming queue was full - private final ConcurrentLinkedQueue> pausedConsumers; + protected final ConcurrentLinkedQueue> pausedConsumers; // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. AtomicInteger allTopicPartitionsNumber; @@ -1009,8 +1012,12 @@ CompletableFuture subscribeAsync(String topicName, int numberPartitions) { new PulsarClientException.AlreadyClosedException("Topic name not valid")); } String fullTopicName = topicNameInstance.toString(); - if (consumers.containsKey(fullTopicName) - || partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) { + if (consumers.containsKey(fullTopicName)) { + return FutureUtil.failedFuture( + new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName)); + } + if (!topicNameInstance.isPartitioned() + && partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) { return FutureUtil.failedFuture( new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName)); } @@ -1047,7 +1054,7 @@ private void doSubscribeTopicPartitions(Schema schema, log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions); } - List>> futureList; + CompletableFuture subscribeAllPartitionsFuture; if (numPartitions != PartitionedTopicMetadata.NON_PARTITIONED) { // Below condition is true if subscribeAsync() has been invoked second time with same // topicName before the first invocation had reached this point. @@ -1067,30 +1074,50 @@ private void doSubscribeTopicPartitions(Schema schema, ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setReceiverQueueSize(receiverQueueSize); - futureList = IntStream - .range(0, numPartitions) - .mapToObj( - partitionIndex -> { - String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); - CompletableFuture> subFuture = new CompletableFuture<>(); - configurationData.setStartPaused(paused); - ConsumerImpl newConsumer = createInternalConsumer(configurationData, partitionName, - partitionIndex, subFuture, createIfDoesNotExist, schema); - synchronized (pauseMutex) { - if (paused) { - newConsumer.pause(); - } else { - newConsumer.resume(); - } - consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + CompletableFuture> partitionsFuture; + if (createIfDoesNotExist || !TopicName.get(topicName).isPersistent()) { + partitionsFuture = CompletableFuture.completedFuture(IntStream.range(0, numPartitions) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toList())); + } else { + partitionsFuture = getExistsPartitions(topicName.toString()); + } + subscribeAllPartitionsFuture = partitionsFuture.thenCompose(partitions -> { + if (partitions.isEmpty()) { + partitionedTopics.remove(topicName, numPartitions); + return CompletableFuture.completedFuture(null); + } + List>> subscribeList = new ArrayList<>(); + for (int partitionIndex : partitions) { + String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); + CompletableFuture> subFuture = new CompletableFuture<>(); + configurationData.setStartPaused(paused); + ConsumerImpl newConsumer = createInternalConsumer(configurationData, partitionName, + partitionIndex, subFuture, createIfDoesNotExist, schema); + synchronized (pauseMutex) { + if (paused) { + newConsumer.pause(); + } else { + newConsumer.resume(); } - return subFuture; - }) - .collect(Collectors.toList()); + Consumer originalValue = consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + if (originalValue != null) { + newConsumer.closeAsync().exceptionally(ex -> { + log.error("[{}] [{}] Failed to close the orphan consumer", + partitionName, subscription, ex); + return null; + }); + } + } + subscribeList.add(subFuture); + } + return FutureUtil.waitForAll(subscribeList); + }); } else { allTopicPartitionsNumber.incrementAndGet(); - CompletableFuture> subFuture = new CompletableFuture<>(); + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + subscribeAllPartitionsFuture = subscribeFuture.thenAccept(__ -> {}); synchronized (pauseMutex) { consumers.compute(topicName, (key, existingValue) -> { @@ -1104,7 +1131,7 @@ private void doSubscribeTopicPartitions(Schema schema, } else { internalConfig.setStartPaused(paused); ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, - -1, subFuture, createIfDoesNotExist, schema); + -1, subscribeFuture, createIfDoesNotExist, schema); if (paused) { newConsumer.pause(); } else { @@ -1114,11 +1141,10 @@ private void doSubscribeTopicPartitions(Schema schema, } }); } - futureList = Collections.singletonList(subFuture); + } - FutureUtil.waitForAll(futureList) - .thenAccept(finalFuture -> { + subscribeAllPartitionsFuture.thenAccept(finalFuture -> { if (allTopicPartitionsNumber.get() > getCurrentReceiverQueueSize()) { setCurrentReceiverQueueSize(allTopicPartitionsNumber.get()); } @@ -1139,6 +1165,8 @@ private void doSubscribeTopicPartitions(Schema schema, return; }) .exceptionally(ex -> { + log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, + ex.getMessage()); handleSubscribeOneTopicError(topicName, ex, subscribeResult); return null; }); @@ -1162,7 +1190,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf } // handling failure during subscribe new topic, unsubscribe success created partitions - private void handleSubscribeOneTopicError(String topicName, + protected void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture subscribeFuture) { log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage()); @@ -1255,59 +1283,6 @@ public CompletableFuture unsubscribeAsync(String topicName) { return unsubscribeFuture; } - /*** - * Remove a consumer for a topic. - * @param topicName topic name contains the partition suffix. - */ - public CompletableFuture removeConsumerAsync(String topicName) { - checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName); - - if (getState() == State.Closing || getState() == State.Closed) { - return FutureUtil.failedFuture( - new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed")); - } - - CompletableFuture unsubscribeFuture = new CompletableFuture<>(); - String topicPartName = TopicName.get(topicName).getPartitionedTopicName(); - - - List> consumersToClose = consumers.values().stream() - .filter(consumer -> { - String consumerTopicName = consumer.getTopic(); - return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName); - }).collect(Collectors.toList()); - - List> futureList = consumersToClose.stream() - .map(ConsumerImpl::closeAsync).collect(Collectors.toList()); - - FutureUtil.waitForAll(futureList) - .whenComplete((r, ex) -> { - if (ex == null) { - consumersToClose.forEach(consumer1 -> { - consumers.remove(consumer1.getTopic()); - pausedConsumers.remove(consumer1); - allTopicPartitionsNumber.decrementAndGet(); - }); - - removeTopic(topicName); - if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker) { - ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName); - } - - unsubscribeFuture.complete(null); - log.info("[{}] [{}] [{}] Removed Topics Consumer, allTopicPartitionsNumber: {}", - topicName, subscription, consumerName, allTopicPartitionsNumber); - } else { - unsubscribeFuture.completeExceptionally(ex); - setState(State.Failed); - log.error("[{}] [{}] [{}] Could not remove Topics Consumer", - topicName, subscription, consumerName, ex.getCause()); - } - }); - - return unsubscribeFuture; - } - // get topics name public List getPartitionedTopics() { @@ -1573,4 +1548,51 @@ protected void setCurrentReceiverQueueSize(int newSize) { CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize); resumeReceivingFromPausedConsumersIfNeeded(); } + + /** + * Get the exists partitions of a partitioned topic, the result does not contain the partitions which has not been + * created yet(in other words, the partitions that do not exist in the response of "pulsar-admin topics list"). + * @return sorted partitions list if it is a partitioned topic; @return an empty list if it is a non-partitioned + * topic. + */ + private CompletableFuture> getExistsPartitions(String topic) { + TopicName topicName = TopicName.get(topic); + if (!topicName.isPersistent()) { + return FutureUtil.failedFuture(new IllegalArgumentException("The method getExistsPartitions" + + " does not support non-persistent topic yet.")); + } + return client.getLookup().getTopicsUnderNamespace(topicName.getNamespaceObject(), + CommandGetTopicsOfNamespace.Mode.PERSISTENT, + TopicName.getPattern(topicName.getPartitionedTopicName()), + null).thenApply(getTopicsResult -> { + if (getTopicsResult.getNonPartitionedOrPartitionTopics() == null + || getTopicsResult.getNonPartitionedOrPartitionTopics().isEmpty()) { + return Collections.emptyList(); + } + // If broker version is less than "2.11.x", it does not support broker-side pattern check, so append + // a client-side pattern check. + // If lookup service is typed HttpLookupService, the HTTP API does not support broker-side pattern + // check yet, so append a client-side pattern check. + Predicate clientSideFilter; + if (getTopicsResult.isFiltered()) { + clientSideFilter = __ -> true; + } else { + clientSideFilter = + tp -> Pattern.compile(TopicName.getPartitionPattern(topic)).matcher(tp).matches(); + } + ArrayList list = new ArrayList<>(getTopicsResult.getNonPartitionedOrPartitionTopics().size()); + for (String partition : getTopicsResult.getNonPartitionedOrPartitionTopics()) { + int partitionIndex = TopicName.get(partition).getPartitionIndex(); + if (partitionIndex < 0) { + // It is not a partition. + continue; + } + if (clientSideFilter.test(partition)) { + list.add(partitionIndex); + } + } + Collections.sort(list); + return list; + }); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java new file mode 100644 index 0000000000000..d6eba6463a07d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Used to make all tasks that will modify subscriptions will be executed one by one, and skip the unnecessary updating. + * + * So far, four three scenarios that will modify subscriptions: + * 1. When start pattern consumer. + * 2. After topic list watcher reconnected, it will call {@link PatternMultiTopicsConsumerImpl#recheckTopicsChange()}. + * this scenario only exists in the version >= 2.11 (both client-version and broker version are >= 2.11). + * 3. A scheduled task will call {@link PatternMultiTopicsConsumerImpl#recheckTopicsChange()}, this scenario only + * exists in the version < 2.11. + * 4. The topics change events will trigger a + * {@link PatternMultiTopicsConsumerImpl#topicsChangeListener#onTopicsRemoved(Collection)} or + * {@link PatternMultiTopicsConsumerImpl#topicsChangeListener#onTopicsAdded(Collection)}. + * + * When you are using this client connect to the broker whose version >= 2.11, there are three scenarios: [1, 2, 4]. + * When you are using this client connect to the broker whose version < 2.11, there is only one scenario: [3] and all + * the event will run in the same thread. + */ +@Slf4j +@SuppressFBWarnings("EI_EXPOSE_REP2") +public class PatternConsumerUpdateQueue { + + private static final Pair> RECHECK_OP = + Pair.of(UpdateSubscriptionType.RECHECK, null); + + private final LinkedBlockingQueue>> pendingTasks; + + private final PatternMultiTopicsConsumerImpl patternConsumer; + + private final PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener; + + /** + * Whether there is a task is in progress, this variable is used to confirm whether a next-task triggering is + * needed. + */ + private Pair> taskInProgress = null; + + /** + * Whether there is a recheck task in queue. + * - Since recheck task will do all changes, it can be used to compress multiple tasks to one. + * - To avoid skipping the newest changes, once the recheck task is starting to work, this variable will be set + * to "false". + */ + private boolean recheckTaskInQueue = false; + + private volatile long lastRecheckTaskStartingTimestamp = 0; + + private boolean closed; + + public PatternConsumerUpdateQueue(PatternMultiTopicsConsumerImpl patternConsumer) { + this(patternConsumer, patternConsumer.topicsChangeListener); + } + + /** This constructor is only for test. **/ + @VisibleForTesting + public PatternConsumerUpdateQueue(PatternMultiTopicsConsumerImpl patternConsumer, + PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener) { + this.patternConsumer = patternConsumer; + this.topicsChangeListener = topicsChangeListener; + this.pendingTasks = new LinkedBlockingQueue<>(); + // To avoid subscribing and topics changed events execute concurrently, let the change events starts after the + // subscribing task. + doAppend(Pair.of(UpdateSubscriptionType.CONSUMER_INIT, null)); + } + + synchronized void appendTopicsAddedOp(Collection topics) { + if (topics == null || topics.isEmpty()) { + return; + } + doAppend(Pair.of(UpdateSubscriptionType.TOPICS_ADDED, topics)); + } + + synchronized void appendTopicsRemovedOp(Collection topics) { + if (topics == null || topics.isEmpty()) { + return; + } + doAppend(Pair.of(UpdateSubscriptionType.TOPICS_REMOVED, topics)); + } + + synchronized void appendRecheckOp() { + doAppend(RECHECK_OP); + } + + synchronized void doAppend(Pair> task) { + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] try to append task. {} {}", patternConsumer.getSubscription(), + task.getLeft(), task.getRight() == null ? "" : task.getRight()); + } + // Once there is a recheck task in queue, it means other tasks can be skipped. + if (recheckTaskInQueue) { + return; + } + + // Once there are too many tasks in queue, compress them as a recheck task. + if (pendingTasks.size() >= 30 && !task.getLeft().equals(UpdateSubscriptionType.RECHECK)) { + appendRecheckOp(); + return; + } + + pendingTasks.add(task); + if (task.getLeft().equals(UpdateSubscriptionType.RECHECK)) { + recheckTaskInQueue = true; + } + + // If no task is in-progress, trigger a task execution. + if (taskInProgress == null) { + triggerNextTask(); + } + } + + synchronized void triggerNextTask() { + if (closed) { + return; + } + + final Pair> task = pendingTasks.poll(); + + // No pending task. + if (task == null) { + taskInProgress = null; + return; + } + + // If there is a recheck task in queue, skip others and only call the recheck task. + if (recheckTaskInQueue && !task.getLeft().equals(UpdateSubscriptionType.RECHECK)) { + triggerNextTask(); + return; + } + + // Execute pending task. + CompletableFuture newTaskFuture = null; + switch (task.getLeft()) { + case CONSUMER_INIT: { + newTaskFuture = patternConsumer.getSubscribeFuture().thenAccept(__ -> {}).exceptionally(ex -> { + // If the subscribe future was failed, the consumer will be closed. + synchronized (PatternConsumerUpdateQueue.this) { + this.closed = true; + patternConsumer.closeAsync().exceptionally(ex2 -> { + log.error("Pattern consumer failed to close, this error may left orphan consumers." + + " Subscription: {}", patternConsumer.getSubscription()); + return null; + }); + } + return null; + }); + break; + } + case TOPICS_ADDED: { + newTaskFuture = topicsChangeListener.onTopicsAdded(task.getRight()); + break; + } + case TOPICS_REMOVED: { + newTaskFuture = topicsChangeListener.onTopicsRemoved(task.getRight()); + break; + } + case RECHECK: { + recheckTaskInQueue = false; + lastRecheckTaskStartingTimestamp = System.currentTimeMillis(); + newTaskFuture = patternConsumer.recheckTopicsChange(); + break; + } + default: { + throw new RuntimeException("Un-support UpdateSubscriptionType"); + } + } + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] starting task. {} {} ", patternConsumer.getSubscription(), + task.getLeft(), task.getRight() == null ? "" : task.getRight()); + } + // Trigger next pending task. + taskInProgress = Pair.of(task.getLeft(), newTaskFuture); + newTaskFuture.thenAccept(ignore -> { + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] task finished. {} {} ", patternConsumer.getSubscription(), + task.getLeft(), task.getRight() == null ? "" : task.getRight()); + } + triggerNextTask(); + }).exceptionally(ex -> { + /** + * Once a updating fails, trigger a delayed new recheck task to guarantee all things is correct. + * - Skip if there is already a recheck task in queue. + * - Skip if the last recheck task has been executed after the current time. + */ + log.error("Pattern consumer [{}] task finished. {} {}. But it failed", patternConsumer.getSubscription(), + task.getLeft(), task.getRight() == null ? "" : task.getRight(), ex); + // Skip if there is already a recheck task in queue. + synchronized (PatternConsumerUpdateQueue.this) { + if (recheckTaskInQueue || PatternConsumerUpdateQueue.this.closed) { + return null; + } + } + // Skip if the last recheck task has been executed after the current time. + long failedTime = System.currentTimeMillis(); + patternConsumer.getClient().timer().newTimeout(timeout -> { + if (lastRecheckTaskStartingTimestamp <= failedTime) { + appendRecheckOp(); + } + }, 10, TimeUnit.SECONDS); + triggerNextTask(); + return null; + }); + } + + public synchronized CompletableFuture cancelAllAndWaitForTheRunningTask() { + this.closed = true; + if (taskInProgress == null) { + return CompletableFuture.completedFuture(null); + } + // If the in-progress task is consumer init task, it means nothing is in-progress. + if (taskInProgress.getLeft().equals(UpdateSubscriptionType.CONSUMER_INIT)) { + return CompletableFuture.completedFuture(null); + } + return taskInProgress.getRight().thenAccept(__ -> {}).exceptionally(ex -> null); + } + + private enum UpdateSubscriptionType { + /** A marker that indicates the consumer's subscribe task.**/ + CONSUMER_INIT, + /** Triggered by {@link PatternMultiTopicsConsumerImpl#topicsChangeListener}.**/ + TOPICS_ADDED, + /** Triggered by {@link PatternMultiTopicsConsumerImpl#topicsChangeListener}.**/ + TOPICS_REMOVED, + /** A fully check for pattern consumer. **/ + RECHECK; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index ffca79dfa4342..70ba3e33963f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -28,12 +28,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -42,6 +42,7 @@ import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.BackoffBuilder; @@ -51,7 +52,7 @@ public class PatternMultiTopicsConsumerImpl extends MultiTopicsConsumerImpl implements TimerTask { private final Pattern topicsPattern; - private final TopicsChangedListener topicsChangeListener; + final TopicsChangedListener topicsChangeListener; private final Mode subscriptionMode; private final CompletableFuture watcherFuture = new CompletableFuture<>(); protected NamespaceName namespaceName; @@ -69,6 +70,8 @@ public class PatternMultiTopicsConsumerImpl extends MultiTopicsConsumerImpl recheckTopicsChangeAfterReconnect()); watcherFuture .thenAccept(__ -> recheckPatternTimeout.cancel()) .exceptionally(ex -> { - log.warn("Unable to create topic list watcher. Falling back to only polling for new topics", ex); + log.warn("Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling" + + " for new topics", conf.getSubscriptionName(), ex); return null; }); } else { - log.debug("Not creating topic list watcher for subscription mode {}", subscriptionMode); + log.debug("Pattern consumer [{}] not creating topic list watcher for subscription mode {}", + conf.getSubscriptionName(), subscriptionMode); watcherFuture.complete(null); } } @@ -129,17 +135,7 @@ private void recheckTopicsChangeAfterReconnect() { return; } // Do check. - recheckTopicsChange().whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); - long delayMs = recheckPatternTaskBackoff.next(); - client.timer().newTimeout(timeout -> { - recheckTopicsChangeAfterReconnect(); - }, delayMs, TimeUnit.MILLISECONDS); - } else { - recheckPatternTaskBackoff.reset(); - } - }); + updateTaskQueue.appendRecheckOp(); } // TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change. @@ -148,18 +144,10 @@ public void run(Timeout timeout) throws Exception { if (timeout.isCancelled()) { return; } - recheckTopicsChange().exceptionally(ex -> { - log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); - return null; - }).thenAccept(__ -> { - // schedule the next re-check task - this.recheckPatternTimeout = client.timer() - .newTimeout(PatternMultiTopicsConsumerImpl.this, - Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); - }); + updateTaskQueue.appendRecheckOp(); } - private CompletableFuture recheckTopicsChange() { + CompletableFuture recheckTopicsChange() { String pattern = topicsPattern.pattern(); final int epoch = recheckPatternEpoch.incrementAndGet(); return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash) @@ -172,22 +160,18 @@ private CompletableFuture recheckTopicsChange() { return CompletableFuture.completedFuture(null); } if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}", + log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {}," + + " topicsHash: {}, filtered: {}", + PatternMultiTopicsConsumerImpl.this.getSubscription(), namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(), getTopicsResult.isFiltered()); getTopicsResult.getTopics().forEach(topicName -> log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName)); } - final List oldTopics = new ArrayList<>(getPartitionedTopics()); - for (String partition : getPartitions()) { - TopicName topicName = TopicName.get(partition); - if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) { - oldTopics.add(partition); - } - } + final List oldTopics = new ArrayList<>(getPartitions()); return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult, - topicsChangeListener, oldTopics); + topicsChangeListener, oldTopics, subscription); } }); } @@ -196,7 +180,8 @@ static CompletableFuture updateSubscriptions(Pattern topicsPattern, java.util.function.Consumer topicsHashSetter, GetTopicsResult getTopicsResult, TopicsChangedListener topicsChangedListener, - List oldTopics) { + List oldTopics, + String subscriptionForLog) { topicsHashSetter.accept(getTopicsResult.getTopicsHash()); if (!getTopicsResult.isChanged()) { return CompletableFuture.completedFuture(null); @@ -204,14 +189,20 @@ static CompletableFuture updateSubscriptions(Pattern topicsPattern, List newTopics; if (getTopicsResult.isFiltered()) { - newTopics = getTopicsResult.getTopics(); + newTopics = getTopicsResult.getNonPartitionedOrPartitionTopics(); } else { - newTopics = TopicList.filterTopics(getTopicsResult.getTopics(), topicsPattern); + newTopics = getTopicsResult.filterTopics(topicsPattern).getNonPartitionedOrPartitionTopics(); } final List> listenersCallback = new ArrayList<>(2); - listenersCallback.add(topicsChangedListener.onTopicsAdded(TopicList.minus(newTopics, oldTopics))); - listenersCallback.add(topicsChangedListener.onTopicsRemoved(TopicList.minus(oldTopics, newTopics))); + Set topicsAdded = TopicList.minus(newTopics, oldTopics); + Set topicsRemoved = TopicList.minus(oldTopics, newTopics); + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] Recheck pattern consumer's topics. topicsAdded: {}, topicsRemoved: {}", + subscriptionForLog, topicsAdded, topicsRemoved); + } + listenersCallback.add(topicsChangedListener.onTopicsAdded(topicsAdded)); + listenersCallback.add(topicsChangedListener.onTopicsRemoved(topicsRemoved)); return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback)); } @@ -247,23 +238,68 @@ private class PatternTopicsChangedListener implements TopicsChangedListener { */ @Override public CompletableFuture onTopicsRemoved(Collection removedTopics) { - CompletableFuture removeFuture = new CompletableFuture<>(); - if (removedTopics.isEmpty()) { - removeFuture.complete(null); - return removeFuture; + return CompletableFuture.completedFuture(null); } - List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); - removedTopics.stream().forEach(topic -> futures.add(removeConsumerAsync(topic))); - FutureUtil.waitForAll(futures) - .thenAccept(finalFuture -> removeFuture.complete(null)) - .exceptionally(ex -> { - log.warn("[{}] Failed to unsubscribe from topics: {}", topic, ex.getMessage()); - removeFuture.completeExceptionally(ex); + // Unsubscribe and remove consumers in memory. + List> unsubscribeList = new ArrayList<>(removedTopics.size()); + Set partialRemoved = new HashSet<>(removedTopics.size()); + Set partialRemovedForLog = new HashSet<>(removedTopics.size()); + for (String tp : removedTopics) { + TopicName topicName = TopicName.get(tp); + ConsumerImpl consumer = consumers.get(topicName.toString()); + if (consumer != null) { + CompletableFuture unsubscribeFuture = new CompletableFuture<>(); + consumer.closeAsync().whenComplete((__, ex) -> { + if (ex != null) { + log.error("Pattern consumer [{}] failed to unsubscribe from topics: {}", + PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString(), ex); + unsubscribeFuture.completeExceptionally(ex); + } else { + consumers.remove(topicName.toString(), consumer); + unsubscribeFuture.complete(null); + } + }); + unsubscribeList.add(unsubscribeFuture); + partialRemoved.add(topicName.getPartitionedTopicName()); + partialRemovedForLog.add(topicName.toString()); + } + } + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] remove topics. {}", + PatternMultiTopicsConsumerImpl.this.getSubscription(), + partialRemovedForLog); + } + + // Remove partitioned topics in memory. + return FutureUtil.waitForAll(unsubscribeList).handle((__, ex) -> { + List removedPartitionedTopicsForLog = new ArrayList<>(); + for (String groupedTopicRemoved : partialRemoved) { + Integer partitions = partitionedTopics.get(groupedTopicRemoved); + if (partitions != null) { + boolean allPartitionsHasBeenRemoved = true; + for (int i = 0; i < partitions; i++) { + if (consumers.containsKey( + TopicName.get(groupedTopicRemoved).getPartition(i).toString())) { + allPartitionsHasBeenRemoved = false; + break; + } + } + if (allPartitionsHasBeenRemoved) { + removedPartitionedTopicsForLog.add(String.format("%s with %s partitions", + groupedTopicRemoved, partitions)); + partitionedTopics.remove(groupedTopicRemoved, partitions); + } + } + } + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] remove partitioned topics because all partitions have been" + + " removed. {}", PatternMultiTopicsConsumerImpl.this.getSubscription(), + removedPartitionedTopicsForLog); + } return null; }); - return removeFuture; } /** @@ -271,29 +307,90 @@ public CompletableFuture onTopicsRemoved(Collection removedTopics) */ @Override public CompletableFuture onTopicsAdded(Collection addedTopics) { - CompletableFuture addFuture = new CompletableFuture<>(); - if (addedTopics.isEmpty()) { - addFuture.complete(null); - return addFuture; + return CompletableFuture.completedFuture(null); } - - Set addTopicPartitionedName = addedTopics.stream() - .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName()) - .collect(Collectors.toSet()); - - List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); - addTopicPartitionedName.forEach(partitionedTopic -> futures.add( - subscribeAsync(partitionedTopic, - false /* createTopicIfDoesNotExist */))); - FutureUtil.waitForAll(futures) - .thenAccept(finalFuture -> addFuture.complete(null)) - .exceptionally(ex -> { - log.warn("[{}] Failed to subscribe to topics: {}", topic, ex.getMessage()); - addFuture.completeExceptionally(ex); - return null; + List> futures = Lists.newArrayListWithExpectedSize(addedTopics.size()); + /** + * Three normal cases: + * 1. Expand partitions. + * 2. Non-partitioned topic, but has been subscribing. + * 3. Non-partitioned topic or Partitioned topic, but has not been subscribing. + * Two unexpected cases: + * Error-1: Received adding non-partitioned topic event, but has subscribed a partitioned topic with the + * same name. + * Error-2: Received adding partitioned topic event, but has subscribed a non-partitioned topic with the + * same name. + * + * Note: The events that triggered by {@link TopicsPartitionChangedListener} after expanding partitions has + * been disabled through "conf.setAutoUpdatePartitions(false)" when creating + * {@link PatternMultiTopicsConsumerImpl}. + */ + Set groupedTopics = new HashSet<>(); + List expendPartitionsForLog = new ArrayList<>(); + for (String tp : addedTopics) { + TopicName topicName = TopicName.get(tp); + groupedTopics.add(topicName.getPartitionedTopicName()); + } + for (String tp : addedTopics) { + TopicName topicName = TopicName.get(tp); + // Case 1: Expand partitions. + if (partitionedTopics.containsKey(topicName.getPartitionedTopicName())) { + if (consumers.containsKey(topicName.toString())) { + // Already subscribed. + } else if (topicName.getPartitionIndex() < 0) { + // Error-1: Received adding non-partitioned topic event, but has subscribed a partitioned topic + // with the same name. + log.error("Pattern consumer [{}] skip to subscribe to the non-partitioned topic {}, because has" + + "subscribed a partitioned topic with the same name", + PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString()); + } else { + if (topicName.getPartitionIndex() + 1 + > partitionedTopics.get(topicName.getPartitionedTopicName())) { + partitionedTopics.put(topicName.getPartitionedTopicName(), + topicName.getPartitionIndex() + 1); + } + expendPartitionsForLog.add(topicName.toString()); + CompletableFuture consumerFuture = subscribeAsync(topicName.toString(), + PartitionedTopicMetadata.NON_PARTITIONED); + consumerFuture.whenComplete((__, ex) -> { + if (ex != null) { + log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}", + PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName, ex); + } + }); + futures.add(consumerFuture); + } + groupedTopics.remove(topicName.getPartitionedTopicName()); + } else if (consumers.containsKey(topicName.toString())) { + // Case-2: Non-partitioned topic, but has been subscribing. + groupedTopics.remove(topicName.getPartitionedTopicName()); + } else if (consumers.containsKey(topicName.getPartitionedTopicName()) + && topicName.getPartitionIndex() >= 0) { + // Error-2: Received adding partitioned topic event, but has subscribed a non-partitioned topic + // with the same name. + log.error("Pattern consumer [{}] skip to subscribe to the partitioned topic {}, because has" + + "subscribed a non-partitioned topic with the same name", + PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName); + groupedTopics.remove(topicName.getPartitionedTopicName()); + } + } + // Case 3: Non-partitioned topic or Partitioned topic, which has not been subscribed. + for (String partitionedTopic : groupedTopics) { + CompletableFuture consumerFuture = subscribeAsync(partitionedTopic, false); + consumerFuture.whenComplete((__, ex) -> { + if (ex != null) { + log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}", + PatternMultiTopicsConsumerImpl.this.getSubscription(), partitionedTopic, ex); + } }); - return addFuture; + futures.add(consumerFuture); + } + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] add topics. expend partitions {}, new subscribing {}", + PatternMultiTopicsConsumerImpl.this.getSubscription(), expendPartitionsForLog, groupedTopics); + } + return FutureUtil.waitForAll(futures); } } @@ -313,7 +410,7 @@ public CompletableFuture closeAsync() { closeFutures.add(watcher.closeAsync()); } } - closeFutures.add(super.closeAsync()); + closeFutures.add(updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(__ -> super.closeAsync())); return FutureUtil.waitForAll(closeFutures); } @@ -322,5 +419,11 @@ Timeout getRecheckPatternTimeout() { return recheckPatternTimeout; } + protected void handleSubscribeOneTopicError(String topicName, + Throwable error, + CompletableFuture subscribeFuture) { + subscribeFuture.completeExceptionally(error); + } + private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f4afb2931cc9e..120bdeb569c69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -585,12 +585,13 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null) .thenAccept(getTopicsResult -> { if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}," - + " topicsHash: {}, changed: {}, filtered: {}", + log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {}," + + " topicsHash: {}, changed: {}, filtered: {}", conf.getSubscriptionName(), namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(), getTopicsResult.isChanged(), getTopicsResult.isFiltered()); getTopicsResult.getTopics().forEach(topicName -> - log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName)); + log.debug("Pattern consumer [{}] get topics under namespace {}, topic: {}", + conf.getSubscriptionName(), namespaceName, topicName)); } List topicsList = getTopicsResult.getTopics(); @@ -598,6 +599,14 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo topicsList = TopicList.filterTopics(getTopicsResult.getTopics(), pattern); } conf.getTopicNames().addAll(topicsList); + + if (log.isDebugEnabled()) { + log.debug("Pattern consumer [{}] initialize topics. {}", conf.getSubscriptionName(), + getTopicsResult.getNonPartitionedOrPartitionTopics()); + } + + // Pattern consumer has his unique check mechanism, so do not need the feature "autoUpdatePartitions". + conf.setAutoUpdatePartitions(false); ConsumerBase consumer = new PatternMultiTopicsConsumerImpl<>(pattern, getTopicsResult.getTopicsHash(), PulsarClientImpl.this, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java index 15922d1180ce0..0007f98b253a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java @@ -43,7 +43,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler. AtomicLongFieldUpdater .newUpdater(TopicListWatcher.class, "createWatcherDeadline"); - private final PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener; + private final PatternConsumerUpdateQueue patternConsumerUpdateQueue; private final String name; private final ConnectionHandler connectionHandler; private final Pattern topicsPattern; @@ -63,13 +63,13 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler. /*** * @param topicsPattern The regexp for the topic name(not contains partition suffix). */ - public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener, + public TopicListWatcher(PatternConsumerUpdateQueue patternConsumerUpdateQueue, PulsarClientImpl client, Pattern topicsPattern, long watcherId, NamespaceName namespace, String topicsHash, CompletableFuture watcherFuture, Runnable recheckTopicsChangeAfterReconnect) { super(client, topicsPattern.pattern()); - this.topicsChangeListener = topicsChangeListener; + this.patternConsumerUpdateQueue = patternConsumerUpdateQueue; this.name = "Watcher(" + topicsPattern + ")"; this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder() @@ -277,13 +277,7 @@ private void cleanupAtClose(CompletableFuture closeFuture, Throwable excep } public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) { - List deleted = update.getDeletedTopicsList(); - if (!deleted.isEmpty()) { - topicsChangeListener.onTopicsRemoved(deleted); - } - List added = update.getNewTopicsList(); - if (!added.isEmpty()) { - topicsChangeListener.onTopicsAdded(added); - } + patternConsumerUpdateQueue.appendTopicsRemovedOp(update.getDeletedTopicsList()); + patternConsumerUpdateQueue.appendTopicsAddedOp(update.getNewTopicsList()); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java new file mode 100644 index 0000000000000..01f0be6a85ef6 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.util.HashedWheelTimer; +import java.io.Closeable; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +@Test(groups = "utils") +public class PatternConsumerUpdateQueueTest { + + private QueueInstance createInstance(CompletableFuture customizedRecheckFuture, + CompletableFuture customizedPartialUpdateFuture, + CompletableFuture customizedConsumerInitFuture) { + return createInstance(customizedRecheckFuture, customizedPartialUpdateFuture, customizedConsumerInitFuture, + null, null); + } + + private QueueInstance createInstance(CompletableFuture customizedRecheckFuture, + CompletableFuture customizedPartialUpdateFuture, + CompletableFuture customizedConsumerInitFuture, + Collection successTopics, + Collection errorTopics) { + HashedWheelTimer timer = new HashedWheelTimer(new ExecutorProvider.ExtendedThreadFactory("timer-x", + Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); + PulsarClientImpl client = mock(PulsarClientImpl.class); + when(client.timer()).thenReturn(timer); + + PatternMultiTopicsConsumerImpl patternConsumer = mock(PatternMultiTopicsConsumerImpl.class); + when(patternConsumer.recheckTopicsChange()).thenReturn(customizedRecheckFuture); + when(patternConsumer.getClient()).thenReturn(client); + if (customizedConsumerInitFuture != null) { + when(patternConsumer.getSubscribeFuture()).thenReturn(customizedConsumerInitFuture); + } else { + when(patternConsumer.getSubscribeFuture()).thenReturn(CompletableFuture.completedFuture(null)); + } + + PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener = + mock(PatternMultiTopicsConsumerImpl.TopicsChangedListener.class); + if (successTopics == null && errorTopics == null) { + when(topicsChangeListener.onTopicsAdded(anyCollection())).thenReturn(customizedPartialUpdateFuture); + when(topicsChangeListener.onTopicsRemoved(anyCollection())).thenReturn(customizedPartialUpdateFuture); + } else { + CompletableFuture ex = FutureUtil.failedFuture(new RuntimeException("mock error")); + when(topicsChangeListener.onTopicsAdded(successTopics)).thenReturn(customizedPartialUpdateFuture); + when(topicsChangeListener.onTopicsRemoved(successTopics)).thenReturn(customizedPartialUpdateFuture); + when(topicsChangeListener.onTopicsAdded(errorTopics)).thenReturn(ex); + when(topicsChangeListener.onTopicsRemoved(errorTopics)).thenReturn(ex); + } + + PatternConsumerUpdateQueue queue = new PatternConsumerUpdateQueue(patternConsumer, topicsChangeListener); + return new QueueInstance(queue, patternConsumer, topicsChangeListener); + } + + private QueueInstance createInstance() { + CompletableFuture completedFuture = CompletableFuture.completedFuture(null); + return createInstance(completedFuture, completedFuture, completedFuture); + } + + @AllArgsConstructor + private static class QueueInstance implements Closeable { + private PatternConsumerUpdateQueue queue; + private PatternMultiTopicsConsumerImpl mockedConsumer; + private PatternMultiTopicsConsumerImpl.TopicsChangedListener mockedListener; + + @Override + public void close() { + mockedConsumer.getClient().timer().stop(); + } + } + + @Test + public void testTopicsChangedEvents() { + QueueInstance instance = createInstance(); + + Collection topics = Arrays.asList("a"); + for (int i = 0; i < 10; i++) { + instance.queue.appendTopicsAddedOp(topics); + instance.queue.appendTopicsRemovedOp(topics); + } + Awaitility.await().untilAsserted(() -> { + verify(instance.mockedListener, times(10)).onTopicsAdded(topics); + verify(instance.mockedListener, times(10)).onTopicsRemoved(topics); + }); + + // cleanup. + instance.close(); + } + + @Test + public void testRecheckTask() { + QueueInstance instance = createInstance(); + + for (int i = 0; i < 10; i++) { + instance.queue.appendRecheckOp(); + } + + Awaitility.await().untilAsserted(() -> { + verify(instance.mockedConsumer, times(10)).recheckTopicsChange(); + }); + + // cleanup. + instance.close(); + } + + @Test + public void testDelayedRecheckTask() { + CompletableFuture recheckFuture = new CompletableFuture<>(); + CompletableFuture partialUpdateFuture = CompletableFuture.completedFuture(null); + CompletableFuture consumerInitFuture = CompletableFuture.completedFuture(null); + QueueInstance instance = createInstance(recheckFuture, partialUpdateFuture, consumerInitFuture); + + for (int i = 0; i < 10; i++) { + instance.queue.appendRecheckOp(); + } + + recheckFuture.complete(null); + Awaitility.await().untilAsserted(() -> { + // The first task will be running, and never completed until all tasks have been added. + // Since the first was started, the second one will not be skipped. + // The others after the second task will be skipped. + // So the times that called "recheckTopicsChange" will be 2. + verify(instance.mockedConsumer, times(2)).recheckTopicsChange(); + }); + + // cleanup. + instance.close(); + } + + @Test + public void testCompositeTasks() { + CompletableFuture recheckFuture = new CompletableFuture<>(); + CompletableFuture partialUpdateFuture = CompletableFuture.completedFuture(null); + CompletableFuture consumerInitFuture = CompletableFuture.completedFuture(null); + QueueInstance instance = createInstance(recheckFuture, partialUpdateFuture, consumerInitFuture); + + Collection topics = Arrays.asList("a"); + for (int i = 0; i < 10; i++) { + instance.queue.appendRecheckOp(); + instance.queue.appendTopicsAddedOp(topics); + instance.queue.appendTopicsRemovedOp(topics); + } + recheckFuture.complete(null); + Awaitility.await().untilAsserted(() -> { + // The first task will be running, and never completed until all tasks have been added. + // Since the first was started, the second one will not be skipped. + // The others after the second task will be skipped. + // So the times that called "recheckTopicsChange" will be 2. + verify(instance.mockedConsumer, times(2)).recheckTopicsChange(); + // The tasks after the second "recheckTopicsChange" will be skipped due to there is a previous + // "recheckTopicsChange" that has not been executed. + // The tasks between the fist "recheckTopicsChange" and the second "recheckTopicsChange" will be skipped + // due to there is a following "recheckTopicsChange". + verify(instance.mockedListener, times(0)).onTopicsAdded(topics); + verify(instance.mockedListener, times(0)).onTopicsRemoved(topics); + }); + + // cleanup. + instance.close(); + } + + @Test + public void testErrorTask() { + CompletableFuture immediatelyCompleteFuture = CompletableFuture.completedFuture(null); + Collection successTopics = Arrays.asList("a"); + Collection errorTopics = Arrays.asList(UUID.randomUUID().toString()); + QueueInstance instance = createInstance(immediatelyCompleteFuture, immediatelyCompleteFuture, + immediatelyCompleteFuture, successTopics, errorTopics); + + instance.queue.appendTopicsAddedOp(successTopics); + instance.queue.appendTopicsRemovedOp(successTopics); + instance.queue.appendTopicsAddedOp(errorTopics); + instance.queue.appendTopicsAddedOp(successTopics); + instance.queue.appendTopicsRemovedOp(successTopics); + + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + verify(instance.mockedListener, times(2)).onTopicsAdded(successTopics); + verify(instance.mockedListener, times(2)).onTopicsRemoved(successTopics); + verify(instance.mockedListener, times(1)).onTopicsAdded(errorTopics); + // After an error task will push a recheck task to offset. + verify(instance.mockedConsumer, times(1)).recheckTopicsChange(); + }); + + // cleanup. + instance.close(); + } + + @Test + public void testFailedSubscribe() { + CompletableFuture immediatelyCompleteFuture = CompletableFuture.completedFuture(null); + CompletableFuture consumerInitFuture = new CompletableFuture<>(); + Collection successTopics = Arrays.asList("a"); + Collection errorTopics = Arrays.asList(UUID.randomUUID().toString()); + QueueInstance instance = createInstance(immediatelyCompleteFuture, immediatelyCompleteFuture, + consumerInitFuture, successTopics, errorTopics); + + instance.queue.appendTopicsAddedOp(successTopics); + instance.queue.appendTopicsRemovedOp(successTopics); + instance.queue.appendTopicsAddedOp(errorTopics); + instance.queue.appendTopicsAddedOp(successTopics); + instance.queue.appendTopicsRemovedOp(successTopics); + + // Consumer init failed after multi topics changes. + // All the topics changes events should be skipped. + consumerInitFuture.completeExceptionally(new RuntimeException("mocked ex")); + Awaitility.await().untilAsserted(() -> { + verify(instance.mockedListener, times(0)).onTopicsAdded(successTopics); + verify(instance.mockedListener, times(0)).onTopicsRemoved(successTopics); + verify(instance.mockedListener, times(0)).onTopicsAdded(errorTopics); + verify(instance.mockedConsumer, times(0)).recheckTopicsChange(); + }); + + // cleanup. + instance.close(); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java index 116a69b63e4ec..3dfb23f31954a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java @@ -61,7 +61,7 @@ public void testChangedUnfilteredResponse() { "persistent://tenant/my-ns/non-matching"), null, false, true), mockListener, - Collections.emptyList()); + Collections.emptyList(), ""); verify(mockListener).onTopicsAdded(Sets.newHashSet( "persistent://tenant/my-ns/name-1", "persistent://tenant/my-ns/name-2")); @@ -80,7 +80,7 @@ public void testChangedFilteredResponse() { "persistent://tenant/my-ns/name-2"), "TOPICS_HASH", true, true), mockListener, - Arrays.asList("persistent://tenant/my-ns/name-0")); + Arrays.asList("persistent://tenant/my-ns/name-0"), ""); verify(mockListener).onTopicsAdded(Sets.newHashSet( "persistent://tenant/my-ns/name-1", "persistent://tenant/my-ns/name-2")); @@ -99,7 +99,7 @@ public void testUnchangedResponse() { "persistent://tenant/my-ns/name-2"), "TOPICS_HASH", true, false), mockListener, - Arrays.asList("persistent://tenant/my-ns/name-0")); + Arrays.asList("persistent://tenant/my-ns/name-0"), ""); verify(mockListener, never()).onTopicsAdded(any()); verify(mockListener, never()).onTopicsRemoved(any()); verify(mockTopicsHashSetter).accept("TOPICS_HASH"); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java index 74a71f3da850d..7daf316c4c576 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.naming.NamespaceName; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -68,8 +69,17 @@ public void setup() { thenReturn(clientCnxFuture.thenApply(clientCnx -> Pair.of(clientCnx, false))); when(client.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture); when(connectionPool.getConnection(any(), any(), anyInt())).thenReturn(clientCnxFuture); + + CompletableFuture completedFuture = CompletableFuture.completedFuture(null); + PatternMultiTopicsConsumerImpl patternConsumer = mock(PatternMultiTopicsConsumerImpl.class); + when(patternConsumer.getSubscribeFuture()).thenReturn(completedFuture); + when(patternConsumer.recheckTopicsChange()).thenReturn(completedFuture); + when(listener.onTopicsAdded(anyCollection())).thenReturn(completedFuture); + when(listener.onTopicsRemoved(anyCollection())).thenReturn(completedFuture); + PatternConsumerUpdateQueue queue = new PatternConsumerUpdateQueue(patternConsumer, listener); + watcherFuture = new CompletableFuture<>(); - watcher = new TopicListWatcher(listener, client, + watcher = new TopicListWatcher(queue, client, Pattern.compile(topic), 7, NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {}); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java index 80f16e6c36717..26a295264fcae 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java @@ -18,9 +18,12 @@ */ package org.apache.pulsar.common.lookup; +import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import lombok.Getter; import lombok.ToString; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; @@ -119,4 +122,25 @@ public List getTopics() { return topics; } } + + public GetTopicsResult filterTopics(Pattern topicsPattern) { + List topicsFiltered = TopicList.filterTopics(getTopics(), topicsPattern); + // If nothing changed. + if (topicsFiltered.equals(getTopics())) { + GetTopicsResult newObj = new GetTopicsResult(nonPartitionedOrPartitionTopics, null, true, true); + newObj.topics = topics; + return newObj; + } + // Filtered some topics. + Set topicsFilteredSet = new HashSet<>(topicsFiltered); + List newTps = new ArrayList<>(); + for (String tp: nonPartitionedOrPartitionTopics) { + if (topicsFilteredSet.contains(TopicName.get(tp).getPartitionedTopicName())) { + newTps.add(tp); + } + } + GetTopicsResult newObj = new GetTopicsResult(newTps, null, true, true); + newObj.topics = topicsFiltered; + return newObj; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index e051e01495dbe..d264eab9574ef 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -23,6 +23,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.re2j.Pattern; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; @@ -102,6 +103,14 @@ public static boolean isValid(String topic) { } } + public static String getPartitionPattern(String topic) { + return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "-partition-[0-9]+$"; + } + + public static String getPattern(String topic) { + return "^" + Pattern.quote(get(topic).getPartitionedTopicName().toString()) + "$"; + } + @SuppressFBWarnings("DCN_NULLPOINTER_EXCEPTION") private TopicName(String completeTopicName) { try { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 0628d494af3af..454eee0f966c5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -54,6 +54,9 @@ public class FutureUtil { * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete */ public static CompletableFuture waitForAll(Collection> futures) { + if (futures == null || futures.isEmpty()) { + return CompletableFuture.completedFuture(null); + } return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); }