Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Fix can not subscribe partitioned topic with a suffix-matched regexp #22025

Merged
merged 3 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public TopicResources(MetadataStore store) {
store.registerListener(this::handleNotification);
}

/***
* List persistent topics names under a namespace, the topic name contains the partition suffix.
*/
public CompletableFuture<List<String>> listPersistentTopicsAsync(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,9 @@ private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicNam
});
}

/***
* List persistent topics names under a namespace, the topic name contains the partition suffix.
*/
public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,18 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
writeAndFlush(outBuf);
}

/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
*/
@Override
public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
interceptAndWriteCommand(command);
}

/***
* {@inheritDoc}
*/
@Override
public void sendWatchTopicListUpdate(long watcherId,
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
Expand All @@ -42,11 +43,16 @@ public class TopicListService {

public static class TopicListWatcher implements BiConsumer<String, NotificationType> {

/** Topic names which are matching, the topic name contains the partition suffix. **/
private final List<String> matchingTopics;
private final TopicListService topicListService;
private final long id;
/** The regexp for the topic name(not contains partition suffix). **/
private final Pattern topicsPattern;

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public TopicListWatcher(TopicListService topicListService, long id,
Pattern topicsPattern, List<String> topics) {
this.topicListService = topicListService;
Expand All @@ -59,9 +65,12 @@ public List<String> getMatchingTopics() {
return matchingTopics;
}

/***
* @param topicName topic name which contains partition suffix.
*/
@Override
public void accept(String topicName, NotificationType notificationType) {
if (topicsPattern.matcher(topicName).matches()) {
if (topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches()) {
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
List<String> newTopics;
List<String> deletedTopics;
if (notificationType == NotificationType.Deleted) {
Expand Down Expand Up @@ -109,6 +118,9 @@ public void inactivate() {
}
}

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, Pattern topicsPattern,
String topicsHash, Semaphore lookupSemaphore) {

Expand Down Expand Up @@ -184,7 +196,9 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
});
}


/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
NamespaceName namespace, long watcherId, Pattern topicsPattern) {
namespaceService.getListOfPersistentTopics(namespace).
Expand Down Expand Up @@ -246,6 +260,10 @@ public void deleteTopicListWatcher(Long watcherId) {
log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
}

/**
* @param deletedTopics topic names deleted(contains the partition suffix).
* @param newTopics topics names added(contains the partition suffix).
*/
public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
List<String> newTopics) {
connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,16 +681,27 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchi
}
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
@DataProvider(name= "regexpConsumerArgs")
public Object[][] regexpConsumerArgs(){
return new Object[][]{
{true},
{false}
{true, true},
{true, false},
{false, true},
{false, false}
};
}

@Test(timeOut = testTimeout, dataProvider = "partitioned")
public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
private void waitForTopicListWatcherStarted(Consumer consumer) {
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
log.info("isDone: {}, isCompletedExceptionally: {}", completableFuture.isDone(),
completableFuture.isCompletedExceptionally());
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});
}

@Test(timeOut = testTimeout, dataProvider = "regexpConsumerArgs")
public void testPreciseRegexpSubscribe(boolean partitioned, boolean createTopicAfterWatcherStarted) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscriptionName = "s1";
final Pattern pattern = Pattern.compile(String.format("%s$", topicName));
Expand All @@ -704,6 +715,9 @@ public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
if (createTopicAfterWatcherStarted) {
waitForTopicListWatcherStarted(consumer);
}

// 1. create topic.
if (partitioned) {
Expand Down Expand Up @@ -733,6 +747,14 @@ public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
}
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
return new Object[][]{
{true},
{true}
};
}

@Test(timeOut = 240 * 1000, dataProvider = "partitioned")
public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> topics(List<String> topicNames);

/**
* Specify a pattern for topics that this consumer subscribes to.
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
*
* <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the
* pattern.
*
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* a regular expression to select a list of topics to subscribe to
* a regular expression to select a list of topics(not contains the partition suffix) to subscribe to
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);

/**
* Specify a pattern for topics that this consumer subscribes to.
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
*
* <p>It accepts a regular expression that is compiled into a pattern internally. E.g.,
* "persistent://public/default/pattern-topic-.*"
Expand All @@ -151,7 +151,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* given regular expression for topics pattern
* given regular expression for topics(not contains the partition suffix) pattern
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(String topicsPattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,10 @@ private void removeTopic(String topic) {
}
}

// subscribe one more given topic
/***
* Subscribe one more given topic.
* @param topicName topic name without the partition suffix.
*/
public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
Expand Down Expand Up @@ -1251,7 +1254,10 @@ public CompletableFuture<Void> unsubscribeAsync(String topicName) {
return unsubscribeFuture;
}

// Remove a consumer for a topic
/***
* Remove a consumer for a topic.
* @param topicName topic name contains the partition suffix.
*/
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private volatile Timeout recheckPatternTimeout = null;
private volatile String topicsHash;

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
String topicsHash,
PulsarClientImpl client,
Expand Down Expand Up @@ -220,14 +223,26 @@ void setTopicsHash(String topicsHash) {
}

interface TopicsChangedListener {
// unsubscribe and delete ConsumerImpl in the `consumers` map in `MultiTopicsConsumerImpl` based on added topics
/***
* unsubscribe and delete {@link ConsumerImpl} in the {@link MultiTopicsConsumerImpl#consumers} map in
* {@link MultiTopicsConsumerImpl}.
* @param removedTopics topic names removed(contains the partition suffix).
*/
CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);
// subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in
// `MultiTopicsConsumerImpl`.

/***
* subscribe and create a list of new {@link ConsumerImpl}, added them to the
* {@link MultiTopicsConsumerImpl#consumers} map in {@link MultiTopicsConsumerImpl}.
* @param addedTopics topic names added(contains the partition suffix).
*/
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
}

private class PatternTopicsChangedListener implements TopicsChangedListener {

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
CompletableFuture<Void> removeFuture = new CompletableFuture<>();
Expand All @@ -249,6 +264,9 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
return removeFuture;
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
CompletableFuture<Void> addFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
private final Runnable recheckTopicsChangeAfterReconnect;


/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener,
PulsarClientImpl client, Pattern topicsPattern, long watcherId,
NamespaceName namespace, String topicsHash,
Expand Down Expand Up @@ -142,7 +145,6 @@ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
return;
}
}

this.connectionHandler.resetBackoff();

recheckTopicsChangeAfterReconnect.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

@ApiModelProperty(
name = "topicsPattern",
value = "Topic pattern"
value = "The regexp for the topic name(not contains partition suffix)."
)
private Pattern topicsPattern;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,9 @@ public static BaseCommand newWatchTopicList(
return cmd;
}

/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
*/
public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
List<String> topics) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS);
Expand All @@ -1600,6 +1603,10 @@ public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherI
return cmd;
}

/**
* @param deletedTopics topic names deleted(contains the partition suffix).
* @param newTopics topics names added(contains the partition suffix).
*/
public static BaseCommand newWatchTopicUpdate(long watcherId,
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE);
Expand Down
Loading