Skip to content

Commit

Permalink
Revert "[fix][broker] Fix potential case cause retention policy not w…
Browse files Browse the repository at this point in the history
…orking on topic level (#21041)"

This reverts commit a7f0bc4.
  • Loading branch information
Technoboy- committed Sep 12, 2023
1 parent 7022606 commit 0c7d8c8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,7 @@ private void disposeTopic(CompletableFuture<?> closeFuture) {
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
Expand All @@ -1371,8 +1372,7 @@ public CompletableFuture<Void> checkDeduplicationStatus() {
return messageDeduplication.checkStatus();
}

@VisibleForTesting
CompletableFuture<Void> checkPersistencePolicies() {
private CompletableFuture<Void> checkPersistencePolicies() {
TopicName topicName = TopicName.get(topic);
CompletableFuture<Void> future = new CompletableFuture<>();
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
Expand Down Expand Up @@ -3110,14 +3110,16 @@ public void onUpdate(TopicPolicies policies) {
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
})
.thenCompose(__ -> checkReplicationAndRetryOnFailure())
.thenCompose(__ -> checkDeduplicationStatus())
.thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> checkPersistencePolicies())
.thenAccept(__ -> log.info("[{}] Policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
checkReplicationAndRetryOnFailure();

checkDeduplicationStatus();

preCreateSubscriptionForCompactionIfNeeded();

// update managed ledger config
checkPersistencePolicies();
}).exceptionally(e -> {
Throwable t = e instanceof CompletionException ? e.getCause() : e;
log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -76,9 +75,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.junit.Assert;
Expand Down Expand Up @@ -491,32 +488,4 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy)
return !topic.getManagedLedger().getCursors().iterator().hasNext();
});
}

@Test
public void testCheckPersistencePolicies() throws Exception {
final String myNamespace = "prop/ns";
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testConfig" + UUID.randomUUID();
conf.setForceDeleteNamespaceAllowed(true);
pulsarClient.newProducer().topic(topic).create().close();
RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
PersistentTopic persistentTopic = spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());
TopicPoliciesService policiesService = spy(pulsar.getTopicPoliciesService());
doReturn(policiesService).when(pulsar).getTopicPoliciesService();
TopicPolicies policies = new TopicPolicies();
policies.setRetentionPolicies(retentionPolicies);
doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic));
persistentTopic.onUpdate(policies);
verify(persistentTopic, times(1)).checkPersistencePolicies();
Awaitility.await().untilAsserted(() -> {
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
});
// throw exception
doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(persistentTopic).checkPersistencePolicies();
policies.setRetentionPolicies(new RetentionPolicies(2, 2));
persistentTopic.onUpdate(policies);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
}
}

0 comments on commit 0c7d8c8

Please sign in to comment.