Skip to content

Commit

Permalink
[fix][broker] Check the markDeletePosition and calculate the backlog (a…
Browse files Browse the repository at this point in the history
…pache#22947)

Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 82b8d98)
(cherry picked from commit c0e1bff)
  • Loading branch information
nodece authored and srinath-ctds committed Jul 1, 2024
1 parent 7d13506 commit 8edc5ab
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,13 @@ public long getEstimatedSizeSinceMarkDeletePosition() {
return ledger.estimateBacklogFromPosition(markDeletePosition);
}

private long getNumberOfEntriesInBacklog() {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
}
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}

@Override
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
Expand All @@ -1120,16 +1127,13 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
}
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
return getNumberOfEntriesInBacklog();
}

long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
backlog = getNumberOfEntriesInBacklog();
}

return backlog;
Expand Down

0 comments on commit 8edc5ab

Please sign in to comment.