Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Greedily advance safe commit on new global checkpoint #48559

Merged
merged 2 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile long maxSeqNoOfNextSafeCommit;
private volatile IndexCommit lastCommit; // the most recent commit point
private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;

Expand Down Expand Up @@ -83,6 +84,11 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
this.safeCommitInfo = SafeCommitInfo.EMPTY;
this.lastCommit = commits.get(commits.size() - 1);
this.safeCommit = commits.get(keptPosition);
if (keptPosition == commits.size() - 1) {
this.maxSeqNoOfNextSafeCommit = Long.MAX_VALUE;
} else {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
Expand Down Expand Up @@ -217,16 +223,10 @@ synchronized boolean hasSnapshottedCommits() {
}

/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
* Checks if the deletion policy can delete some index commits with the latest global checkpoint.
*/
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
return false;
boolean hasUnreferencedCommits() {
return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,28 @@ public void testCheckUnreferencedCommits() throws Exception {
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
}
IndexCommit safeCommit = randomFrom(commitList);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
int safeCommitIndex = randomIntBetween(0, commitList.size() - 1);
globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
commitList.forEach(this::resetDeletion);
indexPolicy.onCommit(commitList);
if (safeCommit == commitList.get(commitList.size() - 1)) {

if (safeCommitIndex == commitList.size() - 1) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
// Advanced enough
// Advanced but not enough for any commit after the safe commit becomes safe
IndexCommit nextSafeCommit = commitList.get(safeCommitIndex + 1);
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(),
Long.parseLong(nextSafeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1));
assertFalse(indexPolicy.hasUnreferencedCommits());
// Advanced enough for some index commit becomes safe
globalCheckpoint.set(randomLongBetween(
Long.parseLong(nextSafeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), lastMaxSeqNo));
assertTrue(indexPolicy.hasUnreferencedCommits());
// Advanced enough for the last commit becomes safe
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
commitList.forEach(this::resetDeletion);
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
Expand Down