Skip to content

Commit

Permalink
[fix] [broker] Fix can not subscribe partitioned topic with a suffix-…
Browse files Browse the repository at this point in the history
…matched regexp (#22025)

(cherry picked from commit b375e86)
  • Loading branch information
poorbarcode committed Feb 28, 2024
1 parent 6af01e5 commit 89e061c
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public TopicResources(MetadataStore store) {
store.registerListener(this::handleNotification);
}

/***
* List persistent topics names under a namespace, the topic name contains the partition suffix.
*/
public CompletableFuture<List<String>> listPersistentTopicsAsync(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,9 @@ private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicNam
});
}

/***
* List persistent topics names under a namespace, the topic name contains the partition suffix.
*/
public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,18 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
}
}

/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
*/
@Override
public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
interceptAndWriteCommand(command);
}

/***
* {@inheritDoc}
*/
@Override
public void sendWatchTopicListUpdate(long watcherId,
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
Expand All @@ -42,11 +43,16 @@ public class TopicListService {

public static class TopicListWatcher implements BiConsumer<String, NotificationType> {

/** Topic names which are matching, the topic name contains the partition suffix. **/
private final List<String> matchingTopics;
private final TopicListService topicListService;
private final long id;
/** The regexp for the topic name(not contains partition suffix). **/
private final Pattern topicsPattern;

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public TopicListWatcher(TopicListService topicListService, long id,
Pattern topicsPattern, List<String> topics) {
this.topicListService = topicListService;
Expand All @@ -59,9 +65,12 @@ public List<String> getMatchingTopics() {
return matchingTopics;
}

/***
* @param topicName topic name which contains partition suffix.
*/
@Override
public void accept(String topicName, NotificationType notificationType) {
if (topicsPattern.matcher(topicName).matches()) {
if (topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches()) {
List<String> newTopics;
List<String> deletedTopics;
if (notificationType == NotificationType.Deleted) {
Expand Down Expand Up @@ -109,6 +118,9 @@ public void inactivate() {
}
}

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, Pattern topicsPattern,
String topicsHash, Semaphore lookupSemaphore) {

Expand Down Expand Up @@ -184,7 +196,9 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
});
}


/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
NamespaceName namespace, long watcherId, Pattern topicsPattern) {
namespaceService.getListOfPersistentTopics(namespace).
Expand Down Expand Up @@ -242,6 +256,10 @@ public void deleteTopicListWatcher(Long watcherId) {
log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
}

/**
* @param deletedTopics topic names deleted(contains the partition suffix).
* @param newTopics topics names added(contains the partition suffix).
*/
public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
List<String> newTopics) {
connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand All @@ -68,6 +69,7 @@ public void setup() throws Exception {
isTcpLookup = true;
// enabled transaction, to test pattern consumers not subscribe to transaction system topic.
conf.setTransactionCoordinatorEnabled(true);
conf.setSubscriptionPatternMaxLength(10000);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -574,8 +576,9 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception
Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
// Disable automatic discovery.
.patternAutoDiscoveryPeriod(1000)
// Enable automatic discovery.
// See https://github.com/apache/pulsar/pull/20779 and https://github.com/apache/pulsar/pull/21853.
.patternAutoDiscoveryPeriod(0)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
Expand All @@ -597,32 +600,49 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
});

// cleanup.
consumer.close();
admin.topics().deletePartitionedTopic(topicName, false);
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
@DataProvider(name= "regexpConsumerArgs")
public Object[][] regexpConsumerArgs(){
return new Object[][]{
{true},
{false}
//{true, true},
{true, false},
//{false, true},
//{false, false}
};
}

@Test(timeOut = testTimeout, dataProvider = "partitioned")
public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
private void waitForTopicListWatcherStarted(Consumer consumer) {
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
log.info("isDone: {}, isCompletedExceptionally: {}", completableFuture.isDone(),
completableFuture.isCompletedExceptionally());
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});
}

@Test(timeOut = testTimeout, dataProvider = "regexpConsumerArgs")
public void testPreciseRegexpSubscribe(boolean partitioned, boolean createTopicAfterWatcherStarted) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscriptionName = "s1";
final Pattern pattern = Pattern.compile(String.format("%s$", topicName));

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
// Disable automatic discovery.
.patternAutoDiscoveryPeriod(1000)
// Enable automatic discovery.
// See https://github.com/apache/pulsar/pull/20779 and https://github.com/apache/pulsar/pull/21853.
.patternAutoDiscoveryPeriod(0)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
if (createTopicAfterWatcherStarted) {
waitForTopicListWatcherStarted(consumer);
}

// 1. create topic.
if (partitioned) {
Expand Down Expand Up @@ -652,6 +672,14 @@ public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
}
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
return new Object[][]{
{true},
{true}
};
}

