Skip to content

Commit

Permalink
Fix the null point caused by deleting the system topic policy
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Oct 18, 2021
1 parent 69fb802 commit 0c72bcb
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

/**
* Cached topic policies service will cache the system topic reader and the topic policies
*
* <p>
* While reader cache for the namespace was removed, the topic policies will remove automatically.
*/
public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
Expand Down Expand Up @@ -118,7 +118,7 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
}
}
});
})
})
);
}
});
Expand All @@ -141,6 +141,18 @@ private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, T
}

private void notifyListener(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
policiesCache.remove(topicName);
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(null);
}
}
return;
}

if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
return;
}
Expand Down Expand Up @@ -252,12 +264,11 @@ public void unLoad(NamespaceBundle bundle) {
removeOwnedNamespaceBundleAsync(bundle);
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return true;
}

});
@Override
public boolean test(NamespaceBundle namespaceBundle) {
return true;
}
});
}

private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
Expand Down Expand Up @@ -393,7 +404,8 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<Puls
if (e != null) {
future.completeExceptionally(e);
}
if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
if (msg != null && msg.getValue() != null
&& EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
TopicPoliciesEvent topicPoliciesEvent = msg.getValue().getTopicPoliciesEvent();
if (topicName.equals(TopicName.get(
topicPoliciesEvent.getDomain(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2643,4 +2643,51 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
});
}

@Test
public void testMultipleCreateAndDeleteTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();

int n = 0;
while (n < 2) {
n++;
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
});

admin.topics().setMaxConsumersPerSubscription(topic, 1);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
});

admin.topics().delete(topic);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
});
}
}

@Test
public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();

int n = 0;
while (n < 2) {
n++;
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
});

admin.topics().setMaxConsumersPerSubscription(topic, 1);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
});

admin.topics().delete(topic);
Awaitility.await().untilAsserted(() -> {
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
});
}
}
}

0 comments on commit 0c72bcb

Please sign in to comment.