Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up commits when global checkpoint advanced #28140

Merged
merged 11 commits into from
Jan 18, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ private void maybeFSyncTranslogs() {
try {
Translog translog = shard.getTranslog();
if (translog.syncNeeded()) {
translog.sync();
shard.sync();
}
} catch (AlreadyClosedException ex) {
// fine - continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
Expand Down Expand Up @@ -214,6 +214,21 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
return 0;
}

/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
*/
boolean hasUnreferencedCommits() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an idea for a follow up, shall we extend this method to look at the fact that a snapshotted commit was released (via a special flag we set when a snapshot count reaches 0)? we need to figure where to call it, but I think might be worth exploring.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes, I am thinking to always revisit the index deletion policy (without checking the unreferenced condition) when releasing a snapshotted commit in an Engine (InternalEngine#acquireIndexCommit). We don't acquire index commits too frequently and revisiting the policy should not be expensive. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnhatn I think I wasn't clear. The idea was to add a "pendingSnapshots" flag that is set when a snapshot reference goes to 0 and is cleared onCommit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bleskes. I got the idea.

final IndexCommit lastCommit = this.lastCommit;
if (safeCommit != lastCommit) { // Race condition can happen but harmless
if (lastCommit.getUserData().containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
// We can clean up the current safe commit if the last commit is safe
return globalCheckpointSupplier.getAsLong() >= maxSeqNoFromLastCommit;
}
}
return false;
}

/**
* A wrapper of an index commit that prevents it from being deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public abstract class Engine implements Closeable {

Expand Down Expand Up @@ -549,6 +550,13 @@ public enum SearcherScope {
/** returns the translog for this engine */
public abstract Translog getTranslog();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

public abstract void syncTranslog() throws IOException;

protected void ensureOpen() {
if (isClosed.get()) {
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -94,6 +93,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {

Expand Down Expand Up @@ -520,6 +520,27 @@ public Translog getTranslog() {
return translog;
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
if (synced) {
revisitIndexDeletionPolicyOnTranslogSynced();
}
return synced;
}

@Override
public void syncTranslog() throws IOException {
translog.sync();
revisitIndexDeletionPolicyOnTranslogSynced();
}

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
}

@Override
public String getHistoryUUID() {
return historyUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ protected ReplicaResult shardOperationOnReplica(final Request request, final Ind
}

private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
final Translog translog = indexShard.getTranslog();
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.getTranslog().sync();
indexShard.getTranslog().getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2316,8 +2316,7 @@ public int getActiveOperationsCount() {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
try {
final Engine engine = getEngine();
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
getEngine().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
Expand All @@ -2342,9 +2341,9 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
translogSyncProcessor.put(location, syncListener);
}

public final void sync() throws IOException {
public void sync() throws IOException {
verifyNotClosed();
getEngine().getTranslog().sync();
getEngine().syncTranslog();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,44 @@ public void testKeepOnlyStartingCommitOnInit() throws Exception {
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
}

public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 50);
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen));
}
IndexCommit safeCommit = randomFrom(commitList);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
indexPolicy.onCommit(commitList);
if (safeCommit == commitList.get(commitList.size() - 1)) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), lastMaxSeqNo - 1));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
// Advanced enough
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
}
}

IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4414,4 +4414,29 @@ public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
}
}

public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
final int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
}
}
engine.flush(false, randomBoolean());
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Global checkpoint advanced but not enough - all commits are kept.
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
// Global checkpoint advanced enough - only the last commit is kept.
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ protected PrimaryResult performOnPrimary(

@Override
protected void performOnReplica(final GlobalCheckpointSyncAction.Request request, final IndexShard replica) throws IOException {
replica.getTranslog().sync();
replica.sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
}

if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
verify(translog, never()).sync();
verify(indexShard, never()).sync();
} else {
verify(translog).sync();
verify(indexShard).sync();
}
}

Expand Down