Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix topic status for oldestBacklogMessageAgeSeconds continuously increases even when there is no backlog. #22907

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
} else if (failIfHasBacklogs) {
if (hasBacklogs()) {
if (hasBacklogs(false)) {
List<String> backlogSubs =
subscriptions.values().stream()
.filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
Expand Down Expand Up @@ -2638,12 +2638,9 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.backlogQuotaLimitTime = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();

TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = timeBasedBacklogQuotaCheckResult;
stats.oldestBacklogMessageAgeSeconds = (backlogQuotaCheckResult == null)
? (long) -1
: TimeUnit.MILLISECONDS.toSeconds(
Clock.systemUTC().millis() - backlogQuotaCheckResult.getPositionPublishTimestampInMillis());

stats.oldestBacklogMessageAgeSeconds = getBestEffortOldestUnacknowledgedMessageAgeSeconds();
stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult == null)
|| !hasBacklogs(getStatsOptions.isGetPreciseBacklog())
? null
: backlogQuotaCheckResult.getCursorName();

Expand Down Expand Up @@ -2906,7 +2903,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
}
break;
case delete_when_subscriptions_caught_up:
if (hasBacklogs()) {
if (hasBacklogs(false)) {
return true;
}
break;
Expand All @@ -2919,8 +2916,8 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
}
}

private boolean hasBacklogs() {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0);
private boolean hasBacklogs(boolean getPreciseBacklog) {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(getPreciseBacklog) > 0);
}

@Override
Expand Down Expand Up @@ -3466,6 +3463,9 @@ public boolean isSizeBacklogExceeded() {

@Override
public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
if (!hasBacklogs(false)) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
return 0;
}
TimeBasedBacklogQuotaCheckResult result = timeBasedBacklogQuotaCheckResult;
if (result == null) {
return -1;
Expand Down Expand Up @@ -3553,6 +3553,9 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
}

if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
if (!hasBacklogs(true)) {
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
// Check if first unconsumed message(first message after mark delete position)
// for slowest cursor's has expired.
Expand Down Expand Up @@ -3606,6 +3609,9 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
return future;
} else {
try {
if (!hasBacklogs(false)) {
return CompletableFuture.completedFuture(false);
}
EstimateTimeBasedBacklogQuotaCheckResult checkResult =
estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.within;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -296,8 +297,12 @@ public void testBacklogQuotaWithReader() throws Exception {
}

private TopicStats getTopicStats(String topic1) throws PulsarAdminException {
return getTopicStats(topic1, true);
}

private TopicStats getTopicStats(String topic1, boolean getPreciseBacklog) throws PulsarAdminException {
TopicStats stats =
admin.topics().getStats(topic1, GetStatsOptions.builder().getPreciseBacklog(true).build());
admin.topics().getStats(topic1, GetStatsOptions.builder().getPreciseBacklog(getPreciseBacklog).build());
return stats;
}

Expand Down Expand Up @@ -502,9 +507,117 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce

// Cache should be used, since position hasn't changed
assertThat(getReadEntries(topic1)).isEqualTo(readEntries);

// Move subscription 1 and 2 to end
Message<byte[]> msg = consumer1.receive();
consumer1.acknowledge(msg);
consumer2.acknowledge(secondOldestMessage);
for (int i = 0; i < 2; i++) {
Message<byte[]> message = consumer2.receive();
log.info("Subscription 2 about to ack message ID {}", message.getMessageId());
consumer2.acknowledge(message);
}

log.info("Subscription 1 and 2 moved to end. Now should not backlog");
waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
waitForQuotaCheckToRunTwice();

topicStats = getTopicStats(topic1);
assertThat(topicStats.getBacklogSize()).isEqualTo(0);
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0);
assertThat(topicStats.getSubscriptions().get(subName2).getMsgBacklog()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();

metrics = prometheusMetricsClient.getMetrics();
backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
assertThat(backlogAgeMetric.tags).containsExactly(
entry("cluster", CLUSTER_NAME),
entry("namespace", namespace),
entry("topic", topic1));
assertThat((long) backlogAgeMetric.value).isEqualTo(0);

// producer should create success.
Producer<byte[]> producer2 = createProducer(client, topic1);
assertNotNull(producer2);
}
}

