diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 2a0d89cb1e2a2..f1940c70447f8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -50,6 +50,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final LongSupplier globalCheckpointSupplier; private final IndexCommit startingCommit; private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. + private volatile int pendingSnapshots; private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point @@ -61,6 +62,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { this.globalCheckpointSupplier = globalCheckpointSupplier; this.startingCommit = startingCommit; this.snapshottedCommits = new ObjectIntHashMap<>(); + this.pendingSnapshots = 0; } @Override @@ -163,6 +165,7 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; snapshottedCommits.addTo(snapshotting, 1); // increase refCount + pendingSnapshots++; return new SnapshotIndexCommit(snapshotting); } @@ -178,6 +181,7 @@ synchronized void releaseCommit(final IndexCommit snapshotCommit) { if (refCount == 0) { snapshottedCommits.remove(releasingCommit); } + pendingSnapshots--; } /** @@ -235,7 +239,7 @@ private static int indexOfKeptCommits(List commits, long */ boolean hasUnreferencedCommits() throws IOException { final IndexCommit lastCommit = this.lastCommit; - if (safeCommit != lastCommit) { // Race condition can happen but harmless + if (safeCommit != lastCommit && pendingSnapshots == 0) { // Race condition can happen but harmless if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) { final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)); // We can clean up the current safe commit if the last commit is safe diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index f663504da9fa7..1aa81a0765162 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -277,6 +277,9 @@ public void testCheckUnreferencedCommits() throws Exception { assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Advanced enough globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); + final IndexCommit snapshot = indexPolicy.acquireIndexCommit(randomBoolean()); + assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); // Having snapshot -> false. + indexPolicy.releaseCommit(snapshot); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList);