From a7f0bc49fa27c9bc066b2ed52bf4be5f1bee160d Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Tue, 22 Aug 2023 22:08:09 +0800 Subject: [PATCH] [fix][broker] Fix potential case cause retention policy not working on topic level (#21041) --- .../service/persistent/PersistentTopic.java | 22 ++++++------- .../persistent/PersistentTopicTest.java | 31 +++++++++++++++++++ 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4a3e38c5395b9..34bab21155ed8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1352,7 +1352,6 @@ private void disposeTopic(CompletableFuture closeFuture) { CompletableFuture checkReplicationAndRetryOnFailure() { CompletableFuture result = new CompletableFuture(); checkReplication().thenAccept(res -> { - log.info("[{}] Policies updated successfully", topic); result.complete(null); }).exceptionally(th -> { log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(), @@ -1372,7 +1371,8 @@ public CompletableFuture checkDeduplicationStatus() { return messageDeduplication.checkStatus(); } - private CompletableFuture checkPersistencePolicies() { + @VisibleForTesting + CompletableFuture checkPersistencePolicies() { TopicName topicName = TopicName.get(topic); CompletableFuture future = new CompletableFuture<>(); brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> { @@ -3110,16 +3110,14 @@ public void onUpdate(TopicPolicies policies) { updateSubscribeRateLimiter(); replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); checkMessageExpiry(); - checkReplicationAndRetryOnFailure(); - - checkDeduplicationStatus(); - - preCreateSubscriptionForCompactionIfNeeded(); - - // update managed ledger config - checkPersistencePolicies(); - }).exceptionally(e -> { - Throwable t = e instanceof CompletionException ? e.getCause() : e; + }) + .thenCompose(__ -> checkReplicationAndRetryOnFailure()) + .thenCompose(__ -> checkDeduplicationStatus()) + .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded()) + .thenCompose(__ -> checkPersistencePolicies()) + .thenAccept(__ -> log.info("[{}] Policies updated successfully", topic)) + .exceptionally(e -> { + Throwable t = FutureUtil.unwrapCompletionException(e); log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 50a54aaa0b745..ca97c09bd672f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; @@ -75,7 +76,9 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.junit.Assert; @@ -488,4 +491,32 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) return !topic.getManagedLedger().getCursors().iterator().hasNext(); }); } + + @Test + public void testCheckPersistencePolicies() throws Exception { + final String myNamespace = "prop/ns"; + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topic = "persistent://" + myNamespace + "/testConfig" + UUID.randomUUID(); + conf.setForceDeleteNamespaceAllowed(true); + pulsarClient.newProducer().topic(topic).create().close(); + RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1); + PersistentTopic persistentTopic = spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get()); + TopicPoliciesService policiesService = spy(pulsar.getTopicPoliciesService()); + doReturn(policiesService).when(pulsar).getTopicPoliciesService(); + TopicPolicies policies = new TopicPolicies(); + policies.setRetentionPolicies(retentionPolicies); + doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic)); + persistentTopic.onUpdate(policies); + verify(persistentTopic, times(1)).checkPersistencePolicies(); + Awaitility.await().untilAsserted(() -> { + assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L); + assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1)); + }); + // throw exception + doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(persistentTopic).checkPersistencePolicies(); + policies.setRetentionPolicies(new RetentionPolicies(2, 2)); + persistentTopic.onUpdate(policies); + assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L); + assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1)); + } }