@Test
public void backlogsStatsPreciseWithNoBacklog() throws PulsarAdminException, PulsarClientException, InterruptedException {
config.setPreciseTimeBasedBacklogQuotaCheck(true);
config.setExposePreciseBacklogInPrometheus(true);
final String namespace = "prop/ns-quota";
assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
final int timeLimitSeconds = 2;
admin.namespaces().setBacklogQuota(
namespace,
BacklogQuota.builder()
.limitTime(timeLimitSeconds)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build(),
message_age);

try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.maxBackoffInterval(5, SECONDS)
.statsInterval(0, SECONDS).build()) {
final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();

final String subName1 = "c1";
final int numMsgs = 4;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
.acknowledgmentGroupTime(0, SECONDS)
.subscribe();
Producer<byte[]> producer = createProducer(client, topic1);

byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
MessageId send = producer.send(content);
System.out.println(i + ":msg:" + MILLISECONDS.toSeconds(System.currentTimeMillis()));
}

String c1MarkDeletePositionBefore =
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;

// Move subscription 1 to end
for (int i = 0; i < numMsgs; i++) {
Message<byte[]> message1 = consumer1.receive();
consumer1.acknowledge(message1);
}

// This code will wait about 4~5 Seconds, to make sure the oldest message is 4~5 seconds old
c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
waitForQuotaCheckToRunTwice();

Metrics metrics = prometheusMetricsClient.getMetrics();
TopicStats topicStats = getTopicStats(topic1);

assertThat(topicStats.getBacklogQuotaLimitTime()).isEqualTo(timeLimitSeconds);
assertThat(topicStats.getBacklogSize()).isEqualTo(0);
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();

Metric backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
assertThat(backlogAgeMetric.tags).containsExactly(
entry("cluster", CLUSTER_NAME),
entry("namespace", namespace),
entry("topic", topic1));
assertThat((long) backlogAgeMetric.value).isEqualTo(0);

// producer should create success.
Producer<byte[]> producer2 = createProducer(client, topic1);
assertNotNull(producer2);
}
config.setPreciseTimeBasedBacklogQuotaCheck(false);
config.setExposePreciseBacklogInPrometheus(false);
}

private long getReadEntries(String topic1) {
return ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get())
.getManagedLedger().getStats().getEntriesReadTotalCount();
Expand Down Expand Up @@ -609,6 +722,71 @@ public void backlogsStatsNotPrecise() throws PulsarAdminException, PulsarClientE
}
}

@Test
public void backlogsStatsNotPreciseWithNoBacklog() throws PulsarAdminException, PulsarClientException, InterruptedException {
config.setPreciseTimeBasedBacklogQuotaCheck(false);
config.setExposePreciseBacklogInPrometheus(false);
config.setManagedLedgerMaxEntriesPerLedger(6);
final String namespace = "prop/ns-quota";
assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
final int timeLimitSeconds = 2;
admin.namespaces().setBacklogQuota(
namespace,
BacklogQuota.builder()
.limitTime(timeLimitSeconds)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build(),
message_age);

try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.maxBackoffInterval(3, SECONDS)
.statsInterval(0, SECONDS).build()) {
final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();

final String subName1 = "brandNewC1";
final int numMsgs = 5;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
.acknowledgmentGroupTime(0, SECONDS)
.isAckReceiptEnabled(true)
.subscribe();
Producer<byte[]> producer = createProducer(client, topic1);

byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
}

String c1MarkDeletePositionBefore =
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;

log.info("Moved subscription 1 to end");
for (int i = 0; i < numMsgs; i++) {
consumer1.acknowledge(consumer1.receive());
}

c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
waitForQuotaCheckToRunTwice();

// backlog and backlogAceSeconds should be 0
TopicStats topicStats = getTopicStats(topic1, false);
Metrics metrics = prometheusMetricsClient.getMetrics();
assertEquals(topicStats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0);
Metric backlogAgeMetric =
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
Pair.of("topic", topic1));
assertThat(backlogAgeMetric.value).isEqualTo(0);

// producer should create success.
Producer<byte[]> producer2 = createProducer(client, topic1);
assertNotNull(producer2);

config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
}
}

private void unloadAndLoadTopic(String topic, Producer producer) throws PulsarAdminException,
PulsarClientException {
admin.topics().unload(topic);
Expand Down
Loading