Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move trimming unsafe commits from the Engine constructor to Store #29260

Merged
merged 11 commits into from
Mar 29, 2018
4 changes: 2 additions & 2 deletions docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ which returns something similar to:
{
"commit" : {
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
"generation" : 4,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one was tricky to trace - using a starting commit Lucene, immediately increments the pending changes counter, which means that the flush issued in the docs will actually cause a commit in Lucene. Now that we don't use a starting commit, the uncommittedChanges counter in Lucene stays 0 when opening the engine and the flush in the docs becomes a noop.

"generation" : 3,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "3",
"translog_generation" : "2",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,60 +47,27 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) {
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.startingCommit = startingCommit;
this.snapshottedCommits = new ObjectIntHashMap<>();
}

@Override
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
assert commits.isEmpty() == false : "index is opened, but we have no commits";
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
updateTranslogDeletionPolicy();
}

/**
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
* at the recovering time but they can suddenly become safe in the future.
* The following issues can happen if unsafe commits are kept oninit.
* <p>
* 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
* is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use
* the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the
* commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
* <p>
* 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
* The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
* while the local checkpoint of c2 is 2.
* <p>
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
*/
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
for (IndexCommit commit : commits) {
if (startingCommit.equals(commit) == false) {
this.deleteCommit(commit);
}
onCommit(commits);
if (safeCommit != commits.get(commits.size() - 1)) {
throw new IllegalStateException("Engine is opened, but the last commit isn't safe. Global checkpoint ["
+ globalCheckpointSupplier.getAsLong() + "], seqNo is last commit ["
+ SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommit.getUserData().entrySet()) + "], "
+ "seqNos in safe commit [" + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet()) + "]");
}
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
lastCommit = startingCommit;
safeCommit = startingCommit;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
Expand All @@ -59,6 +57,7 @@
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
Expand All @@ -70,15 +69,13 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -181,12 +178,10 @@ public InternalEngine(EngineConfig engineConfig) {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
final IndexCommit startingCommit = getStartingCommitPoint();
assert startingCommit != null : "Starting commit should be non-null";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(startingCommit);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer);
Objects.requireNonNull(historyUUID, "history uuid should not be null");
Expand Down Expand Up @@ -230,10 +225,11 @@ public InternalEngine(EngineConfig engineConfig) {
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit);
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
Expand Down Expand Up @@ -393,31 +389,6 @@ public void skipTranslogRecovery() {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private IndexCommit getStartingCommitPoint() throws IOException {
final IndexCommit startingIndexCommit;
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
return startingIndexCommit;
}

private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
Expand Down Expand Up @@ -1875,9 +1846,9 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
}
}

private IndexWriter createWriter(IndexCommit startingCommit) throws IOException {
private IndexWriter createWriter() throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit);
final IndexWriterConfig iwc = getIndexWriterConfig();
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand All @@ -1890,11 +1861,10 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx
return new IndexWriter(directory, iwc);
}

private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) {
private IndexWriterConfig getIndexWriterConfig() {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,13 @@ public CommitInfo(long maxSeqNo, long localCheckpoint) {
this.maxSeqNo = maxSeqNo;
this.localCheckpoint = localCheckpoint;
}

@Override
public String toString() {
return "CommitInfo{" +
"maxSeqNo=" + maxSeqNo +
", localCheckpoint=" + localCheckpoint +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,9 @@ private void innerOpenEngineAndTranslog() throws IOException {

assertMaxUnsafeAutoIdInCommit();

final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


createNewEngine(config);
verifyNotClosed();
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
Expand Down
Loading