diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index f5dc4b778aa09..74f7c4137a2d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -48,7 +48,7 @@ /** * Cached topic policies service will cache the system topic reader and the topic policies - * + *

* While reader cache for the namespace was removed, the topic policies will remove automatically. */ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService { @@ -118,7 +118,7 @@ private CompletableFuture sendTopicPolicyEvent(TopicName topicName, Action } } }); - }) + }) ); } }); @@ -141,6 +141,18 @@ private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, T } private void notifyListener(Message msg) { + // delete policies + if (msg.getValue() == null) { + TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()); + policiesCache.remove(topicName); + if (listeners.get(topicName) != null) { + for (TopicPolicyListener listener : listeners.get(topicName)) { + listener.onUpdate(null); + } + } + return; + } + if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { return; } @@ -252,12 +264,11 @@ public void unLoad(NamespaceBundle bundle) { removeOwnedNamespaceBundleAsync(bundle); } - @Override - public boolean test(NamespaceBundle namespaceBundle) { - return true; - } - - }); + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return true; + } + }); } private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture future) { @@ -393,7 +404,8 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + }); + + admin.topics().setMaxConsumersPerSubscription(topic, 1); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); + }); + + admin.topics().delete(topic); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + }); + } + } + + @Test + public void testLoopCreateAndDeleteTopicPolicies() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + + int n = 0; + while (n < 2) { + n++; + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + }); + + admin.topics().setMaxConsumersPerSubscription(topic, 1); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); + }); + + admin.topics().delete(topic); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + }); + } + } }