Recover peers using history from Lucene
Thanks to peer recovery retention leases we now retain the history needed to
perform peer recoveries from the index instead of from the translog. This
commit adjusts the peer recovery process to do so, and also adjusts it to use
the existence of a retention lease to decide whether or not to attempt an
operations-based recovery.

Reverts elastic#38904 and elastic#42211
Relates elastic#41536
DaveCTurner committed Jul 25, 2019
1 parent 6275cd7 commit 674ddf2
Showing 10 changed files with 294 additions and 56 deletions.
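
The decision at the core of this commit is whether a peer recovery can be operations-based. The following is an illustrative sketch of that decision, not part of the commit: the class, method and parameter names are hypothetical, and only RetentionLease, SequenceNumbers and the lease comparison are taken from the diff below.

import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers;

/** Sketch only: the ops-based vs file-based recovery decision, with hypothetical names. */
final class OpsBasedRecoveryDecision {
    static boolean canUseOpsBasedRecovery(
            long requestedStartingSeqNo,       // seq# the target asked to start from
            boolean targetSharesHistory,       // target has the same history UUID as the source
            boolean sourceHasCompleteHistory,  // source retains every operation since that seq#
            boolean useRetentionLeases,        // soft deletes enabled and the index is not closed
            RetentionLease leaseForTarget) {   // existing peer-recovery lease for the target, or null
        if (requestedStartingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
                || targetSharesHistory == false
                || sourceHasCompleteHistory == false) {
            return false;
        }
        // New in this commit: with retention leases, only attempt an operations-based recovery if an
        // existing lease already guarantees that the needed history is retained on the source.
        return useRetentionLeases == false
            || (leaseForTarget != null && leaseForTarget.retainingSequenceNumber() <= requestedStartingSeqNo);
    }
}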
@@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its translog
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

@@ -514,18 +514,30 @@ public void syncTranslog() throws IOException {
}

/**
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
}

return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}

/**
* Returns the estimated number of history operations whose seq# is at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
}

return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}

@@ -2568,6 +2580,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return getMinRetainedSeqNo() <= startingSeqNo;
}

final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
@@ -2597,15 +2613,7 @@ public final long getMinRetainedSeqNo() {
@Override
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
final Closeable translogRetentionLock;
try {
translogRetentionLock = translog.acquireRetentionLock();
} catch (Exception e) {
softDeletesRetentionLock.close();
throw e;
}
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();
}
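
For context, here is a sketch of how the adjusted Engine methods fit together for a caller reading retained history. It is illustrative only and not part of the commit (the real caller is the recovery source handler below); the class, method name and "example" source string are assumptions, while the Engine and Translog.Snapshot calls are those shown in the diff.

import java.io.Closeable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;

/** Sketch only: reading retained history from an engine, whether backed by Lucene or the translog. */
final class HistoryReplayExample {
    static long replayHistory(Engine engine, MapperService mapperService, long startingSeqNo) throws Exception {
        try (Closeable ignored = engine.acquireRetentionLock()) { // keep history from being trimmed while reading
            if (engine.hasCompleteOperationHistory("example", mapperService, startingSeqNo) == false) {
                throw new IllegalStateException("history since [" + startingSeqNo + "] is not fully retained");
            }
            long replayed = 0;
            try (Translog.Snapshot snapshot = engine.readHistoryOperations("example", mapperService, startingSeqNo)) {
                for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
                    replayed++; // a real caller would ship `op` to the recovery target here
                }
            }
            return replayed;
        }
    }
}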
@@ -29,6 +29,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
@@ -52,7 +53,8 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -149,6 +151,10 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
};

final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE;
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
@@ -158,13 +164,32 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get(
ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
final Closeable retentionLock = shard.acquireRetentionLock();
resources.add(retentionLock);

final long startingSeqNo;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
final boolean isSequenceNumberBasedRecovery
= request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& isTargetSameHistory()
&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
&& (useRetentionLeases == false
|| (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));

final Closeable retentionLock;
if (isSequenceNumberBasedRecovery && useRetentionLeases) {
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
retentionLock = () -> {};
logger.trace("history is retained by {}", retentionLeaseRef.get());
} else {
// temporarily prevent any history from being discarded, and do this before acquiring the safe commit so that we can
// be certain that all operations after the safe commit's local checkpoint will be retained for the duration of this
// recovery.
retentionLock = shard.acquireRetentionLock();
resources.add(retentionLock);
logger.trace("history is retained by retention lock");
}

final StepListener<SendFileResult> sendFileStep = new StepListener<>();
final StepListener<ReplicationResponse> establishRetentionLeaseStep = new StepListener<>();
@@ -184,9 +209,22 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
// still filter out legacy operations without seqNo.
startingSeqNo = 0;

// Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being
// able to recover other replicas using operations-based recoveries. If we are not using retention leases then we
// conservatively copy all available operations. If we are using retention leases then "enough operations" is just the
// operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains
// at least as much history as anything else. The safe commit will often contain all the history retained by the current set
// of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a
// retention lease for some history that this primary already discarded, since we discard history when the global checkpoint
// advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
// down.
startingSeqNo = useRetentionLeases
? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
: 0;
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
shard.store().incRef();
@@ -201,8 +239,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
});

final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
if (shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
if (useRetentionLeases) {
runUnderPrimaryPermit(() -> {
try {
// If the target previously had a copy of this shard then a file-based recovery might move its global
@@ -233,20 +270,19 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

sendFileStep.whenComplete(r -> {
if (shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
if (useRetentionLeases && isSequenceNumberBasedRecovery == false) {
// We can in general use retention leases for peer recovery, but there is no lease for the target node right now.
runUnderPrimaryPermit(() -> {
try {
// conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate
final long globalCheckpoint = startingSeqNo - 1;
// blindly create the lease. TODO integrate this with the recovery process
shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint,
// Start the lease off retaining all the history needed from the local checkpoint of the safe commit that we've
// just established on the replica. This primary is certainly retaining such history, but other replicas might
// not be. No big deal if this recovery succeeds, but if this primary fails then the new primary might have to
// repeat phase 1 to recover this replica.
// TODO TBD maybe do this earlier?
final long localCheckpointOfSafeCommit = startingSeqNo - 1;
logger.trace("creating new retention lease at [{}]", localCheckpointOfSafeCommit);
shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), localCheckpointOfSafeCommit,
new ThreadedActionListener<>(logger, shard.getThreadPool(),
ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
} catch (RetentionLeaseAlreadyExistsException e) {
logger.debug("peer-recovery retention lease already exists", e);
establishRetentionLeaseStep.onResponse(null);
}
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
shard, cancellableThreads, logger);
} else {
@@ -255,6 +291,11 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
}, onFailure);

establishRetentionLeaseStep.whenComplete(r -> {
if (useRetentionLeases) {
// all the history we need is now retained by a retention lease so we can discard the retention lock
retentionLock.close();
}

assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
@@ -273,14 +314,16 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (logger.isTraceEnabled()) {
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);
// we can release the retention lock here because the snapshot itself will retain the required operations.
retentionLock.close();

if (useRetentionLeases == false) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
retentionLock.close();
}

// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
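
The long comment in the hunk above explains why, with retention leases, the operations-based phase of a file-based recovery starts just after the local checkpoint of the safe commit rather than at zero. In isolation that calculation looks roughly like the sketch below; the helper class and method are hypothetical, and only the commit user-data key and the "startingSeqNo - 1" relationship to the new lease come from the diff.

import java.io.IOException;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.index.seqno.SequenceNumbers;

/** Sketch only: where the operations-based phase of a file-based recovery starts. */
final class StartingSeqNoExample {
    static long startingSeqNo(boolean useRetentionLeases, IndexCommit safeCommit) throws IOException {
        if (useRetentionLeases == false) {
            // Without retention leases, conservatively replay everything so the recovered copy could
            // later serve as a recovery source itself.
            return 0L;
        }
        // With retention leases the safe commit already contains everything up to its local checkpoint,
        // so only operations after it need to be replayed; the new peer-recovery retention lease is then
        // created at this same point (startingSeqNo - 1).
        final long localCheckpointOfSafeCommit =
            Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
        return localCheckpointOfSafeCommit + 1L;
    }
}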
@@ -40,6 +40,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.engine.Engine;
@@ -427,8 +428,12 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
.setSettings(Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 1)

// disable merges to keep segments the same
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)

// expire retention leases quickly
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
).get();

logger.info("--> indexing docs");
@@ -472,10 +477,13 @@ public Settings onNodeStopped(String nodeName) throws Exception {
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "0s")
).get();
client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
assertBusy(() -> assertThat(client().admin().indices().prepareStats("test").get().getShards()[0]
.getRetentionLeaseStats().retentionLeases().leases().size(), equalTo(1)));
client().admin().indices().prepareFlush("test").setForce(true).get();
if (softDeleteEnabled) { // We need an extra flush to advance the min_retained_seqno of the SoftDeletesPolicy
client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
client().admin().indices().prepareFlush("test").setForce(true).get();
}
return super.onNodeStopped(nodeName);
}
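
The test above forces a file-based recovery by making sure no history is retained anywhere once the leases have expired. A compact sketch of the settings it relies on is shown below; the setting keys come straight from the diff, while the helper class and builder usage are illustrative.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

/** Sketch only: settings so that neither the translog nor soft deletes retain history and leases expire immediately. */
final class NoRetainedHistorySettingsExample {
    static Settings noRetainedHistory() {
        return Settings.builder()
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0)
            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "0s")
            .build();
    }
}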
@@ -49,6 +49,7 @@
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@@ -79,6 +80,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
@@ -290,6 +292,15 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception {
// We need an extra flush to advance the min_retained_seqno on the new primary so ops-based won't happen.
// The min_retained_seqno only advances when a merge asks for the retention query.
newPrimary.flush(new FlushRequest().force(true));

// We also need to make sure that there is no retention lease holding on to any history. The lease for the old primary
// expires since there are no unassigned shards in this replication group.
assertBusy(() -> {
newPrimary.syncRetentionLeases();
//noinspection OptionalGetWithoutIsPresent since there must be at least one lease
assertThat(newPrimary.getRetentionLeases().leases().stream().mapToLong(RetentionLease::retainingSequenceNumber)
.min().getAsLong(), greaterThan(newPrimary.seqNoStats().getMaxSeqNo()));
});
}
uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10));
totalDocs += uncommittedOpsOnPrimary;