diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 4c17db2044e15..701dec069d946 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -902,7 +902,7 @@ private boolean invariant() { if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { - if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { + if (checkpoints.get(shardRouting.allocationId().getId()).tracked && !indexSettings().isRemoteTranslogStoreEnabled()) { assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java new file mode 100644 index 0000000000000..9ffe61208b78c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.util.SetOnce; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.support.ThreadedActionListener; +import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.engine.RecoveryEngineException; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.seqno.RetentionLeaseNotFoundException; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transports; + +import java.io.Closeable; +import java.io.IOException; +import java.util.function.Consumer; + +/** + * This handler is used for node-to-node peer recovery when the recovery target is a replica/ or a relocating primary + * shard with translog backed by local store. 
+ * + * @opensearch.internal + */ +public class LocalStorePeerRecoverySourceHandler extends RecoverySourceHandler { + + public LocalStorePeerRecoverySourceHandler( + IndexShard shard, + RecoveryTargetHandler recoveryTarget, + ThreadPool threadPool, + StartRecoveryRequest request, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks, + int maxConcurrentOperations + ) { + super(shard, recoveryTarget, threadPool, request, fileChunkSizeInBytes, maxConcurrentFileChunks, maxConcurrentOperations); + } + + @Override + protected void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException { + final SetOnce retentionLeaseRef = new SetOnce<>(); + + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying recovery of {} as it is not listed as assigned to target node {}", + request.shardId(), + request.targetNode() + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; + retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); + }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); + final Closeable retentionLock = shard.acquireHistoryRetentionLock(); + resources.add(retentionLock); + final long startingSeqNo; + final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && isTargetSameHistory() + && shard.hasCompleteHistoryOperations(PEER_RECOVERY_NAME, request.startingSeqNo()) + && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) + || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); + // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, + // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's + // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold. + // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery + // without having a complete history. + + if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { + // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock + retentionLock.close(); + logger.trace("history is retained by {}", retentionLeaseRef.get()); + } else { + // all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations() + // and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's + // local checkpoint will be retained for the duration of this recovery. 
+ logger.trace("history is retained by retention lock"); + } + + final StepListener sendFileStep = new StepListener<>(); + final StepListener prepareEngineStep = new StepListener<>(); + final StepListener sendSnapshotStep = new StepListener<>(); + final StepListener finalizeStep = new StepListener<>(); + + if (isSequenceNumberBasedRecovery) { + logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); + startingSeqNo = request.startingSeqNo(); + if (retentionLeaseRef.get() == null) { + createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); + } else { + sendFileStep.onResponse(SendFileResult.EMPTY); + } + } else { + final GatedCloseable wrappedSafeCommit; + try { + wrappedSafeCommit = acquireSafeCommit(shard); + resources.add(wrappedSafeCommit); + } catch (final Exception e) { + throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); + } + + // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being + // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we + // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the + // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains + // at least as much history as anything else. The safe commit will often contain all the history retained by the current set + // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a + // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint + // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can + // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled + // down. + startingSeqNo = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; + logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); + + try { + final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo); + final Releasable releaseStore = acquireStore(shard.store()); + resources.add(releaseStore); + onSendFileStepComplete(sendFileStep, wrappedSafeCommit, releaseStore); + + final StepListener deleteRetentionLeaseStep = new StepListener<>(); + RunUnderPrimaryPermit.run(() -> { + try { + // If the target previously had a copy of this shard then a file-based recovery might move its global + // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a + // new one later on in the recovery. 
+ shard.removePeerRecoveryRetentionLease( + request.targetNode().getId(), + new ThreadedActionListener<>( + logger, + shard.getThreadPool(), + ThreadPool.Names.GENERIC, + deleteRetentionLeaseStep, + false + ) + ); + } catch (RetentionLeaseNotFoundException e) { + logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); + deleteRetentionLeaseStep.onResponse(null); + } + }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); + + deleteRetentionLeaseStep.whenComplete(ignored -> { + assert Transports.assertNotTransportThread(this + "[phase1]"); + phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep, false); + }, onFailure); + + } catch (final Exception e) { + throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e); + } + } + assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; + + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(this + "[phase2]"); + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2. + */ + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); + + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + if (logger.isTraceEnabled()) { + logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); + } + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( + PEER_RECOVERY_NAME, + startingSeqNo, + Long.MAX_VALUE, + false, + true + ); + resources.add(phase2Snapshot); + retentionLock.close(); + + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. 
+ final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + final RetentionLeases retentionLeases = shard.getRetentionLeases(); + final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); + phase2( + startingSeqNo, + endingSeqNo, + phase2Snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersionOnPrimary, + sendSnapshotStep + ); + + }, onFailure); + finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java index a1cf78920cf7e..8bea14a1a1c86 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java @@ -378,15 +378,7 @@ private Tuple createRecovery recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime) ); - handler = new RecoverySourceHandler( - shard, - recoveryTarget, - shard.getThreadPool(), - request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks(), - recoverySettings.getMaxConcurrentOperations() - ); + handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings); return Tuple.tuple(handler, recoveryTarget); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 505d3c7adfb3f..03d1066ae5a60 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -39,15 +39,12 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.SetOnce; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.StopWatch; import org.opensearch.common.concurrent.GatedCloseable; @@ -62,7 +59,6 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.engine.RecoveryEngineException; -import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseNotFoundException; import org.opensearch.index.seqno.RetentionLeases; @@ -97,7 +93,7 @@ * RecoverySourceHandler handles the three phases of shard recovery, which is * everything relating to copying the segment files as well as sending translog * operations across the wire once the segments have been copied. - * + *