@Test(timeOut = 240 * 1000, dataProvider = "partitioned")
public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> topics(List<String> topicNames);

/**
* Specify a pattern for topics that this consumer subscribes on.
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
*
* <p>The pattern is applied to subscribe to all the topics, within a single namespace, that matches the
* pattern.
*
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* a regular expression to select a list of topics to subscribe to
* a regular expression to select a list of topics(not contains the partition suffix) to subscribe to
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);

/**
* Specify a pattern for topics that this consumer subscribes on.
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
*
* <p>It accepts regular expression that is compiled into a pattern internally. Eg.
* "persistent://public/default/pattern-topic-.*"
Expand All @@ -151,7 +151,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* given regular expression for topics pattern
* given regular expression for topics(not contains the partition suffix) pattern
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(String topicsPattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,10 @@ private void removeTopic(String topic) {
}
}

// subscribe one more given topic
/***
* Subscribe one more given topic.
* @param topicName topic name without the partition suffix.
*/
public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
Expand Down Expand Up @@ -1249,7 +1252,10 @@ public CompletableFuture<Void> unsubscribeAsync(String topicName) {
return unsubscribeFuture;
}

// Remove a consumer for a topic
/***
* Remove a consumer for a topic.
* @param topicName topic name contains the partition suffix.
*/
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private volatile Timeout recheckPatternTimeout = null;
private volatile String topicsHash;

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
String topicsHash,
PulsarClientImpl client,
Expand Down Expand Up @@ -167,14 +170,26 @@ void setTopicsHash(String topicsHash) {
}

interface TopicsChangedListener {
// unsubscribe and delete ConsumerImpl in the `consumers` map in `MultiTopicsConsumerImpl` based on added topics
/***
* unsubscribe and delete {@link ConsumerImpl} in the {@link MultiTopicsConsumerImpl#consumers} map in
* {@link MultiTopicsConsumerImpl}.
* @param removedTopics topic names removed(contains the partition suffix).
*/
CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);
// subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in
// `MultiTopicsConsumerImpl`.

/***
* subscribe and create a list of new {@link ConsumerImpl}, added them to the
* {@link MultiTopicsConsumerImpl#consumers} map in {@link MultiTopicsConsumerImpl}.
* @param addedTopics topic names added(contains the partition suffix).
*/
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
}

private class PatternTopicsChangedListener implements TopicsChangedListener {

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
CompletableFuture<Void> removeFuture = new CompletableFuture<>();
Expand All @@ -196,6 +211,9 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
return removeFuture;
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
CompletableFuture<Void> addFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
private final AtomicReference<ClientCnx> clientCnxUsedForWatcherRegistration = new AtomicReference<>();


/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener,
PulsarClientImpl client, Pattern topicsPattern, long watcherId,
NamespaceName namespace, String topicsHash,
Expand Down Expand Up @@ -138,7 +141,6 @@ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
return;
}
}

this.connectionHandler.resetBackoff();

watcherFuture.complete(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,9 @@ public static BaseCommand newWatchTopicList(
return cmd;
}

/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
*/
public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
List<String> topics) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS);
Expand All @@ -1532,6 +1535,10 @@ public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherI
return cmd;
}

/**
* @param deletedTopics topic names deleted(contains the partition suffix).
* @param newTopics topics names added(contains the partition suffix).
*/
public static BaseCommand newWatchTopicUpdate(long watcherId,
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE);
Expand Down

0 comments on commit 89e061c

Please sign in to comment.