diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index eb75963061edd..0e9c09d08021b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import com.google.common.collect.Sets; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; @@ -55,7 +56,9 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; @@ -717,5 +720,16 @@ protected void sleepSeconds(int seconds){ } } + public static void reconnectAllConnections(PulsarClientImpl c) throws Exception { + ConnectionPool pool = c.getCnxPool(); + Method closeAllConnections = ConnectionPool.class.getDeclaredMethod("closeAllConnections", new Class[]{}); + closeAllConnections.setAccessible(true); + closeAllConnections.invoke(pool, new Object[]{}); + } + + public void reconnectAllConnections() throws Exception { + reconnectAllConnections((PulsarClientImpl) pulsarClient); + } + private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index c708b4cae0a19..9bcbdfed4c9ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; +import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -35,6 +36,7 @@ import java.util.stream.IntStream; import io.netty.util.Timeout; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; @@ -679,6 +681,111 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchi } } + @DataProvider(name= "partitioned") + public Object[][] partitioned(){ + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(timeOut = testTimeout, dataProvider = "partitioned") + public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscriptionName = "s1"; + final Pattern pattern = Pattern.compile(String.format("%s$", topicName)); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + // Disable automatic discovery. + .patternAutoDiscoveryPeriod(1000) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + // 1. create topic. + if (partitioned) { + admin.topics().createPartitionedTopic(topicName, 1); + } else { + admin.topics().createNonPartitionedTopic(topicName); + } + + // 2. verify consumer can subscribe the topic. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + Awaitility.await().untilAsserted(() -> { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 1); + if (partitioned) { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); + } else { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0); + } + }); + + // cleanup. + consumer.close(); + if (partitioned) { + admin.topics().deletePartitionedTopic(topicName); + } else { + admin.topics().delete(topicName); + } + } + + @Test(timeOut = 240 * 1000, dataProvider = "partitioned") + public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscriptionName = "s1"; + final Pattern pattern = Pattern.compile(String.format("%s$", topicName)); + + // Close all ServerCnx by close client-side sockets to make the config changes effect. + pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(false); + reconnectAllConnections(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + // Disable brokerSideSubscriptionPatternEvaluation will leading disable topic list watcher. + // So set patternAutoDiscoveryPeriod to a little value. + .patternAutoDiscoveryPeriod(1) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + // 1. create topic. + if (partitioned) { + admin.topics().createPartitionedTopic(topicName, 1); + } else { + admin.topics().createNonPartitionedTopic(topicName); + } + + // 2. verify consumer can subscribe the topic. + // Since the minimum value of `patternAutoDiscoveryPeriod` is 60s, we set the test timeout to a triple value. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + Awaitility.await().atMost(Duration.ofMinutes(3)).untilAsserted(() -> { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 1); + if (partitioned) { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); + } else { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0); + } + }); + + // cleanup. + consumer.close(); + if (partitioned) { + admin.topics().deletePartitionedTopic(topicName); + } else { + admin.topics().delete(topicName); + } + // Close all ServerCnx by close client-side sockets to make the config changes effect. + pulsar.getConfig().setEnableBrokerSideSubscriptionPatternEvaluation(true); + reconnectAllConnections(); + } + private PulsarClient createDelayWatchTopicsClient() throws Exception { ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); return InjectedClientCnxClientBuilder.create(clientBuilder, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index 250cea217ee5f..4c0a8d500b703 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -47,13 +47,16 @@ public static List filterTopics(List original, String regex) { } public static List filterTopics(List original, Pattern topicsPattern) { - final Pattern shortenedTopicsPattern = topicsPattern.toString().contains(SCHEME_SEPARATOR) - ? Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) : topicsPattern; + final Pattern shortenedTopicsPattern = Pattern.compile(removeTopicDomainScheme(topicsPattern.toString())); return original.stream() .map(TopicName::get) + .filter(topicName -> { + String partitionedTopicName = topicName.getPartitionedTopicName(); + String removedScheme = SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1]; + return shortenedTopicsPattern.matcher(removedScheme).matches(); + }) .map(TopicName::toString) - .filter(topic -> shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches()) .collect(Collectors.toList()); } @@ -78,4 +81,16 @@ public static Set minus(Collection list1, Collection lis s1.removeAll(list2); return s1; } + + private static String removeTopicDomainScheme(String originalRegexp) { + if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) { + return originalRegexp; + } + String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1]; + if (originalRegexp.contains("^")) { + return String.format("^%s", removedTopicDomain); + } else { + return removedTopicDomain; + } + } }