From f8c0b1560661961c683fc2659f5ddee0c55697a7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 19 Jun 2019 16:53:18 -0400 Subject: [PATCH] Use global checkpoint as base for seq based recovery --- .../elasticsearch/index/engine/Engine.java | 5 - .../index/engine/InternalEngine.java | 6 - .../index/engine/ReadOnlyEngine.java | 4 - .../elasticsearch/index/shard/IndexShard.java | 21 +-- .../index/shard/StoreRecovery.java | 4 +- .../recovery/PeerRecoverySourceService.java | 2 +- .../recovery/PeerRecoveryTargetService.java | 22 +-- ...yPrepareForTranslogOperationsResponse.java | 54 ++++++ ...=> RecoveryPrepareForTranslogRequest.java} | 23 ++- .../recovery/RecoverySourceHandler.java | 169 +++++++++++++----- .../indices/recovery/RecoveryTarget.java | 35 +++- .../recovery/RecoveryTargetHandler.java | 12 +- .../recovery/RemoteRecoveryTargetHandler.java | 10 +- .../recovery/StartRecoveryRequest.java | 47 +++-- .../index/engine/InternalEngineTests.java | 2 +- .../IndexLevelReplicationTests.java | 7 +- .../index/shard/IndexShardTests.java | 32 ++-- .../PeerRecoverySourceServiceTests.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 20 ++- .../recovery/RecoverySourceHandlerTests.java | 23 ++- .../indices/recovery/RecoveryTests.java | 6 +- .../recovery/StartRecoveryRequestTests.java | 32 ++-- .../index/shard/IndexShardTestCase.java | 9 +- .../indices/recovery/AsyncRecoveryTarget.java | 6 +- .../index/engine/FollowingEngineTests.java | 3 +- 25 files changed, 382 insertions(+), 174 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java rename server/src/main/java/org/elasticsearch/indices/recovery/{RecoveryPrepareForTranslogOperationsRequest.java => RecoveryPrepareForTranslogRequest.java} (71%) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e21b816aefd80..103817224790c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1877,11 +1877,6 @@ public interface Warmer { */ public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException; - /** - * Do not replay translog operations, but make the engine be ready. - */ - public abstract void skipTranslogRecovery(); - /** * Returns true iff this engine is currently recovering from translog. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dd4b00f2a3f7f..e83b1dc5cbbad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -447,12 +447,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover return this; } - @Override - public void skipTranslogRecovery() { - assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; - pendingTranslogRecovery.set(false); // we are good - now we can commit - } - private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 79c8331061636..5092f19a5d921 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -444,10 +444,6 @@ public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryR return this; } - @Override - public void skipTranslogRecovery() { - } - @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) { } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2372353ef85c1..a809c5ec30de7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1393,8 +1393,10 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat /** * opens the engine on top of the existing lucene engine and translog. * Operations from the translog will be replayed to bring lucene up to date. + * + * @param recoverUpToSeqNo the upper bound of sequence number to be recovered (inclusive) **/ - public void openEngineAndRecoverFromTranslog() throws IOException { + public void openEngineAndRecoverFromTranslog(long recoverUpToSeqNo) throws IOException { final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { translogRecoveryStats.totalOperations(snapshot.totalOperations()); @@ -1403,16 +1405,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { translogRecoveryStats::incrementRecoveredOperations); }; innerOpenEngineAndTranslog(); - getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); - } - - /** - * Opens the engine on top of the existing lucene engine and translog. - * The translog is kept but its operations won't be replayed. - */ - public void openEngineAndSkipTranslogRecovery() throws IOException { - innerOpenEngineAndTranslog(); - getEngine().skipTranslogRecovery(); + getEngine().recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); } private void innerOpenEngineAndTranslog() throws IOException { @@ -1788,8 +1781,10 @@ public List segments(boolean verbose) { return getEngine().segments(verbose); } - public void flushAndCloseEngine() throws IOException { - getEngine().flushAndClose(); + public void closeEngine() throws IOException { + synchronized (mutex) { + IOUtils.close(this.currentEngineReference.getAndSet(null)); + } } public String getHistoryUUID() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index fae3703027f9e..05b894e156c90 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -422,7 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe store.associateIndexWithNewTranslog(translogUUID); writeEmptyRetentionLeasesFile(indexShard); } - indexShard.openEngineAndRecoverFromTranslog(); + indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE); indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store"); @@ -480,7 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; writeEmptyRetentionLeasesFile(indexShard); - indexShard.openEngineAndRecoverFromTranslog(); + indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE); indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index f53e8edecd9e6..3e02160c3458c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -71,7 +71,7 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi this.transportService = transportService; this.indicesService = indicesService; this.recoverySettings = recoverySettings; - transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC, + transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new, new StartRecoveryTransportRequestHandler()); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 6b1a893667f2c..630da951ab1a5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -54,7 +54,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; import org.elasticsearch.tasks.Task; @@ -119,7 +118,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC, RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler()); transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC, - RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler()); + RecoveryPrepareForTranslogRequest::new, new PrepareForTranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new, new TranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new @@ -345,9 +344,10 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); + final long globalCheckpoint = recoveryTarget.readGlobalCheckpointFromTranslog(); final long startingSeqNo; if (metadataSnapshot.size() > 0) { - startingSeqNo = getStartingSeqNo(logger, recoveryTarget); + startingSeqNo = getStartingSeqNo(logger, recoveryTarget, globalCheckpoint); } else { startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; } @@ -370,7 +370,8 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId(), - startingSeqNo); + startingSeqNo, + globalCheckpoint); return request; } @@ -381,11 +382,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number * failed */ - public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) { + public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget, final long globalCheckpoint) { try { final Store store = recoveryTarget.store(); - final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID); final List existingCommits = DirectoryReader.listCommits(store.directory()); final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit); @@ -424,14 +423,15 @@ public interface RecoveryListener { void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure); } - class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler { + class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler { @Override - public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { + public void messageReceived(RecoveryPrepareForTranslogRequest request, TransportChannel channel, Task task) { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request); + final ActionListener listener = new ChannelActionListener<>( + channel, Actions.PREPARE_TRANSLOG, request); recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(), - ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE)); + request.recoverUpToSeqNo(), ActionListener.map(listener, RecoveryPrepareForTranslogOperationsResponse::new)); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java new file mode 100644 index 0000000000000..d573bdce45424 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Optional; + +final class RecoveryPrepareForTranslogOperationsResponse extends TransportResponse { + final Optional targetMetadata; + + RecoveryPrepareForTranslogOperationsResponse(Optional targetMetadata) { + this.targetMetadata = targetMetadata; + } + + RecoveryPrepareForTranslogOperationsResponse(final StreamInput in) throws IOException { + super(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + targetMetadata = Optional.ofNullable(in.readOptionalWriteable(Store.MetadataSnapshot::new)); + } else { + targetMetadata = Optional.empty(); + } + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(targetMetadata.orElse(null)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogRequest.java similarity index 71% rename from server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java rename to server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogRequest.java index 6e2557176a82e..ecd3b0fd0d7c0 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogRequest.java @@ -19,33 +19,43 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; -class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { +final class RecoveryPrepareForTranslogRequest extends TransportRequest { private final long recoveryId; private final ShardId shardId; private final int totalTranslogOps; private final boolean fileBasedRecovery; + private final long recoverUpToSeqNo; - RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) { + RecoveryPrepareForTranslogRequest(long recoveryId, ShardId shardId, int totalTranslogOps, + boolean fileBasedRecovery, long recoverUpToSeqNo) { this.recoveryId = recoveryId; this.shardId = shardId; this.totalTranslogOps = totalTranslogOps; this.fileBasedRecovery = fileBasedRecovery; + this.recoverUpToSeqNo = recoverUpToSeqNo; } - RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException { + RecoveryPrepareForTranslogRequest(StreamInput in) throws IOException { super.readFrom(in); recoveryId = in.readLong(); shardId = new ShardId(in); totalTranslogOps = in.readVInt(); fileBasedRecovery = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + recoverUpToSeqNo = in.readZLong(); + } else { + recoverUpToSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + } } public long recoveryId() { @@ -67,6 +77,10 @@ public boolean isFileBasedRecovery() { return fileBasedRecovery; } + public long recoverUpToSeqNo() { + return recoverUpToSeqNo; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -74,5 +88,8 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); out.writeVInt(totalTranslogOps); out.writeBoolean(fileBasedRecovery); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeZLong(recoverUpToSeqNo); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index fdada82c5bc56..ae7760981f8f7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -69,6 +69,7 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -155,45 +156,55 @@ public void recoverToTarget(ActionListener listener) { shard, cancellableThreads, logger); final Closeable retentionLock = shard.acquireRetentionLock(); resources.add(retentionLock); - final long startingSeqNo; - final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && - isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); - final SendFileResult sendFileResult; - if (isSequenceNumberBasedRecovery) { - logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); - startingSeqNo = request.startingSeqNo(); - sendFileResult = SendFileResult.EMPTY; - } else { - final Engine.IndexCommitRef phase1Snapshot; - try { - phase1Snapshot = shard.acquireSafeIndexCommit(); - } catch (final Exception e) { - throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); - } - // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will - // still filter out legacy operations without seqNo. - startingSeqNo = 0; - try { - final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); - sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps); - } catch (final Exception e) { - throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); - } finally { + + final StepListener prepareTargetStep = new StepListener<>(); + prepareTargetForSequenceBasedRecovery(prepareTargetStep); + + final StepListener phase1Step = new StepListener<>(); + prepareTargetStep.whenComplete(r -> ActionListener.completeWith(phase1Step, () -> { + if (r.isSequenceNumberRecovery) { + logger.trace("performing sequence numbers based recovery. starting at [{}]", r.startingSeqNo); + return SendFileResult.EMPTY; + } else { + final Engine.IndexCommitRef phase1Snapshot; try { - IOUtils.close(phase1Snapshot); - } catch (final IOException ex) { - logger.warn("releasing snapshot caused exception", ex); + phase1Snapshot = shard.acquireSafeIndexCommit(); + } catch (final Exception e) { + throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); + } + try { + final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", r.startingSeqNo); + return phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), + () -> estimateNumOps, r.recoveryTargetMetadata); + } catch (final Exception e) { + throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); + } finally { + try { + IOUtils.close(phase1Snapshot); + } catch (final IOException ex) { + logger.warn("releasing snapshot caused exception", ex); + } } } - } - assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; + }), onFailure); + + final StepListener startEngineStep = new StepListener<>(); + phase1Step.whenComplete(r -> { + if (prepareTargetStep.result().openEngineTime != null) { + // engine is started already since we have asked it to recover up to the global checkpoint + startEngineStep.onResponse(prepareTargetStep.result().openEngineTime); + } else { + startEngineOnTarget( + prepareTargetStep.result().isSequenceNumberRecovery == false, + shard.estimateNumberOfHistoryOperations("peer-recovery", prepareTargetStep.result().startingSeqNo), + SequenceNumbers.UNASSIGNED_SEQ_NO, + ActionListener.map(startEngineStep, v -> v.tookTime)); + } + }, onFailure); - final StepListener prepareEngineStep = new StepListener<>(); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep); final StepListener sendSnapshotStep = new StepListener<>(); - prepareEngineStep.whenComplete(prepareEngineTime -> { + startEngineStep.whenComplete(r -> { + final long startingSeqNo = prepareTargetStep.result().startingSeqNo; /* * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. * This means that any document indexed into the primary after this will be replicated to this replica as well @@ -221,24 +232,24 @@ public void recoverToTarget(ActionListener listener) { phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersionOnPrimary, sendSnapshotStep); sendSnapshotStep.whenComplete( - r -> IOUtils.close(phase2Snapshot), + v -> IOUtils.close(phase2Snapshot), e -> { IOUtils.closeWhileHandlingException(phase2Snapshot); onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)); }); - }, onFailure); final StepListener finalizeStep = new StepListener<>(); sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure); finalizeStep.whenComplete(r -> { + final SendFileResult sendFileResult = phase1Step.result(); final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result(); final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, - prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); + startEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); try { wrappedListener.onResponse(response); } finally { @@ -295,6 +306,57 @@ public void onFailure(Exception e) { }); } + static final class PrepareTargetResult { + final boolean isSequenceNumberRecovery; + final long startingSeqNo; + final TimeValue openEngineTime; + final Store.MetadataSnapshot recoveryTargetMetadata; + + PrepareTargetResult(boolean isSequenceNumberRecovery, long startingSeqNo, + TimeValue openEngineTime, Store.MetadataSnapshot recoveryTargetMetadata) { + this.isSequenceNumberRecovery = isSequenceNumberRecovery; + this.startingSeqNo = startingSeqNo; + this.openEngineTime = openEngineTime; + this.recoveryTargetMetadata = recoveryTargetMetadata; + } + } + + /** + * Prepares the recovery target for sequence number based recovery if possible. If the recovery source has all required operations + * up to the {@link StartRecoveryRequest#startingSeqNo()}, then it can perform sequence number based recovery without coordinating + * with the recovery target. However, if its history is only up to the global checkpoint on the target, it needs to ask the target + * to locally recover up to the global checkpoint then perform a sequence number based recovery. If the target can recover properly, + * then it will leave its engine open for a sequence number recovery; otherwise it needs to close its engine, capture and send back + * the latest metadata to the recovery source for a file-based recovery. + */ + void prepareTargetForSequenceBasedRecovery(ActionListener listener) throws IOException { + if (request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory()) { + if (shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())) { + listener.onResponse(new PrepareTargetResult(true, request.startingSeqNo(), null, request.metadataSnapshot())); + } else { + final long startingSeqNo = request.globalCheckpoint() + 1; + if (request.startingSeqNo() < startingSeqNo && shard.hasCompleteHistoryOperations("peer-recovery", startingSeqNo)) { + startEngineOnTarget( + false, + shard.estimateNumberOfHistoryOperations("peer-recovery", request.globalCheckpoint()), + request.globalCheckpoint(), + ActionListener.map(listener, r -> { + if (r.newSnapshot == null) { + return new PrepareTargetResult(true, startingSeqNo, r.tookTime, request.metadataSnapshot()); + } else { + // the target does not have the required translog - fallback to file-based recovery with the new snapshot + return new PrepareTargetResult(false, 0, null, r.newSnapshot); + } + })); + } else { + listener.onResponse(new PrepareTargetResult(false, 0, null, request.metadataSnapshot())); + } + } + } else { + listener.onResponse(new PrepareTargetResult(false, 0, null, request.metadataSnapshot())); + } + } + static final class SendFileResult { final List phase1FileNames; final List phase1FileSizes; @@ -330,7 +392,9 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier translogOps) { + SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, + final Supplier translogOps, final Store.MetadataSnapshot recoveryTargetMetadata) { + // TODO: pass new metadata cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; @@ -359,11 +423,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp recoverySourceMetadata.asMap().size() + " files", name); } } - if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) { + if (canSkipPhase1(recoverySourceMetadata, recoveryTargetMetadata) == false) { // Generate a "diff" of all the identical, different, and missing // segment files on the target node, using the existing files on // the source node - final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); + final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(recoveryTargetMetadata); for (StoreFileMetaData md : diff.identical) { phase1ExistingFileNames.add(md.name()); phase1ExistingFileSizes.add(md.length()); @@ -378,9 +442,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp phase1Files.addAll(diff.different); phase1Files.addAll(diff.missing); for (StoreFileMetaData md : phase1Files) { - if (request.metadataSnapshot().asMap().containsKey(md.name())) { + if (recoveryTargetMetadata.asMap().containsKey(md.name())) { logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", - md.name(), request.metadataSnapshot().asMap().get(md.name()), md); + md.name(), recoveryTargetMetadata.asMap().get(md.name()), md); } else { logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); } @@ -478,21 +542,32 @@ boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot targ return true; } - void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + static final class StartEngineResult { + final TimeValue tookTime; + final Store.MetadataSnapshot newSnapshot; + + StartEngineResult(TimeValue tookTime, Store.MetadataSnapshot newSnapshot) { + this.tookTime = tookTime; + this.newSnapshot = newSnapshot; + } + } + + void startEngineOnTarget(boolean fileBasedRecovery, int totalTranslogOps, + long recoverUpToSeqNo, ActionListener listener) { StopWatch stopWatch = new StopWatch().start(); - final ActionListener wrappedListener = ActionListener.wrap( - nullVal -> { + final ActionListener> wrappedListener = ActionListener.wrap( + r -> { stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime); - listener.onResponse(tookTime); + listener.onResponse(new StartEngineResult(tookTime, r.orElse(null))); }, e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e))); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. logger.trace("recovery [phase1]: prepare remote engine for translog"); cancellableThreads.execute(() -> - recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener)); + recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, wrappedListener)); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b3c6d12ab96e3..285fdb1723215 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -47,10 +47,11 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogCorruptedException; import java.io.IOException; -import java.nio.file.Path; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -284,11 +285,21 @@ private void ensureRefCount() { /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener> listener) { ActionListener.completeWith(listener, () -> { - state().getTranslog().totalOperations(totalTranslogOps); - indexShard().openEngineAndSkipTranslogRecovery(); - return null; + if (Assertions.ENABLED && recoverUpToSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + final long gcp = readGlobalCheckpointFromTranslog(); + assert recoverUpToSeqNo == gcp : recoverUpToSeqNo + " != " + gcp; + } + indexShard().openEngineAndRecoverFromTranslog(recoverUpToSeqNo); + if (indexShard.getLocalCheckpoint() < recoverUpToSeqNo) { + // This can happen if the previous recovery was aborted after copying segments, then the target does not have all translog. + indexShard.closeEngine(); + return Optional.of(indexShard.snapshotStoreMetadata()); + } + state().getTranslog().totalOperations(state().getTranslog().totalOperations() + totalTranslogOps); + return Optional.empty(); }); } @@ -458,7 +469,17 @@ public String getTempNameForFile(String origFile) { return multiFileWriter.getTempNameForFile(origFile); } - Path translogLocation() { - return indexShard().shardPath().resolveTranslog(); + long readGlobalCheckpointFromTranslog() { + try { + final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + return Translog.readGlobalCheckpoint(indexShard.shardPath().resolveTranslog(), translogUUID); + } catch (final TranslogCorruptedException | IOException e) { + /* + * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the + * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and + * proceeds to attempt a sequence-number-based recovery. + */ + return SequenceNumbers.UNASSIGNED_SEQ_NO; + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index d03fe42d90146..5934a212fc749 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -28,16 +28,24 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; public interface RecoveryTargetHandler { /** - * Prepares the target to receive translog operations, after all file have been copied + * Starts an engine an target and recovers locally up to the given sequence number (inclusive). If the target can't recover up to + * {@code recoverUpToSeqNo}, it has to close the engine and send back the latest index commit; otherwise it leaves the engine open. * * @param fileBasedRecovery whether or not this call is part of an file based recovery * @param totalTranslogOps total translog operations expected to be sent + * @param recoverUpToSeqNo the upper bound sequence number (inclusive) that the recovery target should + * recover locally after opening its engine. + * @param listener if the engine is started and recovered properly up to @{code recoverUpToSeqNo}, this listener contains + * a null value; otherwise it's the latest metadata snapshot on the target. The recovery source needs to + * use this latest value for a file-based recovery. */ - void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener); + void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener> listener); /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index ec3c22d42a191..56207ae7834f5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -77,12 +78,13 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe } @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener> listener) { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery), + new RecoveryPrepareForTranslogRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery, recoverUpToSeqNo), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), - in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); + new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.targetMetadata), + RecoveryPrepareForTranslogOperationsResponse::new, ThreadPool.Names.GENERIC)); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 4ec20d17ac5be..a9bb042cc8759 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -34,17 +35,15 @@ */ public class StartRecoveryRequest extends TransportRequest { - private long recoveryId; - private ShardId shardId; - private String targetAllocationId; - private DiscoveryNode sourceNode; - private DiscoveryNode targetNode; - private Store.MetadataSnapshot metadataSnapshot; - private boolean primaryRelocation; - private long startingSeqNo; - - public StartRecoveryRequest() { - } + private final long recoveryId; + private final ShardId shardId; + private final String targetAllocationId; + private final DiscoveryNode sourceNode; + private final DiscoveryNode targetNode; + private final Store.MetadataSnapshot metadataSnapshot; + private final boolean primaryRelocation; + private final long startingSeqNo; + private final long globalCheckpoint; /** * Construct a request for starting a peer recovery. @@ -56,7 +55,8 @@ public StartRecoveryRequest() { * @param metadataSnapshot the Lucene metadata * @param primaryRelocation whether or not the recovery is a primary relocation * @param recoveryId the recovery ID - * @param startingSeqNo the starting sequence number + * @param startingSeqNo the starting sequence number that the recovery target requires for a operation-based recovery + * @param globalCheckpoint the persisted global checkpoint on target */ public StartRecoveryRequest(final ShardId shardId, final String targetAllocationId, @@ -65,7 +65,9 @@ public StartRecoveryRequest(final ShardId shardId, final Store.MetadataSnapshot metadataSnapshot, final boolean primaryRelocation, final long recoveryId, - final long startingSeqNo) { + final long startingSeqNo, + final long globalCheckpoint) { + assert startingSeqNo <= (globalCheckpoint + 1) : startingSeqNo + " > " + (globalCheckpoint + 1); this.recoveryId = recoveryId; this.shardId = shardId; this.targetAllocationId = targetAllocationId; @@ -74,6 +76,7 @@ public StartRecoveryRequest(final ShardId shardId, this.metadataSnapshot = metadataSnapshot; this.primaryRelocation = primaryRelocation; this.startingSeqNo = startingSeqNo; + this.globalCheckpoint = globalCheckpoint; assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null : "starting seq no is set but not history uuid"; } @@ -110,9 +113,12 @@ public long startingSeqNo() { return startingSeqNo; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public long globalCheckpoint() { + return globalCheckpoint; + } + + StartRecoveryRequest(StreamInput in) throws IOException { + super(in); recoveryId = in.readLong(); shardId = new ShardId(in); targetAllocationId = in.readString(); @@ -121,6 +127,12 @@ public void readFrom(StreamInput in) throws IOException { metadataSnapshot = new Store.MetadataSnapshot(in); primaryRelocation = in.readBoolean(); startingSeqNo = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + globalCheckpoint = in.readZLong(); + assert startingSeqNo <= (globalCheckpoint + 1) : startingSeqNo + " > " + (globalCheckpoint + 1); + } else { + globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; + } } @Override @@ -134,5 +146,8 @@ public void writeTo(StreamOutput out) throws IOException { metadataSnapshot.writeTo(out); out.writeBoolean(primaryRelocation); out.writeLong(startingSeqNo); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeZLong(globalCheckpoint); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 59bbee9f1bbf5..e66757a64ed6f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3022,7 +3022,7 @@ public void testSkipTranslogReplay() throws IOException { assertVisibleCount(engine, numDocs); engine.close(); try (InternalEngine engine = new InternalEngine(config)) { - engine.skipTranslogRecovery(); + engine.recoverFromTranslog(translogHandler, UNASSIGNED_SEQ_NO); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits.value, equalTo(0L)); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index e25557eaabcf6..7513714099dbc 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -61,6 +61,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; @@ -197,14 +198,14 @@ public IndexResult index(Index op) throws IOException { Future fut = shards.asyncRecoverReplica(replica, (shard, node) -> new RecoveryTarget(shard, node, recoveryListener) { @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, - ActionListener listener) { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener> listener) { try { indexedOnPrimary.await(); } catch (InterruptedException e) { throw new AssertionError(e); } - super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener); + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener); } }); fut.get(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7e284aa4f73e9..0d5d00ea331dd 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -144,6 +144,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -2655,7 +2656,7 @@ public void testShardActiveDuringInternalRecovery() throws IOException { shard.prepareForIndexRecovery(); // Shard is still inactive since we haven't started recovering yet assertFalse(shard.isActive()); - shard.openEngineAndRecoverFromTranslog(); + shard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE); // Shard should now be active since we did recover: assertTrue(shard.isActive()); closeShards(shard); @@ -2739,8 +2740,9 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener) { // we're only checking that listeners are called when the engine is open, before there is no point @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { - super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener> listener) { + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, ActionListener.wrap( r -> { assertListenerCalled.accept(replica); @@ -3812,11 +3814,13 @@ public void testCloseShardWhileResettingEngine() throws Exception { @Override public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { - readyToCloseLatch.countDown(); - try { - closeDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + if (recoverUpToSeqNo != UNASSIGNED_SEQ_NO) { + readyToCloseLatch.countDown(); + try { + closeDoneLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); } @@ -3867,11 +3871,13 @@ public void testSnapshotWhileResettingEngine() throws Exception { public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); - readyToSnapshotLatch.countDown(); - try { - snapshotDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + if (recoverUpToSeqNo != UNASSIGNED_SEQ_NO) { + readyToSnapshotLatch.countDown(); + try { + snapshotDoneLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } return internalEngine; } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 72eb2baeca942..3b4baf9c163b4 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -42,7 +42,7 @@ public void testDuplicateRecoveries() throws IOException { new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), - SequenceNumbers.UNASSIGNED_SEQ_NO); + SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.NO_OPS_PERFORMED); RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class, () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index bb4c25e6186de..a7a9b3a2e12d4 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -57,12 +57,13 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { public void testGetStartingSeqNo() throws Exception { final IndexShard replica = newShard(false); + long globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; try { // Empty store { recoveryEmptyReplica(replica, true); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), equalTo(0L)); recoveryTarget.decRef(); } // Last commit is good - use it. @@ -75,10 +76,11 @@ public void testGetStartingSeqNo() throws Exception { } } flushShard(replica); - replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); + globalCheckpoint = initDocs - 1; + replica.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); replica.sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), equalTo(initDocs)); recoveryTarget.decRef(); } // Global checkpoint does not advance, last commit is not good - use the previous commit @@ -92,15 +94,17 @@ public void testGetStartingSeqNo() throws Exception { } flushShard(replica); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), equalTo(initDocs)); recoveryTarget.decRef(); } // Advances the global checkpoint, a safe commit also advances { - replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); + globalCheckpoint = initDocs + moreDocs - 1; + replica.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); replica.sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint), + equalTo(initDocs + moreDocs)); recoveryTarget.decRef(); } // Different translogUUID, fallback to file-based @@ -119,7 +123,9 @@ public void testGetStartingSeqNo() throws Exception { writer.commit(); } final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(recoveryTarget.readGlobalCheckpointFromTranslog(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, SequenceNumbers.NO_OPS_PERFORMED), + equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); recoveryTarget.decRef(); } } finally { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index b00e89575ccd5..6a1c51e9a3e58 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -50,7 +50,6 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; @@ -92,6 +91,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -203,6 +203,8 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException { Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY : new Store.MetadataSnapshot(Collections.emptyMap(), Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100)); + final long startingSeqNo = randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? + SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong(); return new StartRecoveryRequest( shardId, null, @@ -211,8 +213,8 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException { metadataSnapshot, randomBoolean(), randomNonNegativeLong(), - randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? - SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()); + startingSeqNo, + startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ? startingSeqNo : randomLongBetween(startingSeqNo, Long.MAX_VALUE)); } public void testSendSnapshotSendsOps() throws IOException { @@ -478,15 +480,17 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE between(1, 8)) { @Override - public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier translogOps) { + SendFileResult phase1(IndexCommit snapshot, long globalCheckpoint, Supplier translogOps, + Store.MetadataSnapshot recoveryTargetMetadata) { phase1Called.set(true); - return super.phase1(snapshot, globalCheckpoint, translogOps); + return super.phase1(snapshot, globalCheckpoint, translogOps, recoveryTargetMetadata); } @Override - void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + void startEngineOnTarget(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener listener) { prepareTargetForTranslogCalled.set(true); - super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener); + super.startEngineOnTarget(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener); } @Override @@ -729,8 +733,11 @@ private List generateFiles(Store store, int numFiles, IntSupp } class TestRecoveryTargetHandler implements RecoveryTargetHandler { + @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, + long recoverUpToSeqNo, ActionListener> listener) { + } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index c3f6a3aae89fb..031fff8b7fe18 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -330,8 +331,9 @@ public void testPeerRecoverySendSafeCommitInFileBased() throws Exception { updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData()); recoverReplica(replicaShard, primaryShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) { @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { - super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener); + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long recoverUpToSeqNo, + ActionListener> listener) { + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener); assertThat(replicaShard.getLastKnownGlobalCheckpoint(), equalTo(primaryShard.getLastKnownGlobalCheckpoint())); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java index e77bf5f8d4ae5..52097de47051b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java @@ -46,6 +46,10 @@ public void testSerialization() throws Exception { Store.MetadataSnapshot metadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY : new Store.MetadataSnapshot(Collections.emptyMap(), Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), randomIntBetween(0, 100)); + final long startingSeqNo = randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? + SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong(); + final long globalCheckpoint = startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ? SequenceNumbers.UNASSIGNED_SEQ_NO + : randomLongBetween(startingSeqNo - 1, Long.MAX_VALUE); final StartRecoveryRequest outRequest = new StartRecoveryRequest( new ShardId("test", "_na_", 0), UUIDs.randomBase64UUID(), @@ -54,8 +58,8 @@ public void testSerialization() throws Exception { metadataSnapshot, randomBoolean(), randomNonNegativeLong(), - randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? - SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()); + startingSeqNo, + globalCheckpoint); final ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); final OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); @@ -65,17 +69,21 @@ public void testSerialization() throws Exception { final ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); in.setVersion(targetNodeVersion); - final StartRecoveryRequest inRequest = new StartRecoveryRequest(); - inRequest.readFrom(in); + final StartRecoveryRequest inRequest = new StartRecoveryRequest(in); - assertThat(outRequest.shardId(), equalTo(inRequest.shardId())); - assertThat(outRequest.targetAllocationId(), equalTo(inRequest.targetAllocationId())); - assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode())); - assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode())); - assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap())); - assertThat(outRequest.isPrimaryRelocation(), equalTo(inRequest.isPrimaryRelocation())); - assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId())); - assertThat(outRequest.startingSeqNo(), equalTo(inRequest.startingSeqNo())); + assertThat(inRequest.shardId(), equalTo(outRequest.shardId())); + assertThat(inRequest.targetAllocationId(), equalTo(outRequest.targetAllocationId())); + assertThat(inRequest.sourceNode(), equalTo(outRequest.sourceNode())); + assertThat(inRequest.targetNode(), equalTo(outRequest.targetNode())); + assertThat(inRequest.metadataSnapshot().asMap(), equalTo(outRequest.metadataSnapshot().asMap())); + assertThat(inRequest.isPrimaryRelocation(), equalTo(outRequest.isPrimaryRelocation())); + assertThat(inRequest.recoveryId(), equalTo(outRequest.recoveryId())); + assertThat(inRequest.startingSeqNo(), equalTo(startingSeqNo)); + if (targetNodeVersion.onOrAfter(Version.V_8_0_0)) { + assertThat(inRequest.globalCheckpoint(), equalTo(globalCheckpoint)); + } else { + assertThat(inRequest.globalCheckpoint(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4b5be29205778..89c25ca764ca4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -620,15 +620,18 @@ protected final void recoverUnstartedReplica(final IndexShard replica, final String targetAllocationId = recoveryTarget.indexShard().routingEntry().allocationId().getId(); final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica); + final long globalCheckpoint; final long startingSeqNo; if (snapshot.size() > 0) { - startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget); + final String translogUUID = replica.store().readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); + globalCheckpoint = Translog.readGlobalCheckpoint(replica.shardPath().resolveTranslog(), translogUUID); + startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget, globalCheckpoint); } else { + globalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; } - final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId, - pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo); + pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo, globalCheckpoint); final RecoverySourceHandler recovery = new RecoverySourceHandler(primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java index d5a7ab8109e12..c32cd9b0070e6 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; /** @@ -46,8 +47,9 @@ public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { } @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { - executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener)); + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, + long recoverUpToSeqNo, ActionListener> listener) { + executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, recoverUpToSeqNo, listener)); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 98bfa1b2068bb..045f9a010644b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -494,7 +494,8 @@ private void runFollowTest(CheckedBiConsumer