From 76cbf298582e35ca92d1468b0685281ceab4b5a0 Mon Sep 17 00:00:00 2001 From: atomchchen Date: Tue, 30 Jan 2024 16:17:06 +0800 Subject: [PATCH 1/2] Support setting `autoSkipNonRecoverableData` dynamically in expiryMonitor. --- .../persistent/PersistentMessageExpiryMonitor.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 978cd3f886f16..596e12d6eadf1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -48,7 +48,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; - private final boolean autoSkipNonRecoverableData; private final PersistentSubscription subscription; private static final int FALSE = 0; @@ -68,8 +67,11 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription this.subscription = subscription; this.msgExpired = new Rate(); this.totalMsgExpired = new LongAdder(); + } + + private boolean isAutoSkipNonRecoverableData() { // check to avoid test failures - this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null + return this.cursor.getManagedLedger() != null && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); } @@ -196,7 +198,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional if (log.isDebugEnabled()) { log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception); } - if (autoSkipNonRecoverableData && failedReadPosition.isPresent() + if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent() && (exception instanceof NonRecoverableLedgerException)) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition, exception.getMessage()); From d15edd8dcb48aa5269b1f8a658c8398a06a801a2 Mon Sep 17 00:00:00 2001 From: atomchchen Date: Sun, 4 Feb 2024 16:58:36 +0800 Subject: [PATCH 2/2] add test for dynamic configuration autoSkipNonRecoverableData --- .../PersistentMessageExpiryMonitor.java | 4 ++- .../persistent/PersistentTopicTest.java | 26 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 596e12d6eadf1..5d3596d0d05eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import com.google.common.annotations.VisibleForTesting; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -69,7 +70,8 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription this.totalMsgExpired = new LongAdder(); } - private boolean isAutoSkipNonRecoverableData() { + @VisibleForTesting + public boolean isAutoSkipNonRecoverableData() { // check to avoid test failures return this.cursor.getManagedLedger() != null && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 4eb2aa15fa292..06a46f86c034e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -635,4 +635,30 @@ public void testCheckPersistencePolicies() throws Exception { assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L); assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1)); } + + @Test + public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exception { + pulsar.getConfiguration().setAutoSkipNonRecoverableData(false); + final String topicName = "persistent://prop/ns-abc/testAutoSkipNonRecoverableData"; + final String subName = "test_sub"; + + Consumer subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + PersistentSubscription subscription = persistentTopic.getSubscription(subName); + + assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData()); + assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData()); + + String key = "autoSkipNonRecoverableData"; + admin.brokers().updateDynamicConfiguration(key, "true"); + Awaitility.await() + .untilAsserted(() -> assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true")); + + assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData()); + assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData()); + + subscribe.close(); + admin.topics().delete(topicName); + } }