diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 34bab21155ed8..4a3e38c5395b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1352,6 +1352,7 @@ private void disposeTopic(CompletableFuture closeFuture) { CompletableFuture checkReplicationAndRetryOnFailure() { CompletableFuture result = new CompletableFuture(); checkReplication().thenAccept(res -> { + log.info("[{}] Policies updated successfully", topic); result.complete(null); }).exceptionally(th -> { log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(), @@ -1371,8 +1372,7 @@ public CompletableFuture checkDeduplicationStatus() { return messageDeduplication.checkStatus(); } - @VisibleForTesting - CompletableFuture checkPersistencePolicies() { + private CompletableFuture checkPersistencePolicies() { TopicName topicName = TopicName.get(topic); CompletableFuture future = new CompletableFuture<>(); brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> { @@ -3110,14 +3110,16 @@ public void onUpdate(TopicPolicies policies) { updateSubscribeRateLimiter(); replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); checkMessageExpiry(); - }) - .thenCompose(__ -> checkReplicationAndRetryOnFailure()) - .thenCompose(__ -> checkDeduplicationStatus()) - .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded()) - .thenCompose(__ -> checkPersistencePolicies()) - .thenAccept(__ -> log.info("[{}] Policies updated successfully", topic)) - .exceptionally(e -> { - Throwable t = FutureUtil.unwrapCompletionException(e); + checkReplicationAndRetryOnFailure(); + + checkDeduplicationStatus(); + + preCreateSubscriptionForCompactionIfNeeded(); + + // update managed ledger config + checkPersistencePolicies(); + }).exceptionally(e -> { + Throwable t = e instanceof CompletionException ? e.getCause() : e; log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index ca97c09bd672f..50a54aaa0b745 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -59,7 +59,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; @@ -76,9 +75,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.junit.Assert; @@ -491,32 +488,4 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) return !topic.getManagedLedger().getCursors().iterator().hasNext(); }); } - - @Test - public void testCheckPersistencePolicies() throws Exception { - final String myNamespace = "prop/ns"; - admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); - final String topic = "persistent://" + myNamespace + "/testConfig" + UUID.randomUUID(); - conf.setForceDeleteNamespaceAllowed(true); - pulsarClient.newProducer().topic(topic).create().close(); - RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1); - PersistentTopic persistentTopic = spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get()); - TopicPoliciesService policiesService = spy(pulsar.getTopicPoliciesService()); - doReturn(policiesService).when(pulsar).getTopicPoliciesService(); - TopicPolicies policies = new TopicPolicies(); - policies.setRetentionPolicies(retentionPolicies); - doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic)); - persistentTopic.onUpdate(policies); - verify(persistentTopic, times(1)).checkPersistencePolicies(); - Awaitility.await().untilAsserted(() -> { - assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L); - assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1)); - }); - // throw exception - doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(persistentTopic).checkPersistencePolicies(); - policies.setRetentionPolicies(new RetentionPolicies(2, 2)); - persistentTopic.onUpdate(policies); - assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L); - assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1)); - } }