* Note: There is always one source handler per recovery that handles all the * file and translog transfer. This handler is completely isolated from other recoveries * while the {@link RateLimiter} passed via {@link RecoverySettings} is shared across recoveries @@ -106,25 +102,25 @@ * * @opensearch.internal */ -public class RecoverySourceHandler { +public abstract class RecoverySourceHandler { protected final Logger logger; // Shard that is going to be recovered (the "source") - private final IndexShard shard; - private final int shardId; + protected final IndexShard shard; + protected final int shardId; // Request containing source and target node information - private final StartRecoveryRequest request; + protected final StartRecoveryRequest request; private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; private final int maxConcurrentOperations; private final ThreadPool threadPool; - private final CancellableThreads cancellableThreads = new CancellableThreads(); - private final List resources = new CopyOnWriteArrayList<>(); - private final ListenableFuture future = new ListenableFuture<>(); + protected final CancellableThreads cancellableThreads = new CancellableThreads(); + protected final List resources = new CopyOnWriteArrayList<>(); + protected final ListenableFuture future = new ListenableFuture<>(); public static final String PEER_RECOVERY_NAME = "peer-recovery"; private final SegmentFileTransferHandler transferHandler; - public RecoverySourceHandler( + RecoverySourceHandler( IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, @@ -183,251 +179,70 @@ public void recoverToTarget(ActionListener listener) { throw e; }); final Consumer onFailure = e -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]"); + assert Transports.assertNotTransportThread(this + "[onFailure]"); IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); }; + innerRecoveryToTarget(listener, onFailure); + } catch (Exception e) { + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + } + } - final SetOnce retentionLeaseRef = new SetOnce<>(); + protected abstract void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) + throws IOException; - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying recovery of {} as it is not listed as assigned to target node {}", - request.shardId(), - request.targetNode() - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set( - shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) - ); - }, - shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", - shard, - cancellableThreads, - logger + protected void finalizeStepAndCompleteFuture( + long startingSeqNo, + StepListener sendSnapshotStep, + StepListener sendFileStep, + StepListener prepareEngineStep, + Consumer onFailure + ) { + final StepListener finalizeStep = new StepListener<>(); + // Recovery target can trim all operations >= 
startingSeqNo as we have sent all these operations in the phase 2 + final long trimAboveSeqNo = startingSeqNo - 1; + sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure); + + finalizeStep.whenComplete(r -> { + final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time + final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result(); + final SendFileResult sendFileResult = sendFileStep.result(); + final RecoveryResponse response = new RecoveryResponse( + sendFileResult.phase1FileNames, + sendFileResult.phase1FileSizes, + sendFileResult.phase1ExistingFileNames, + sendFileResult.phase1ExistingFileSizes, + sendFileResult.totalSize, + sendFileResult.existingTotalSize, + sendFileResult.took.millis(), + phase1ThrottlingWaitTime, + prepareEngineStep.result().millis(), + sendSnapshotResult.sentOperations, + sendSnapshotResult.tookTime.millis() ); - final Closeable retentionLock = shard.acquireHistoryRetentionLock(); - resources.add(retentionLock); - final long startingSeqNo; - final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO - && isTargetSameHistory() - && shard.hasCompleteHistoryOperations(PEER_RECOVERY_NAME, request.startingSeqNo()) - && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) - || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); - // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, - // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's - // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold. - // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery - // without having a complete history. - - if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { - // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock - retentionLock.close(); - logger.trace("history is retained by {}", retentionLeaseRef.get()); - } else { - // all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations() - // and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's - // local checkpoint will be retained for the duration of this recovery. - logger.trace("history is retained by retention lock"); - } - - final StepListener sendFileStep = new StepListener<>(); - final StepListener prepareEngineStep = new StepListener<>(); - final StepListener sendSnapshotStep = new StepListener<>(); - final StepListener finalizeStep = new StepListener<>(); - - if (isSequenceNumberBasedRecovery) { - logger.trace("performing sequence numbers based recovery. 
starting at [{}]", request.startingSeqNo()); - startingSeqNo = request.startingSeqNo(); - if (retentionLeaseRef.get() == null) { - createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); - } else { - sendFileStep.onResponse(SendFileResult.EMPTY); - } - } else { - final GatedCloseable wrappedSafeCommit; - try { - wrappedSafeCommit = acquireSafeCommit(shard); - resources.add(wrappedSafeCommit); - } catch (final Exception e) { - throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); - } - - // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being - // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we - // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the - // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains - // at least as much history as anything else. The safe commit will often contain all the history retained by the current set - // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a - // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint - // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can - // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled - // down. - startingSeqNo = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; - logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); - - try { - final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo); - final Releasable releaseStore = acquireStore(shard.store()); - resources.add(releaseStore); - sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> { - try { - IOUtils.close(wrappedSafeCommit, releaseStore); - } catch (final IOException ex) { - logger.warn("releasing snapshot caused exception", ex); - } - }); - - final StepListener deleteRetentionLeaseStep = new StepListener<>(); - RunUnderPrimaryPermit.run(() -> { - try { - // If the target previously had a copy of this shard then a file-based recovery might move its global - // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a - // new one later on in the recovery. 
- shard.removePeerRecoveryRetentionLease( - request.targetNode().getId(), - new ThreadedActionListener<>( - logger, - shard.getThreadPool(), - ThreadPool.Names.GENERIC, - deleteRetentionLeaseStep, - false - ) - ); - } catch (RetentionLeaseNotFoundException e) { - logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); - deleteRetentionLeaseStep.onResponse(null); - } - }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); - - deleteRetentionLeaseStep.whenComplete(ignored -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); - phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep); - }, onFailure); - - } catch (final Exception e) { - throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e); - } + try { + future.onResponse(response); + } finally { + IOUtils.close(resources); } - assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; - - boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false - && shard.isRemoteTranslogEnabled(); - - if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { - sendFileStep.whenComplete(r -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(0, prepareEngineStep); - }, onFailure); - - prepareEngineStep.whenComplete(prepareEngineTime -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); - RunUnderPrimaryPermit.run( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), - shard, - cancellableThreads, - logger - ); - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - retentionLock.close(); - sendSnapshotStep.onResponse(new SendSnapshotResult(endingSeqNo, 0, TimeValue.ZERO)); - }, onFailure); - } else { - sendFileStep.whenComplete(r -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); - }, onFailure); - - prepareEngineStep.whenComplete(prepareEngineTime -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); - /* - * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. - * This means that any document indexed into the primary after this will be replicated to this replica as well - * make sure to do this before sampling the max sequence number in the next step, to ensure that we send - * all documents up to maxSeqNo in phase2. 
- */ - RunUnderPrimaryPermit.run( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), - shard, - cancellableThreads, - logger - ); - - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); - } - final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( - PEER_RECOVERY_NAME, - startingSeqNo, - Long.MAX_VALUE, - false, - true - ); - resources.add(phase2Snapshot); - retentionLock.close(); - - // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values - // are at least as high as the corresponding values on the primary when any of these operations were executed on it. - final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - final RetentionLeases retentionLeases = shard.getRetentionLeases(); - final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); - phase2( - startingSeqNo, - endingSeqNo, - phase2Snapshot, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - mappingVersionOnPrimary, - sendSnapshotStep - ); + }, onFailure); + } - }, onFailure); + protected void onSendFileStepComplete( + StepListener sendFileStep, + GatedCloseable wrappedSafeCommit, + Releasable releaseStore + ) { + sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> { + try { + IOUtils.close(wrappedSafeCommit, releaseStore); + } catch (final IOException ex) { + logger.warn("releasing snapshot caused exception", ex); } - - // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2 - final long trimAboveSeqNo = startingSeqNo - 1; - sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure); - - finalizeStep.whenComplete(r -> { - final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time - final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result(); - final SendFileResult sendFileResult = sendFileStep.result(); - final RecoveryResponse response = new RecoveryResponse( - sendFileResult.phase1FileNames, - sendFileResult.phase1FileSizes, - sendFileResult.phase1ExistingFileNames, - sendFileResult.phase1ExistingFileSizes, - sendFileResult.totalSize, - sendFileResult.existingTotalSize, - sendFileResult.took.millis(), - phase1ThrottlingWaitTime, - prepareEngineStep.result().millis(), - sendSnapshotResult.sentOperations, - sendSnapshotResult.tookTime.millis() - ); - try { - future.onResponse(response); - } finally { - IOUtils.close(resources); - } - }, onFailure); - } catch (Exception e) { - IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); - } + }); } - private boolean isTargetSameHistory() { + protected boolean isTargetSameHistory() { final String targetHistoryUUID = request.metadataSnapshot().getHistoryUUID(); assert targetHistoryUUID != null : "incoming target history missing"; return targetHistoryUUID.equals(shard.getHistoryUUID()); @@ -435,10 +250,11 @@ private boolean isTargetSameHistory() { /** * Counts the number of history operations from the starting sequence number - * @param startingSeqNo the starting sequence number to count; included - * @return number of 
history operations + * + * @param startingSeqNo the starting sequence number to count; included + * @return number of history operations */ - private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException { + protected int countNumberOfHistoryOperations(long startingSeqNo) throws IOException { return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE); } @@ -446,7 +262,7 @@ private int countNumberOfHistoryOperations(long startingSeqNo) throws IOExceptio * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool. * We must never release the store using an interruptible thread as we can risk invalidating the node lock. */ - private Releasable acquireStore(Store store) { + protected Releasable acquireStore(Store store) { store.incRef(); return Releasables.releaseOnce(() -> runWithGenericThreadPool(store::decRef)); } @@ -456,7 +272,7 @@ private Releasable acquireStore(Store store) { * with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail). * This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool. */ - private GatedCloseable acquireSafeCommit(IndexShard shard) { + protected GatedCloseable acquireSafeCommit(IndexShard shard) { final GatedCloseable wrappedSafeCommit = shard.acquireSafeIndexCommit(); final AtomicBoolean closed = new AtomicBoolean(false); return new GatedCloseable<>(wrappedSafeCommit.get(), () -> { @@ -530,7 +346,13 @@ static final class SendFileResult { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { cancellableThreads.checkForCancel(); final Store store = shard.store(); try { @@ -628,7 +450,12 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A listener::onFailure ); - sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); + // When doing peer recovery of remote store enabled replica, retention leases are not required. + if (skipCreateRetentionLeaseStep) { + sendFilesStep.whenComplete(r -> createRetentionLeaseStep.onResponse(null), listener::onFailure); + } else { + sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); + } createRetentionLeaseStep.whenComplete(retentionLease -> { final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java new file mode 100644 index 0000000000000..ea13ca18bbfca --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandlerFactory.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.recovery; + +import org.opensearch.index.shard.IndexShard; + +/** + * Factory that supplies {@link RecoverySourceHandler}. + * + * @opensearch.internal + */ +public class RecoverySourceHandlerFactory { + + public static RecoverySourceHandler create( + IndexShard shard, + RecoveryTargetHandler recoveryTarget, + StartRecoveryRequest request, + RecoverySettings recoverySettings + ) { + boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false && shard.isRemoteTranslogEnabled(); + if (isReplicaRecoveryWithRemoteTranslog) { + return new RemoteStorePeerRecoverySourceHandler( + shard, + recoveryTarget, + shard.getThreadPool(), + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks(), + recoverySettings.getMaxConcurrentOperations() + ); + } else { + return new LocalStorePeerRecoverySourceHandler( + shard, + recoveryTarget, + shard.getThreadPool(), + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks(), + recoverySettings.getMaxConcurrentOperations() + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java new file mode 100644 index 0000000000000..ff218ef71e397 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.apache.lucene.index.IndexCommit; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.engine.RecoveryEngineException; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transports; + +import java.io.IOException; +import java.util.function.Consumer; + +/** + * This handler is used when peer recovery target is a remote store enabled replica. + * + * @opensearch.internal + */ +public class RemoteStorePeerRecoverySourceHandler extends RecoverySourceHandler { + + public RemoteStorePeerRecoverySourceHandler( + IndexShard shard, + RecoveryTargetHandler recoveryTarget, + ThreadPool threadPool, + StartRecoveryRequest request, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks, + int maxConcurrentOperations + ) { + super(shard, recoveryTarget, threadPool, request, fileChunkSizeInBytes, maxConcurrentFileChunks, maxConcurrentOperations); + } + + @Override + protected void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException { + // A replica of an index with remote translog does not require the translogs locally and keeps receiving the + // updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed + // and there is no translog replay done. 
+ + final StepListener sendFileStep = new StepListener<>(); + final StepListener prepareEngineStep = new StepListener<>(); + final StepListener sendSnapshotStep = new StepListener<>(); + + // It is always file based recovery while recovering replicas which are not relocating primary where the + // underlying indices are backed by remote store for storing segments and translog + + final GatedCloseable wrappedSafeCommit; + try { + wrappedSafeCommit = acquireSafeCommit(shard); + resources.add(wrappedSafeCommit); + } catch (final Exception e) { + throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); + } + + final long startingSeqNo = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; + logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); + + try { + final Releasable releaseStore = acquireStore(shard.store()); + resources.add(releaseStore); + onSendFileStepComplete(sendFileStep, wrappedSafeCommit, releaseStore); + + assert Transports.assertNotTransportThread(this + "[phase1]"); + phase1(wrappedSafeCommit.get(), startingSeqNo, () -> 0, sendFileStep, true); + } catch (final Exception e) { + throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e); + } + assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; + + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(0, prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(this + "[phase2]"); + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + sendSnapshotStep.onResponse(new SendSnapshotResult(endingSeqNo, 0, TimeValue.ZERO)); + }, onFailure); + + finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 5d317693e02df..950ba047df19d 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; @@ -36,72 +37,6 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") .build(); - public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { - - // Step1 - Start primary, index docs and flush - shards.startPrimary(); - final IndexShard primary = shards.getPrimary(); - int numDocs = shards.indexDocs(randomIntBetween(10, 100)); - shards.flush(); - - // Step 2 - Start replica for recovery 
to happen, check both has same number of docs - final IndexShard replica1 = shards.addReplica(); - shards.startAll(); - assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); - - // Step 3 - Index more docs, run segment replication, check both have same number of docs - int moreDocs = shards.indexDocs(randomIntBetween(10, 100)); - primary.refresh("test"); - replicateSegments(primary, shards.getReplicas()); - assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); - - // Step 4 - Check both shard has expected number of doc count - assertDocCount(primary, numDocs + moreDocs); - assertDocCount(replica1, numDocs + moreDocs); - - // Step 5 - Start new replica, recovery happens, and check that new replica has docs upto last flush - final IndexShard replica2 = shards.addReplica(); - shards.startAll(); - assertDocCount(replica2, numDocs); - - // Step 6 - Segment replication, check all shards have same number of docs - replicateSegments(primary, shards.getReplicas()); - shards.assertAllEqual(numDocs + moreDocs); - } - } - - public void testNoTranslogHistoryTransferred() throws Exception { - try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { - - // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected - shards.startPrimary(); - final IndexShard primary = shards.getPrimary(); - int numDocs = shards.indexDocs(randomIntBetween(10, 100)); - shards.flush(); - List docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary); - int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); - assertEquals(moreDocs, getTranslog(primary).totalOperations()); - - // Step 2 - Start replica, recovery happens, check docs recovered till last flush - final IndexShard replica = shards.addReplica(); - shards.startAll(); - assertEquals(docIdAndSeqNosAfterFlush, getDocIdAndSeqNos(replica)); - assertDocCount(replica, numDocs); - assertEquals(NRTReplicationEngine.class, replica.getEngine().getClass()); - - // Step 3 - Check replica's translog has no operations - assertEquals(WriteOnlyTranslogManager.class, replica.getEngine().translogManager().getClass()); - WriteOnlyTranslogManager replicaTranslogManager = (WriteOnlyTranslogManager) replica.getEngine().translogManager(); - assertEquals(0, replicaTranslogManager.getTranslog().totalOperations()); - - // Adding this for close to succeed - shards.flush(); - replicateSegments(primary, shards.getReplicas()); - shards.assertAllEqual(numDocs + moreDocs); - } - } - public void testStartSequenceForReplicaRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { @@ -146,20 +81,23 @@ public void testStartSequenceForReplicaRecovery() throws Exception { null ); shards.addReplica(newReplicaShard); + AtomicBoolean assertDone = new AtomicBoolean(false); shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) { @Override public IndexShard indexShard() { IndexShard idxShard = super.indexShard(); - // verify the starting sequence number while recovering a failed shard which has a valid last commit - long startingSeqNo = -1; - try { - startingSeqNo = Long.parseLong( - idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO) - ); - } catch (IOException e) { - Assert.fail(); + if (assertDone.compareAndSet(false, true)) { + // verify the starting sequence number while recovering a failed shard which has a valid last commit + long 
startingSeqNo = -1; + try { + startingSeqNo = Long.parseLong( + idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO) + ); + } catch (IOException e) { + Assert.fail(); + } + assertEquals(numDocs - 1, startingSeqNo); } - assertEquals(numDocs - 1, startingSeqNo); return idxShard; } }); @@ -169,4 +107,35 @@ public IndexShard indexShard() { shards.assertAllEqual(numDocs + moreDocs); } } + + public void testNoTranslogHistoryTransferred() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + List docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary); + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + assertEquals(moreDocs, getTranslog(primary).totalOperations()); + + // Step 2 - Start replica, recovery happens, check docs recovered till last flush + final IndexShard replica = shards.addReplica(); + shards.startAll(); + assertEquals(docIdAndSeqNosAfterFlush, getDocIdAndSeqNos(replica)); + assertDocCount(replica, numDocs); + assertEquals(NRTReplicationEngine.class, replica.getEngine().getClass()); + + // Step 3 - Check replica's translog has no operations + assertEquals(WriteOnlyTranslogManager.class, replica.getEngine().translogManager().getClass()); + WriteOnlyTranslogManager replicaTranslogManager = (WriteOnlyTranslogManager) replica.getEngine().translogManager(); + assertEquals(0, replicaTranslogManager.getTranslog().totalOperations()); + + // Adding this for close to succeed + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java similarity index 97% rename from server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java rename to server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 2b5550b71a627..7761f97769440 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -143,7 +143,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class RecoverySourceHandlerTests extends OpenSearchTestCase { +/** + * This covers test cases for {@link RecoverySourceHandler} and {@link LocalStorePeerRecoverySourceHandler}. 
+ */ +public class LocalStorePeerRecoverySourceHandlerTests extends OpenSearchTestCase { private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( "index", Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() @@ -215,7 +218,7 @@ public void writeFileChunk( }); } }; - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -296,7 +299,7 @@ public void indexTranslogOperations( listener.onResponse(checkpointOnTarget.get()); } }; - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, @@ -359,7 +362,7 @@ public void indexTranslogOperations( } } }; - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, @@ -433,7 +436,7 @@ public void indexTranslogOperations( Randomness.shuffle(operations); List skipOperations = randomSubsetOf(operations); Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -552,7 +555,7 @@ public void writeFileChunk( failedEngine.set(true); return null; }).when(mockShard).failShard(any(), any()); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -627,7 +630,7 @@ public void writeFileChunk( failedEngine.set(true); return null; }).when(mockShard).failShard(any(), any()); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, @@ -680,7 +683,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); - final RecoverySourceHandler handler = new RecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, mock(RecoveryTargetHandler.class), threadPool, @@ -691,9 +694,15 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE ) { @Override - void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener listener) { + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { phase1Called.set(true); - super.phase1(snapshot, startingSeqNo, translogOps, listener); + super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep); } @Override @@ -786,7 +795,7 @@ public void writeFileChunk( }; final int maxConcurrentChunks = between(1, 8); final int chunkSize = between(1, 32); - final RecoverySourceHandler handler = new RecoverySourceHandler( + final 
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, recoveryTarget, threadPool, @@ -859,7 +868,7 @@ public void writeFileChunk( }; final int maxConcurrentChunks = between(1, 4); final int chunkSize = between(1, 16); - final RecoverySourceHandler handler = new RecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor), threadPool, @@ -967,7 +976,7 @@ public void cleanFiles( } }; final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest(); - final RecoverySourceHandler handler = new RecoverySourceHandler( + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, recoveryTarget, threadPool, @@ -993,7 +1002,7 @@ void createRetentionLease(long startingSeqNo, ActionListener lis final StepListener phase1Listener = new StepListener<>(); try { final CountDownLatch latch = new CountDownLatch(1); - handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch)); + handler.phase1(DirectoryReader.listCommits(dir).get(0), 0, () -> 0, new LatchedActionListener<>(phase1Listener, latch), false); latch.await(); phase1Listener.result(); } catch (Exception e) { @@ -1006,7 +1015,7 @@ void createRetentionLease(long startingSeqNo, ActionListener lis public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception { IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); - RecoverySourceHandler handler = new RecoverySourceHandler( + RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( shard, new TestRecoveryTargetHandler(), threadPool, @@ -1061,7 +1070,7 @@ private Store newStore(Path path) throws IOException { } private Store newStore(Path path, boolean checkIndex) throws IOException { - BaseDirectoryWrapper baseDirectoryWrapper = RecoverySourceHandlerTests.newFSDirectory(path); + BaseDirectoryWrapper baseDirectoryWrapper = LocalStorePeerRecoverySourceHandlerTests.newFSDirectory(path); if (checkIndex == false) { baseDirectoryWrapper.setCheckIndexOnClose(false); // don't run checkindex we might corrupt the index in these tests } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java new file mode 100644 index 0000000000000..91953d4db3495 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.recovery; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.replication.common.ReplicationType; + +public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .build(); + + public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step1 - Start primary, index docs and flush + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + // Step 2 - Start replica for recovery to happen, check both has same number of docs + final IndexShard replica1 = shards.addReplica(); + shards.startAll(); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 3 - Index more docs, run segment replication, check both have same number of docs + int moreDocs = shards.indexDocs(randomIntBetween(10, 100)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 4 - Check both shard has expected number of doc count + assertDocCount(primary, numDocs + moreDocs); + assertDocCount(replica1, numDocs + moreDocs); + + // Step 5 - Check retention lease does not exist for the replica shard + assertEquals(1, primary.getRetentionLeases().leases().size()); + assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica1.routingEntry()))); + + // Step 6 - Start new replica, recovery happens, and check that new replica has docs upto last flush + final IndexShard replica2 = shards.addReplica(); + shards.startAll(); + assertDocCount(replica2, numDocs); + + // Step 7 - Segment replication, check all shards have same number of docs + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + + // Step 8 - Check retention lease does not exist for the replica shard + assertEquals(1, primary.getRetentionLeases().leases().size()); + assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry()))); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index f520206e0f866..f874ab44d9d3b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -67,6 +67,7 @@ import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.BigArrays; import 
org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; @@ -104,6 +105,7 @@ import org.opensearch.indices.recovery.RecoveryResponse; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoverySourceHandler; +import org.opensearch.indices.recovery.RecoverySourceHandlerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; @@ -132,8 +134,8 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -861,17 +863,23 @@ protected final void recoverUnstartedReplica( recoveryTarget, startingSeqNo ); - int fileChunkSizeInBytes = Math.toIntExact( - randomBoolean() ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() : randomIntBetween(1, 10 * 1024 * 1024) + long fileChunkSizeInBytes = randomBoolean() + ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() + : randomIntBetween(1, 10 * 1024 * 1024); + final Settings settings = Settings.builder() + .put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4))) + .put("indices.recovery.max_concurrent_operations", Integer.toString(between(1, 4))) + .build(); + RecoverySettings recoverySettings = new RecoverySettings( + settings, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - final RecoverySourceHandler recovery = new RecoverySourceHandler( + recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes)); + final RecoverySourceHandler recovery = RecoverySourceHandlerFactory.create( primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), - threadPool, request, - fileChunkSizeInBytes, - between(1, 8), - between(1, 8) + recoverySettings ); primary.updateShardState( primary.routingEntry(),