Skip to content

Commit

Permalink
[fix][broker]Support setting autoSkipNonRecoverableData dynamically…
Browse files Browse the repository at this point in the history
… in expiryMon… (apache#21991)

Co-authored-by: atomchchen <[email protected]>
(cherry picked from commit 220a3d6)
(cherry picked from commit 09cb541)
  • Loading branch information
chenhongSZ authored and mukesh-ctds committed Mar 6, 2024
1 parent 4efd40a commit c9c4f65
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
Expand Down Expand Up @@ -51,7 +52,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
private final boolean autoSkipNonRecoverableData;
private final PersistentSubscription subscription;

private static final int FALSE = 0;
Expand All @@ -71,8 +71,12 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription
this.subscription = subscription;
this.msgExpired = new Rate();
this.totalMsgExpired = new LongAdder();
}

@VisibleForTesting
public boolean isAutoSkipNonRecoverableData() {
// check to avoid test failures
this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
return this.cursor.getManagedLedger() != null
&& this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
}

Expand Down Expand Up @@ -230,7 +234,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception);
}
if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent()
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
exception.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,4 +635,30 @@ public void testCheckPersistencePolicies() throws Exception {
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1));
}

@Test
public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exception {
pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
final String topicName = "persistent://prop/ns-abc/testAutoSkipNonRecoverableData";
final String subName = "test_sub";

Consumer<byte[]> subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
PersistentSubscription subscription = persistentTopic.getSubscription(subName);

assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());

String key = "autoSkipNonRecoverableData";
admin.brokers().updateDynamicConfiguration(key, "true");
Awaitility.await()
.untilAsserted(() -> assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true"));

assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());

subscribe.close();
admin.topics().delete(topicName);
}
}

0 comments on commit c9c4f65

Please sign in to comment.