Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix potential case cause retention policy not working o…
Browse files Browse the repository at this point in the history
…n topic level (apache#21041)
  • Loading branch information
Technoboy- authored Aug 22, 2023
1 parent 9e2195c commit d3a6df3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1547,7 +1547,6 @@ private void disposeTopic(CompletableFuture<?> closeFuture) {
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
Expand All @@ -1567,7 +1566,8 @@ public CompletableFuture<Void> checkDeduplicationStatus() {
return messageDeduplication.checkStatus();
}

private CompletableFuture<Void> checkPersistencePolicies() {
@VisibleForTesting
CompletableFuture<Void> checkPersistencePolicies() {
TopicName topicName = TopicName.get(topic);
CompletableFuture<Void> future = new CompletableFuture<>();
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
Expand Down Expand Up @@ -3506,16 +3506,14 @@ public void onUpdate(TopicPolicies policies) {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
checkReplicationAndRetryOnFailure();

checkDeduplicationStatus();

preCreateSubscriptionForCompactionIfNeeded();

// update managed ledger config
checkPersistencePolicies();
}).exceptionally(e -> {
Throwable t = e instanceof CompletionException ? e.getCause() : e;
})
.thenCompose(__ -> checkReplicationAndRetryOnFailure())
.thenCompose(__ -> checkDeduplicationStatus())
.thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> checkPersistencePolicies())
.thenAccept(__ -> log.info("[{}] Policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -77,7 +78,9 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -603,4 +606,32 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
return !topic.getManagedLedger().getCursors().iterator().hasNext();
});
}

@Test
public void testCheckPersistencePolicies() throws Exception {
final String myNamespace = "prop/ns";
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testConfig" + UUID.randomUUID();
conf.setForceDeleteNamespaceAllowed(true);
pulsarClient.newProducer().topic(topic).create().close();
RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
PersistentTopic persistentTopic = spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());
TopicPoliciesService policiesService = spy(pulsar.getTopicPoliciesService());
doReturn(policiesService).when(pulsar).getTopicPoliciesService();
TopicPolicies policies = new TopicPolicies();
policies.setRetentionPolicies(retentionPolicies);
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
persistentTopic.onUpdate(policies);
verify(persistentTopic, times(1)).checkPersistencePolicies();
Awaitility.await().untilAsserted(() -> {
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
});
// throw exception
doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(persistentTopic).checkPersistencePolicies();
policies.setRetentionPolicies(new RetentionPolicies(2, 2));
persistentTopic.onUpdate(policies);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
}
}

0 comments on commit d3a6df3

Please sign in to comment.