Skip to content

Commit

Permalink
Clean up commits when global checkpoint advanced (#28140)
Browse files Browse the repository at this point in the history
Today we keep multiple index commits based on the current global 
checkpoint, but only clean up unneeded index commits when we have a new 
index commit. However, we can release the old index commits earlier once
the global checkpoint has advanced enough. This commit makes an engine
revisit the index deletion policy whenever a new global checkpoint value
is persisted and advanced enough.

Relates #10708
  • Loading branch information
dnhatn authored Jan 18, 2018
1 parent a6a57a7 commit 9db9bd5
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ private void maybeFSyncTranslogs() {
try {
Translog translog = shard.getTranslog();
if (translog.syncNeeded()) {
translog.sync();
shard.sync();
}
} catch (AlreadyClosedException ex) {
// fine - continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
Expand Down Expand Up @@ -214,6 +214,21 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
return 0;
}

/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
*/
boolean hasUnreferencedCommits() throws IOException {
final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
}
return false;
}

/**
* A wrapper of an index commit that prevents it from being deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -549,6 +550,13 @@ public enum SearcherScope {
/** returns the translog for this engine */
public abstract Translog getTranslog();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

public abstract void syncTranslog() throws IOException;

protected void ensureOpen() {
if (isClosed.get()) {
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -94,6 +93,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {

Expand Down Expand Up @@ -520,6 +520,27 @@ public Translog getTranslog() {
return translog;
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
if (synced) {
revisitIndexDeletionPolicyOnTranslogSynced();
}
return synced;
}

@Override
public void syncTranslog() throws IOException {
translog.sync();
revisitIndexDeletionPolicyOnTranslogSynced();
}

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
}

@Override
public String getHistoryUUID() {
return historyUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind
}

private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
final Translog translog = indexShard.getTranslog();
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.getTranslog().sync();
indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2316,8 +2316,7 @@ public int getActiveOperationsCount() {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
try {
final Engine engine = getEngine();
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
getEngine().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
Expand All @@ -2342,9 +2341,9 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
translogSyncProcessor.put(location, syncListener);
}

public final void sync() throws IOException {
public void sync() throws IOException {
verifyNotClosed();
getEngine().getTranslog().sync();
getEngine().syncTranslog();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,44 @@ public void testKeepOnlyStartingCommitOnInit() throws Exception {
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
}

public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 50);
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
}
IndexCommit safeCommit = randomFrom(commitList);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
indexPolicy.onCommit(commitList);
if (safeCommit == commitList.get(commitList.size() - 1)) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
// Advanced enough
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
}
}

IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4414,4 +4414,29 @@ public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
}
}

public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Global checkpoint advanced but not enough - all commits are kept.
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
// Global checkpoint advanced enough - only the last commit is kept.
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ protected PrimaryResult performOnPrimary(

@Override
protected void performOnReplica(final GlobalCheckpointSyncAction.Request request, final IndexShard replica) throws IOException {
replica.getTranslog().sync();
replica.sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
}

if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
verify(translog, never()).sync();
verify(indexShard, never()).sync();
} else {
verify(translog).sync();
verify(indexShard).sync();
}
}

Expand Down

0 comments on commit 9db9bd5

Please sign in to comment.