diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index e21b816aefd80..103817224790c 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1877,11 +1877,6 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;
- /**
- * Do not replay translog operations, but make the engine be ready.
- */
- public abstract void skipTranslogRecovery();
-
/**
* Returns true
iff this engine is currently recovering from translog.
*/
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index dd4b00f2a3f7f..e83b1dc5cbbad 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -447,12 +447,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
return this;
}
- @Override
- public void skipTranslogRecovery() {
- assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
- pendingTranslogRecovery.set(false); // we are good - now we can commit
- }
-
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index 79c8331061636..5092f19a5d921 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -444,10 +444,6 @@ public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryR
return this;
}
- @Override
- public void skipTranslogRecovery() {
- }
-
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
}
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 2372353ef85c1..a809c5ec30de7 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1393,8 +1393,10 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
/**
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
+ *
+ * @param recoverUpToSeqNo the upper bound of sequence number to be recovered (inclusive)
**/
- public void openEngineAndRecoverFromTranslog() throws IOException {
+ public void openEngineAndRecoverFromTranslog(long recoverUpToSeqNo) throws IOException {
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
@@ -1403,16 +1405,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
- getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
- }
-
- /**
- * Opens the engine on top of the existing lucene engine and translog.
- * The translog is kept but its operations won't be replayed.
- */
- public void openEngineAndSkipTranslogRecovery() throws IOException {
- innerOpenEngineAndTranslog();
- getEngine().skipTranslogRecovery();
+ getEngine().recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}
private void innerOpenEngineAndTranslog() throws IOException {
@@ -1788,8 +1781,10 @@ public List segments(boolean verbose) {
return getEngine().segments(verbose);
}
- public void flushAndCloseEngine() throws IOException {
- getEngine().flushAndClose();
+ public void closeEngine() throws IOException {
+ synchronized (mutex) {
+ IOUtils.close(this.currentEngineReference.getAndSet(null));
+ }
}
public String getHistoryUUID() {
diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index fae3703027f9e..05b894e156c90 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -422,7 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
}
- indexShard.openEngineAndRecoverFromTranslog();
+ indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
@@ -480,7 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
- indexShard.openEngineAndRecoverFromTranslog();
+ indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
index f53e8edecd9e6..3e02160c3458c 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -71,7 +71,7 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
- transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC,
+ transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
new StartRecoveryTransportRequestHandler());
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 6b1a893667f2c..630da951ab1a5 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -54,7 +54,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task;
@@ -119,7 +118,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
- RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
+ RecoveryPrepareForTranslogRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
@@ -345,9 +344,10 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
+ final long globalCheckpoint = recoveryTarget.readGlobalCheckpointFromTranslog();
final long startingSeqNo;
if (metadataSnapshot.size() > 0) {
- startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
+ startingSeqNo = getStartingSeqNo(logger, recoveryTarget, globalCheckpoint);
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
@@ -370,7 +370,8 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
- startingSeqNo);
+ startingSeqNo,
+ globalCheckpoint);
return request;
}
@@ -381,11 +382,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
* @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
* failed
*/
- public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
+ public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget, final long globalCheckpoint) {
try {
final Store store = recoveryTarget.store();
- final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
- final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
final List existingCommits = DirectoryReader.listCommits(store.directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
@@ -424,14 +423,15 @@ public interface RecoveryListener {
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}
- class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler {
+ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler {
@Override
- public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
+ public void messageReceived(RecoveryPrepareForTranslogRequest request, TransportChannel channel, Task task) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
- final ActionListener listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
+ final ActionListener listener = new ChannelActionListener<>(
+ channel, Actions.PREPARE_TRANSLOG, request);
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
- ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
+ request.recoverUpToSeqNo(), ActionListener.map(listener, RecoveryPrepareForTranslogOperationsResponse::new));
}
}
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java
new file mode 100644
index 0000000000000..d573bdce45424
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.transport.TransportResponse;
+
+import java.io.IOException;
+import java.util.Optional;
+
+final class RecoveryPrepareForTranslogOperationsResponse extends TransportResponse {
+ final Optional targetMetadata;
+
+ RecoveryPrepareForTranslogOperationsResponse(Optional targetMetadata) {
+ this.targetMetadata = targetMetadata;
+ }
+
+ RecoveryPrepareForTranslogOperationsResponse(final StreamInput in) throws IOException {
+ super(in);
+ if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+ targetMetadata = Optional.ofNullable(in.readOptionalWriteable(Store.MetadataSnapshot::new));
+ } else {
+ targetMetadata = Optional.empty();
+ }
+ }
+
+ @Override
+ public void writeTo(final StreamOutput out) throws IOException {
+ super.writeTo(out);
+ if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+ out.writeOptionalWriteable(targetMetadata.orElse(null));
+ }
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogRequest.java
similarity index 71%
rename from server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
rename to server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogRequest.java
index 6e2557176a82e..ecd3b0fd0d7c0 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogRequest.java
@@ -19,33 +19,43 @@
package org.elasticsearch.indices.recovery;
+import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
-class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
+final class RecoveryPrepareForTranslogRequest extends TransportRequest {
private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean fileBasedRecovery;
+ private final long recoverUpToSeqNo;
- RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) {
+ RecoveryPrepareForTranslogRequest(long recoveryId, ShardId shardId, int totalTranslogOps,
+ boolean fileBasedRecovery, long recoverUpToSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.fileBasedRecovery = fileBasedRecovery;
+ this.recoverUpToSeqNo = recoverUpToSeqNo;
}
- RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
+ RecoveryPrepareForTranslogRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
totalTranslogOps = in.readVInt();
fileBasedRecovery = in.readBoolean();
+ if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+ recoverUpToSeqNo = in.readZLong();
+ } else {
+ recoverUpToSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+ }
}
public long recoveryId() {
@@ -67,6 +77,10 @@ public boolean isFileBasedRecovery() {
return fileBasedRecovery;
}
+ public long recoverUpToSeqNo() {
+ return recoverUpToSeqNo;
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -74,5 +88,8 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
out.writeBoolean(fileBasedRecovery);
+ if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+ out.writeZLong(recoverUpToSeqNo);
+ }
}
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index fdada82c5bc56..ae7760981f8f7 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -69,6 +69,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -155,45 +156,55 @@ public void recoverToTarget(ActionListener listener) {
shard, cancellableThreads, logger);
final Closeable retentionLock = shard.acquireRetentionLock();
resources.add(retentionLock);
- final long startingSeqNo;
- final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
- isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
- final SendFileResult sendFileResult;
- if (isSequenceNumberBasedRecovery) {
- logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
- startingSeqNo = request.startingSeqNo();
- sendFileResult = SendFileResult.EMPTY;
- } else {
- final Engine.IndexCommitRef phase1Snapshot;
- try {
- phase1Snapshot = shard.acquireSafeIndexCommit();
- } catch (final Exception e) {
- throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
- }
- // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
- // still filter out legacy operations without seqNo.
- startingSeqNo = 0;
- try {
- final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
- sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps);
- } catch (final Exception e) {
- throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
- } finally {
+
+ final StepListener prepareTargetStep = new StepListener<>();
+ prepareTargetForSequenceBasedRecovery(prepareTargetStep);
+
+ final StepListener phase1Step = new StepListener<>();
+ prepareTargetStep.whenComplete(r -> ActionListener.completeWith(phase1Step, () -> {
+ if (r.isSequenceNumberRecovery) {
+ logger.trace("performing sequence numbers based recovery. starting at [{}]", r.startingSeqNo);
+ return SendFileResult.EMPTY;
+ } else {
+ final Engine.IndexCommitRef phase1Snapshot;
try {
- IOUtils.close(phase1Snapshot);
- } catch (final IOException ex) {
- logger.warn("releasing snapshot caused exception", ex);
+ phase1Snapshot = shard.acquireSafeIndexCommit();
+ } catch (final Exception e) {
+ throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
+ }
+ try {
+ final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", r.startingSeqNo);
+ return phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(),
+ () -> estimateNumOps, r.recoveryTargetMetadata);
+ } catch (final Exception e) {
+ throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
+ } finally {
+ try {
+ IOUtils.close(phase1Snapshot);
+ } catch (final IOException ex) {
+ logger.warn("releasing snapshot caused exception", ex);
+ }
}
}
- }
- assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
+ }), onFailure);
+
+ final StepListener startEngineStep = new StepListener<>();
+ phase1Step.whenComplete(r -> {
+ if (prepareTargetStep.result().openEngineTime != null) {
+ // engine is started already since we have asked it to recover up to the global checkpoint
+ startEngineStep.onResponse(prepareTargetStep.result().openEngineTime);
+ } else {
+ startEngineOnTarget(
+ prepareTargetStep.result().isSequenceNumberRecovery == false,
+ shard.estimateNumberOfHistoryOperations("peer-recovery", prepareTargetStep.result().startingSeqNo),
+ SequenceNumbers.UNASSIGNED_SEQ_NO,
+ ActionListener.map(startEngineStep, v -> v.tookTime));
+ }
+ }, onFailure);
- final StepListener prepareEngineStep = new StepListener<>();
- // For a sequence based recovery, the target can keep its local translog
- prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
- shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
final StepListener sendSnapshotStep = new StepListener<>();
- prepareEngineStep.whenComplete(prepareEngineTime -> {
+ startEngineStep.whenComplete(r -> {
+ final long startingSeqNo = prepareTargetStep.result().startingSeqNo;
/*
* add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
* This means that any document indexed into the primary after this will be replicated to this replica as well
@@ -221,24 +232,24 @@ public void recoverToTarget(ActionListener listener) {
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
sendSnapshotStep.whenComplete(
- r -> IOUtils.close(phase2Snapshot),
+ v -> IOUtils.close(phase2Snapshot),
e -> {
IOUtils.closeWhileHandlingException(phase2Snapshot);
onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
});
-
}, onFailure);
final StepListener finalizeStep = new StepListener<>();
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
finalizeStep.whenComplete(r -> {
+ final SendFileResult sendFileResult = phase1Step.result();
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
- prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+ startEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
try {
wrappedListener.onResponse(response);
} finally {
@@ -295,6 +306,57 @@ public void onFailure(Exception e) {
});
}
+ static final class PrepareTargetResult {
+ final boolean isSequenceNumberRecovery;
+ final long startingSeqNo;
+ final TimeValue openEngineTime;
+ final Store.MetadataSnapshot recoveryTargetMetadata;
+
+ PrepareTargetResult(boolean isSequenceNumberRecovery, long startingSeqNo,
+ TimeValue openEngineTime, Store.MetadataSnapshot recoveryTargetMetadata) {
+ this.isSequenceNumberRecovery = isSequenceNumberRecovery;
+ this.startingSeqNo = startingSeqNo;
+ this.openEngineTime = openEngineTime;
+ this.recoveryTargetMetadata = recoveryTargetMetadata;
+ }
+ }
+
+ /**
+ * Prepares the recovery target for sequence number based recovery if possible. If the recovery source has all required operations
+ * up to the {@link StartRecoveryRequest#startingSeqNo()}, then it can perform sequence number based recovery without coordinating
+ * with the recovery target. However, if its history is only up to the global checkpoint on the target, it needs to ask the target
+ * to locally recover up to the global checkpoint then perform a sequence number based recovery. If the target can recover properly,
+ * then it will leave its engine open for a sequence number recovery; otherwise it needs to close its engine, capture and send back
+ * the latest metadata to the recovery source for a file-based recovery.
+ */
+ void prepareTargetForSequenceBasedRecovery(ActionListener listener) throws IOException {
+ if (request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory()) {
+ if (shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())) {
+ listener.onResponse(new PrepareTargetResult(true, request.startingSeqNo(), null, request.metadataSnapshot()));
+ } else {
+ final long startingSeqNo = request.globalCheckpoint() + 1;
+ if (request.startingSeqNo() < startingSeqNo && shard.hasCompleteHistoryOperations("peer-recovery", startingSeqNo)) {
+ startEngineOnTarget(
+ false,
+ shard.estimateNumberOfHistoryOperations("peer-recovery", request.globalCheckpoint()),
+ request.globalCheckpoint(),
+ ActionListener.map(listener, r -> {
+ if (r.newSnapshot == null) {
+ return new PrepareTargetResult(true, startingSeqNo, r.tookTime, request.metadataSnapshot());
+ } else {
+ // the target does not have the required translog - fallback to file-based recovery with the new snapshot
+ return new PrepareTargetResult(false, 0, null, r.newSnapshot);
+ }
+ }));
+ } else {
+ listener.onResponse(new PrepareTargetResult(false, 0, null, request.metadataSnapshot()));
+ }
+ }
+ } else {
+ listener.onResponse(new PrepareTargetResult(false, 0, null, request.metadataSnapshot()));
+ }
+ }
+
static final class SendFileResult {
final List phase1FileNames;
final List phase1FileSizes;
@@ -330,7 +392,9 @@ static final class SendFileResult {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
- public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier translogOps) {
+ SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint,
+ final Supplier translogOps, final Store.MetadataSnapshot recoveryTargetMetadata) {
+ // TODO: pass new metadata
cancellableThreads.checkForCancel();
// Total size of segment files that are recovered
long totalSize = 0;
@@ -359,11 +423,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
recoverySourceMetadata.asMap().size() + " files", name);
}
}
- if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
+ if (canSkipPhase1(recoverySourceMetadata, recoveryTargetMetadata) == false) {
// Generate a "diff" of all the identical, different, and missing
// segment files on the target node, using the existing files on
// the source node
- final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
+ final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
phase1ExistingFileNames.add(md.name());
phase1ExistingFileSizes.add(md.length());
@@ -378,9 +442,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
- if (request.metadataSnapshot().asMap().containsKey(md.name())) {
+ if (recoveryTargetMetadata.asMap().containsKey(md.name())) {
logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
- md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
+ md.name(), recoveryTargetMetadata.asMap().get(md.name()), md);
} else {
logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
}
@@ -478,21 +542,32 @@ boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot targ
return true;
}
- void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+ static final class StartEngineResult {
+ final TimeValue tookTime;
+ final Store.MetadataSnapshot newSnapshot;
+
+ StartEngineResult(TimeValue tookTime, Store.MetadataSnapshot newSnapshot) {
+ this.tookTime = tookTime;
+ this.newSnapshot = newSnapshot;
+ }
+ }
+
+ void startEngineOnTarget(boolean fileBasedRecovery, int totalTranslogOps,
+ long recoverUpToSeqNo, ActionListener listener) {
StopWatch stopWatch = new StopWatch().start();
- final ActionListener wrappedListener = ActionListener.wrap(
- nullVal -> {
+ final ActionListener> wrappedListener = ActionListener.wrap(
+ r -> {
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
- listener.onResponse(tookTime);
+ listener.onResponse(new StartEngineResult(tookTime, r.orElse(null)));
},
e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
logger.trace("recovery [phase1]: prepare remote engine for translog");
cancellableThreads.execute(() ->
- recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
+ recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, wrappedListener));
}
/**
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index b3c6d12ab96e3..285fdb1723215 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -47,10 +47,11 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogCorruptedException;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -284,11 +285,21 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener> listener) {
ActionListener.completeWith(listener, () -> {
- state().getTranslog().totalOperations(totalTranslogOps);
- indexShard().openEngineAndSkipTranslogRecovery();
- return null;
+ if (Assertions.ENABLED && recoverUpToSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+ final long gcp = readGlobalCheckpointFromTranslog();
+ assert recoverUpToSeqNo == gcp : recoverUpToSeqNo + " != " + gcp;
+ }
+ indexShard().openEngineAndRecoverFromTranslog(recoverUpToSeqNo);
+ if (indexShard.getLocalCheckpoint() < recoverUpToSeqNo) {
+ // This can happen if the previous recovery was aborted after copying segments, then the target does not have all translog.
+ indexShard.closeEngine();
+ return Optional.of(indexShard.snapshotStoreMetadata());
+ }
+ state().getTranslog().totalOperations(state().getTranslog().totalOperations() + totalTranslogOps);
+ return Optional.empty();
});
}
@@ -458,7 +469,17 @@ public String getTempNameForFile(String origFile) {
return multiFileWriter.getTempNameForFile(origFile);
}
- Path translogLocation() {
- return indexShard().shardPath().resolveTranslog();
+ long readGlobalCheckpointFromTranslog() {
+ try {
+ final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+ return Translog.readGlobalCheckpoint(indexShard.shardPath().resolveTranslog(), translogUUID);
+ } catch (final TranslogCorruptedException | IOException e) {
+ /*
+ * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
+ * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
+ * proceeds to attempt a sequence-number-based recovery.
+ */
+ return SequenceNumbers.UNASSIGNED_SEQ_NO;
+ }
}
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index d03fe42d90146..5934a212fc749 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -28,16 +28,24 @@
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
public interface RecoveryTargetHandler {
/**
- * Prepares the target to receive translog operations, after all file have been copied
+ * Starts an engine an target and recovers locally up to the given sequence number (inclusive). If the target can't recover up to
+ * {@code recoverUpToSeqNo}, it has to close the engine and send back the latest index commit; otherwise it leaves the engine open.
*
* @param fileBasedRecovery whether or not this call is part of an file based recovery
* @param totalTranslogOps total translog operations expected to be sent
+ * @param recoverUpToSeqNo the upper bound sequence number (inclusive) that the recovery target should
+ * recover locally after opening its engine.
+ * @param listener if the engine is started and recovered properly up to @{code recoverUpToSeqNo}, this listener contains
+ * a null value; otherwise it's the latest metadata snapshot on the target. The recovery source needs to
+ * use this latest value for a file-based recovery.
*/
- void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener);
+ void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener> listener);
/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index ec3c22d42a191..56207ae7834f5 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -77,12 +78,13 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
}
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
- new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
+ new RecoveryPrepareForTranslogRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery, recoverUpToSeqNo),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
- new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
- in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
+ new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.targetMetadata),
+ RecoveryPrepareForTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
index 4ec20d17ac5be..a9bb042cc8759 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java
@@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery;
+import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -34,17 +35,15 @@
*/
public class StartRecoveryRequest extends TransportRequest {
- private long recoveryId;
- private ShardId shardId;
- private String targetAllocationId;
- private DiscoveryNode sourceNode;
- private DiscoveryNode targetNode;
- private Store.MetadataSnapshot metadataSnapshot;
- private boolean primaryRelocation;
- private long startingSeqNo;
-
- public StartRecoveryRequest() {
- }
+ private final long recoveryId;
+ private final ShardId shardId;
+ private final String targetAllocationId;
+ private final DiscoveryNode sourceNode;
+ private final DiscoveryNode targetNode;
+ private final Store.MetadataSnapshot metadataSnapshot;
+ private final boolean primaryRelocation;
+ private final long startingSeqNo;
+ private final long globalCheckpoint;
/**
* Construct a request for starting a peer recovery.
@@ -56,7 +55,8 @@ public StartRecoveryRequest() {
* @param metadataSnapshot the Lucene metadata
* @param primaryRelocation whether or not the recovery is a primary relocation
* @param recoveryId the recovery ID
- * @param startingSeqNo the starting sequence number
+ * @param startingSeqNo the starting sequence number that the recovery target requires for a operation-based recovery
+ * @param globalCheckpoint the persisted global checkpoint on target
*/
public StartRecoveryRequest(final ShardId shardId,
final String targetAllocationId,
@@ -65,7 +65,9 @@ public StartRecoveryRequest(final ShardId shardId,
final Store.MetadataSnapshot metadataSnapshot,
final boolean primaryRelocation,
final long recoveryId,
- final long startingSeqNo) {
+ final long startingSeqNo,
+ final long globalCheckpoint) {
+ assert startingSeqNo <= (globalCheckpoint + 1) : startingSeqNo + " > " + (globalCheckpoint + 1);
this.recoveryId = recoveryId;
this.shardId = shardId;
this.targetAllocationId = targetAllocationId;
@@ -74,6 +76,7 @@ public StartRecoveryRequest(final ShardId shardId,
this.metadataSnapshot = metadataSnapshot;
this.primaryRelocation = primaryRelocation;
this.startingSeqNo = startingSeqNo;
+ this.globalCheckpoint = globalCheckpoint;
assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null :
"starting seq no is set but not history uuid";
}
@@ -110,9 +113,12 @@ public long startingSeqNo() {
return startingSeqNo;
}
- @Override
- public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
+ public long globalCheckpoint() {
+ return globalCheckpoint;
+ }
+
+ StartRecoveryRequest(StreamInput in) throws IOException {
+ super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
targetAllocationId = in.readString();
@@ -121,6 +127,12 @@ public void readFrom(StreamInput in) throws IOException {
metadataSnapshot = new Store.MetadataSnapshot(in);
primaryRelocation = in.readBoolean();
startingSeqNo = in.readLong();
+ if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+ globalCheckpoint = in.readZLong();
+ assert startingSeqNo <= (globalCheckpoint + 1) : startingSeqNo + " > " + (globalCheckpoint + 1);
+ } else {
+ globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+ }
}
@Override
@@ -134,5 +146,8 @@ public void writeTo(StreamOutput out) throws IOException {
metadataSnapshot.writeTo(out);
out.writeBoolean(primaryRelocation);
out.writeLong(startingSeqNo);
+ if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+ out.writeZLong(globalCheckpoint);
+ }
}
}
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 59bbee9f1bbf5..e66757a64ed6f 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -3022,7 +3022,7 @@ public void testSkipTranslogReplay() throws IOException {
assertVisibleCount(engine, numDocs);
engine.close();
try (InternalEngine engine = new InternalEngine(config)) {
- engine.skipTranslogRecovery();
+ engine.recoverFromTranslog(translogHandler, UNASSIGNED_SEQ_NO);
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits.value, equalTo(0L));
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index e25557eaabcf6..7513714099dbc 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -61,6 +61,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
@@ -197,14 +198,14 @@ public IndexResult index(Index op) throws IOException {
Future fut = shards.asyncRecoverReplica(replica,
(shard, node) -> new RecoveryTarget(shard, node, recoveryListener) {
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
- ActionListener listener) {
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener> listener) {
try {
indexedOnPrimary.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
- super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
+ super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener);
}
});
fut.get();
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 7e284aa4f73e9..0d5d00ea331dd 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -144,6 +144,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
@@ -2655,7 +2656,7 @@ public void testShardActiveDuringInternalRecovery() throws IOException {
shard.prepareForIndexRecovery();
// Shard is still inactive since we haven't started recovering yet
assertFalse(shard.isActive());
- shard.openEngineAndRecoverFromTranslog();
+ shard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
// Shard should now be active since we did recover:
assertTrue(shard.isActive());
closeShards(shard);
@@ -2739,8 +2740,9 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
new RecoveryTarget(shard, discoveryNode, recoveryListener) {
// we're only checking that listeners are called when the engine is open, before there is no point
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
- super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps,
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener> listener) {
+ super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
@@ -3812,11 +3814,13 @@ public void testCloseShardWhileResettingEngine() throws Exception {
@Override
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
long recoverUpToSeqNo) throws IOException {
- readyToCloseLatch.countDown();
- try {
- closeDoneLatch.await();
- } catch (InterruptedException e) {
- throw new AssertionError(e);
+ if (recoverUpToSeqNo != UNASSIGNED_SEQ_NO) {
+ readyToCloseLatch.countDown();
+ try {
+ closeDoneLatch.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
}
return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}
@@ -3867,11 +3871,13 @@ public void testSnapshotWhileResettingEngine() throws Exception {
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
long recoverUpToSeqNo) throws IOException {
InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
- readyToSnapshotLatch.countDown();
- try {
- snapshotDoneLatch.await();
- } catch (InterruptedException e) {
- throw new AssertionError(e);
+ if (recoverUpToSeqNo != UNASSIGNED_SEQ_NO) {
+ readyToSnapshotLatch.countDown();
+ try {
+ snapshotDoneLatch.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
}
return internalEngine;
}
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
index 72eb2baeca942..3b4baf9c163b4 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -42,7 +42,7 @@ public void testDuplicateRecoveries() throws IOException {
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
- SequenceNumbers.UNASSIGNED_SEQ_NO);
+ SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.NO_OPS_PERFORMED);
RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
() -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index bb4c25e6186de..a7a9b3a2e12d4 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -57,12 +57,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
public void testGetStartingSeqNo() throws Exception {
final IndexShard replica = newShard(false);
+ long globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
try {
// Empty store
{
recoveryEmptyReplica(replica, true);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), equalTo(0L));
recoveryTarget.decRef();
}
// Last commit is good - use it.
@@ -75,10 +76,11 @@ public void testGetStartingSeqNo() throws Exception {
}
}
flushShard(replica);
- replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
+ globalCheckpoint = initDocs - 1;
+ replica.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
replica.sync();
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), equalTo(initDocs));
recoveryTarget.decRef();
}
// Global checkpoint does not advance, last commit is not good - use the previous commit
@@ -92,15 +94,17 @@ public void testGetStartingSeqNo() throws Exception {
}
flushShard(replica);
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), equalTo(initDocs));
recoveryTarget.decRef();
}
// Advances the global checkpoint, a safe commit also advances
{
- replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
+ globalCheckpoint = initDocs + moreDocs - 1;
+ replica.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
replica.sync();
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs));
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint),
+ equalTo(initDocs + moreDocs));
recoveryTarget.decRef();
}
// Different translogUUID, fallback to file-based
@@ -119,7 +123,9 @@ public void testGetStartingSeqNo() throws Exception {
writer.commit();
}
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null);
- assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
+ assertThat(recoveryTarget.readGlobalCheckpointFromTranslog(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
+ assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, SequenceNumbers.NO_OPS_PERFORMED),
+ equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
recoveryTarget.decRef();
}
} finally {
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index b00e89575ccd5..6a1c51e9a3e58 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -50,7 +50,6 @@
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
@@ -92,6 +91,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -203,6 +203,8 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
new Store.MetadataSnapshot(Collections.emptyMap(),
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
+ final long startingSeqNo = randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
+ SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong();
return new StartRecoveryRequest(
shardId,
null,
@@ -211,8 +213,8 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
metadataSnapshot,
randomBoolean(),
randomNonNegativeLong(),
- randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
- SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+ startingSeqNo,
+ startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ? startingSeqNo : randomLongBetween(startingSeqNo, Long.MAX_VALUE));
}
public void testSendSnapshotSendsOps() throws IOException {
@@ -478,15 +480,17 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
between(1, 8)) {
@Override
- public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier translogOps) {
+ SendFileResult phase1(IndexCommit snapshot, long globalCheckpoint, Supplier translogOps,
+ Store.MetadataSnapshot recoveryTargetMetadata) {
phase1Called.set(true);
- return super.phase1(snapshot, globalCheckpoint, translogOps);
+ return super.phase1(snapshot, globalCheckpoint, translogOps, recoveryTargetMetadata);
}
@Override
- void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+ void startEngineOnTarget(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener listener) {
prepareTargetForTranslogCalled.set(true);
- super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
+ super.startEngineOnTarget(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener);
}
@Override
@@ -729,8 +733,11 @@ private List generateFiles(Store store, int numFiles, IntSupp
}
class TestRecoveryTargetHandler implements RecoveryTargetHandler {
+
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
+ long recoverUpToSeqNo, ActionListener> listener) {
+
}
@Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index c3f6a3aae89fb..031fff8b7fe18 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -51,6 +51,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@@ -330,8 +331,9 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception {
updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData());
recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
- super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo,
+ ActionListener> listener) {
+ super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener);
assertThat(replicaShard.getLastKnownGlobalCheckpoint(), equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
}
@Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
index e77bf5f8d4ae5..52097de47051b 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java
@@ -46,6 +46,10 @@ public void testSerialization() throws Exception {
Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY :
new Store.MetadataSnapshot(Collections.emptyMap(),
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100));
+ final long startingSeqNo = randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
+ SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong();
+ final long globalCheckpoint = startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ? SequenceNumbers.UNASSIGNED_SEQ_NO
+ : randomLongBetween(startingSeqNo - 1, Long.MAX_VALUE);
final StartRecoveryRequest outRequest = new StartRecoveryRequest(
new ShardId("test", "_na_", 0),
UUIDs.randomBase64UUID(),
@@ -54,8 +58,8 @@ public void testSerialization() throws Exception {
metadataSnapshot,
randomBoolean(),
randomNonNegativeLong(),
- randomBoolean() || metadataSnapshot.getHistoryUUID() == null ?
- SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
+ startingSeqNo,
+ globalCheckpoint);
final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
@@ -65,17 +69,21 @@ public void testSerialization() throws Exception {
final ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
- final StartRecoveryRequest inRequest = new StartRecoveryRequest();
- inRequest.readFrom(in);
+ final StartRecoveryRequest inRequest = new StartRecoveryRequest(in);
- assertThat(outRequest.shardId(), equalTo(inRequest.shardId()));
- assertThat(outRequest.targetAllocationId(), equalTo(inRequest.targetAllocationId()));
- assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode()));
- assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode()));
- assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap()));
- assertThat(outRequest.isPrimaryRelocation(), equalTo(inRequest.isPrimaryRelocation()));
- assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId()));
- assertThat(outRequest.startingSeqNo(), equalTo(inRequest.startingSeqNo()));
+ assertThat(inRequest.shardId(), equalTo(outRequest.shardId()));
+ assertThat(inRequest.targetAllocationId(), equalTo(outRequest.targetAllocationId()));
+ assertThat(inRequest.sourceNode(), equalTo(outRequest.sourceNode()));
+ assertThat(inRequest.targetNode(), equalTo(outRequest.targetNode()));
+ assertThat(inRequest.metadataSnapshot().asMap(), equalTo(outRequest.metadataSnapshot().asMap()));
+ assertThat(inRequest.isPrimaryRelocation(), equalTo(outRequest.isPrimaryRelocation()));
+ assertThat(inRequest.recoveryId(), equalTo(outRequest.recoveryId()));
+ assertThat(inRequest.startingSeqNo(), equalTo(startingSeqNo));
+ if (targetNodeVersion.onOrAfter(Version.V_8_0_0)) {
+ assertThat(inRequest.globalCheckpoint(), equalTo(globalCheckpoint));
+ } else {
+ assertThat(inRequest.globalCheckpoint(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
+ }
}
}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 4b5be29205778..89c25ca764ca4 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -620,15 +620,18 @@ protected final void recoverUnstartedReplica(final IndexShard replica,
final String targetAllocationId = recoveryTarget.indexShard().routingEntry().allocationId().getId();
final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
+ final long globalCheckpoint;
final long startingSeqNo;
if (snapshot.size() > 0) {
- startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget);
+ final String translogUUID = replica.store().readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+ globalCheckpoint = Translog.readGlobalCheckpoint(replica.shardPath().resolveTranslog(), translogUUID);
+ startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint);
} else {
+ globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
-
final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
- pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
+ pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo, globalCheckpoint);
final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
index d5a7ab8109e12..c32cd9b0070e6 100644
--- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
+++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
@@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.Executor;
/**
@@ -46,8 +47,9 @@ public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) {
}
@Override
- public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) {
- executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener));
+ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
+ long recoverUpToSeqNo, ActionListener> listener) {
+ executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener));
}
@Override
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index 98bfa1b2068bb..045f9a010644b 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -494,7 +494,8 @@ private void runFollowTest(CheckedBiConsumer