Skip to content

Commit

Permalink
[fix][test] Fix test
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Sep 9, 2024
1 parent 68f470d commit 14e29a0
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testEvents(String topicTypePersistence, String topicTypePartitioned,
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

events.clear();
if (topicTypePartitioned.equals("partitioned")) {
Expand All @@ -145,7 +145,7 @@ public void testEventsWithUnload(String topicTypePersistence, String topicTypePa
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

events.clear();
admin.topics().unload(topicName);
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Expand Down Expand Up @@ -233,7 +233,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

admin.namespaces().setInactiveTopicPolicies(namespace,
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
Expand All @@ -257,33 +257,37 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti
);
}

private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception {
private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName)
throws Exception {
final String[] expectedEvents;
if (topicTypePartitioned.equals("partitioned")) {
topicNameToWatch = topicName + "-partition-1";
admin.topics().createPartitionedTopic(topicName, 2);
triggerPartitionsCreation(topicName);

if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};

} else {
topicNameToWatch = topicName;
admin.topics().createNonPartitionedTopic(topicName);

expectedEvents = new String[]{
// Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic
// was already exists, and the action "check topic exists" will try to load Managed ledger,
// the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE].
// #21995 fixed this wrong behavior, so remove these two events.
"LOAD__BEFORE",
"LOAD__FAILURE",
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};

}
if (topicTypePartitioned.equals("partitioned")) {
topicNameToWatch = topicName + "-partition-1";
admin.topics().createPartitionedTopic(topicName, 2);
triggerPartitionsCreation(topicName);
} else {
topicNameToWatch = topicName;
admin.topics().createNonPartitionedTopic(topicName);
}

Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2643,6 +2643,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
URL pulsarUrl = new URL(pulsar.getWebServiceAddress());

admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
.subscriptionType(SubscriptionType.Shared).subscribe();
Expand All @@ -2652,18 +2653,16 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);

// create a subscription for few new partition which can fail
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);

try {
admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false);
} catch (PulsarAdminException.PreconditionFailedException e) {
// Ok
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
} catch (PulsarAdminException.ConflictException ex) {
// OK
}
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);

admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true);
// validate subscription is created for new partition.
assertNotNull(admin.topics().getStats(partitionedTopicName + "-partition-" + 6).getSubscriptions().get(subName1));
for (int i = startPartitions; i < newPartitions; i++) {
assertNotNull(
admin.topics().getStats(partitionedTopicName + "-partition-" + i).getSubscriptions().get(subName1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,26 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create();) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}


// verify that the topic does not exist
pulsar.getPulsarResources().getNamespaceResources()
.setPolicies(NamespaceName.get(namespaceName), old -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy.Expiration;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerBrokerUsageStats;
Expand All @@ -35,12 +33,9 @@
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void testPersistentPartitionedTopicUnload() throws Exception {

assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
pulsar.getBrokerService().getTopicIfExists(topicName).get();
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName));
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));

// ref of partitioned-topic name should be empty
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
Expand Down

0 comments on commit 14e29a0

Please sign in to comment.