Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] fix flaky test PatternTopicsConsumerImplTest #21222

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand All @@ -67,6 +68,7 @@ public void setup() throws Exception {
isTcpLookup = true;
// enabled transaction, to test pattern consumers not subscribe to transaction system topic.
conf.setTransactionCoordinatorEnabled(true);
conf.setSubscriptionPatternMaxLength(10000);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -210,6 +212,12 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -287,6 +295,12 @@ public void testBinaryProtoSubscribeAllTopicOfNamespace() throws Exception {
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -364,6 +378,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio
.subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -455,6 +475,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -525,6 +551,11 @@ public void testStartEmptyPatternConsumer() throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 3. verify consumer get methods, to get 5 number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
Expand Down Expand Up @@ -605,6 +636,12 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 1. create partition
String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
Expand Down Expand Up @@ -665,6 +702,12 @@ public void testAutoSubscribePatternConsumer() throws Exception {
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
Expand Down Expand Up @@ -775,6 +818,12 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
Expand Down Expand Up @@ -861,6 +910,12 @@ public void testTopicDeletion() throws Exception {
.subscriptionName("sub")
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

Expand Down
Loading