From 6b4ce75c34cf347f486ce7009003ddb9d70d7f2e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 25 Aug 2022 19:31:26 +0530 Subject: [PATCH 01/12] Initiate Recovery - Decouples translog if remote xlog Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShard.java | 35 ++++++ .../recovery/PeerRecoveryTargetService.java | 111 +++++++++++++++--- 2 files changed, 131 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dcb7bdeb30e4f..8aa4881203035 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1792,6 +1792,41 @@ public long recoverLocallyUpToGlobalCheckpoint() { } } + /** + * The method figures out the sequence number basis the last commit. + * + * @return the starting sequence number from which the recovery should start. + */ + public long fetchStartSeqNoFromLastCommit() { + long seqNo; + assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; + if (state != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, state); + } + recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); + assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + + try { + seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.trace("skip local recovery as no index commit found"); + return UNASSIGNED_SEQ_NO; + } catch (Exception e) { + logger.debug("skip local recovery as failed to find the safe commit", e); + return UNASSIGNED_SEQ_NO; + } + + try { + maybeCheckIndex(); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.getTranslog().totalLocal(0); + } catch (Exception e) { + logger.debug("check index failed during fetch seqNo", e); + return UNASSIGNED_SEQ_NO; + } + return seqNo; + } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 85141556657f3..eb05ec17e2c39 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -36,12 +36,13 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.support.replication.TransportWriteAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.metadata.IndexMetadata; @@ -238,10 +239,20 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery 
without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false + && TransportWriteAction.IS_REMOTE_TXLOG_ENABLED.apply(indexShard); + final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex + ? indexShard.fetchStartSeqNoFromLastCommit() + : indexShard.recoverLocallyUpToGlobalCheckpoint(); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; - startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); + startRequest = getStartRecoveryRequest( + logger, + clusterService.localNode(), + recoveryTarget, + startingSeqNo, + !isRecoveringReplicaWithRemoteTxLogEnabledIndex + ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; } catch (final Exception e) { @@ -271,7 +282,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi } /** - * Prepare the start recovery request. + * Prepare the start recovery request for replicas whose underlying index is having remote translog enabled. * * @param logger the logger * @param localNode the local node of the recovery target @@ -280,7 +291,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi * This is the first operation after the local checkpoint of the safe commit if exists. * @return a start recovery request */ - public static StartRecoveryRequest getStartRecoveryRequest( + public static StartRecoveryRequest createStartRecoveryRequest( Logger logger, DiscoveryNode localNode, RecoveryTarget recoveryTarget, @@ -288,26 +299,96 @@ public static StartRecoveryRequest getStartRecoveryRequest( ) { final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); - Store.MetadataSnapshot metadataSnapshot; try { metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index. - try { - final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); - assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; - } catch (IOException | TranslogCorruptedException e) { + } catch (final org.apache.lucene.index.IndexNotFoundException e) { + // happens on an empty folder. 
no need to log + assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; + logger.trace("{} shard folder empty, recovering all files", recoveryTarget); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } catch (final IOException e) { + if (startingSeqNo != UNASSIGNED_SEQ_NO) { logger.warn( new ParameterizedMessage( - "error while reading global checkpoint from translog, " - + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", + "error while listing local files, resetting the starting sequence number from {} " + + "to unassigned and recovering as if there are none", startingSeqNo ), e ); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; startingSeqNo = UNASSIGNED_SEQ_NO; + } else { + logger.warn("error while listing local files, recovering as if there are none", e); + } + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } + logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); + request = new StartRecoveryRequest( + recoveryTarget.shardId(), + recoveryTarget.indexShard().routingEntry().allocationId().getId(), + recoveryTarget.sourceNode(), + localNode, + metadataSnapshot, + recoveryTarget.state().getPrimary(), + recoveryTarget.getId(), + startingSeqNo + ); + return request; + } + + public static StartRecoveryRequest getStartRecoveryRequest( + Logger logger, + DiscoveryNode localNode, + RecoveryTarget recoveryTarget, + long startingSeqNo + ) { + return getStartRecoveryRequest(logger, localNode, recoveryTarget, startingSeqNo, true); + } + + /** + * Prepare the start recovery request. + * + * @param logger the logger + * @param localNode the local node of the recovery target + * @param recoveryTarget the target of the recovery + * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. + * This is the first operation after the local checkpoint of the safe commit if exists. + * @param validateTranslog should the recovery request validate translog consistency with snapshot store metadata. + * @return a start recovery request + */ + public static StartRecoveryRequest getStartRecoveryRequest( + Logger logger, + DiscoveryNode localNode, + RecoveryTarget recoveryTarget, + long startingSeqNo, + boolean validateTranslog + ) { + final StartRecoveryRequest request; + logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); + + Store.MetadataSnapshot metadataSnapshot; + try { + metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + if (validateTranslog) { + // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene + // index. 
+ try { + final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); + assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; + } catch (IOException | TranslogCorruptedException e) { + logger.warn( + new ParameterizedMessage( + "error while reading global checkpoint from translog, " + + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; + } } } catch (final org.apache.lucene.index.IndexNotFoundException e) { // happens on an empty folder. no need to log From 687e12f30dfdaa48cce8780a043d967618ad3acc Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 26 Aug 2022 01:09:35 +0530 Subject: [PATCH 02/12] Skips txlog replay during recovery if remote txlog Signed-off-by: Ashish Singh --- .../recovery/RecoverySourceHandler.java | 130 +++++++++++------- 1 file changed, 78 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 9e219db5a4c96..3a3e440f644e3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -47,6 +47,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.action.support.replication.TransportWriteAction; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; @@ -316,60 +317,85 @@ && isTargetSameHistory() } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; - sendFileStep.whenComplete(r -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); - }, onFailure); - - prepareEngineStep.whenComplete(prepareEngineTime -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); - /* - * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. - * This means that any document indexed into the primary after this will be replicated to this replica as well - * make sure to do this before sampling the max sequence number in the next step, to ensure that we send - * all documents up to maxSeqNo in phase2. 
- */ - RunUnderPrimaryPermit.run( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), - shard, - cancellableThreads, - logger - ); - - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); - } - final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( - PEER_RECOVERY_NAME, - startingSeqNo, - Long.MAX_VALUE, - false, - true - ); - resources.add(phase2Snapshot); - retentionLock.close(); + boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false + && TransportWriteAction.IS_REMOTE_TXLOG_ENABLED.apply(shard); + + if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(0, prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + retentionLock.close(); + sendSnapshotStep.onResponse(new SendSnapshotResult(endingSeqNo, 0, TimeValue.ZERO)); + }, onFailure); + } else { + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2. + */ + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); - // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values - // are at least as high as the corresponding values on the primary when any of these operations were executed on it. 
- final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - final RetentionLeases retentionLeases = shard.getRetentionLeases(); - final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); - phase2( - startingSeqNo, - endingSeqNo, - phase2Snapshot, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - mappingVersionOnPrimary, - sendSnapshotStep - ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + if (logger.isTraceEnabled()) { + logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); + } + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( + PEER_RECOVERY_NAME, + startingSeqNo, + Long.MAX_VALUE, + false, + true + ); + resources.add(phase2Snapshot); + retentionLock.close(); + + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. + final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + final RetentionLeases retentionLeases = shard.getRetentionLeases(); + final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); + phase2( + startingSeqNo, + endingSeqNo, + phase2Snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersionOnPrimary, + sendSnapshotStep + ); - }, onFailure); + }, onFailure); + } // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2 final long trimAboveSeqNo = startingSeqNo - 1; From 5773fd8e4d1f6993b91669b7f499f90b5b3c12bc Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 26 Aug 2022 12:12:42 +0530 Subject: [PATCH 03/12] Removed unnecessary code Signed-off-by: Ashish Singh --- .../recovery/PeerRecoveryTargetService.java | 56 ------------------- 1 file changed, 56 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index eb05ec17e2c39..4db70324a2608 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -281,62 +281,6 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi ); } - /** - * Prepare the start recovery request for replicas whose underlying index is having remote translog enabled. - * - * @param logger the logger - * @param localNode the local node of the recovery target - * @param recoveryTarget the target of the recovery - * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. - * This is the first operation after the local checkpoint of the safe commit if exists. 
- * @return a start recovery request - */ - public static StartRecoveryRequest createStartRecoveryRequest( - Logger logger, - DiscoveryNode localNode, - RecoveryTarget recoveryTarget, - long startingSeqNo - ) { - final StartRecoveryRequest request; - logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); - Store.MetadataSnapshot metadataSnapshot; - try { - metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - } catch (final org.apache.lucene.index.IndexNotFoundException e) { - // happens on an empty folder. no need to log - assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; - logger.trace("{} shard folder empty, recovering all files", recoveryTarget); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } catch (final IOException e) { - if (startingSeqNo != UNASSIGNED_SEQ_NO) { - logger.warn( - new ParameterizedMessage( - "error while listing local files, resetting the starting sequence number from {} " - + "to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); - startingSeqNo = UNASSIGNED_SEQ_NO; - } else { - logger.warn("error while listing local files, recovering as if there are none", e); - } - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } - logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); - request = new StartRecoveryRequest( - recoveryTarget.shardId(), - recoveryTarget.indexShard().routingEntry().allocationId().getId(), - recoveryTarget.sourceNode(), - localNode, - metadataSnapshot, - recoveryTarget.state().getPrimary(), - recoveryTarget.getId(), - startingSeqNo - ); - return request; - } - public static StartRecoveryRequest getStartRecoveryRequest( Logger logger, DiscoveryNode localNode, From c7e8464305f518b6b8f1617e9628656791f6df36 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 29 Aug 2022 16:21:04 +0530 Subject: [PATCH 04/12] Refactored code and simplified remote txlog check Signed-off-by: Ashish Singh --- .../main/java/org/opensearch/index/shard/IndexShard.java | 7 +++++++ .../indices/recovery/PeerRecoveryTargetService.java | 3 +-- .../opensearch/indices/recovery/RecoverySourceHandler.java | 3 +-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8aa4881203035..cbac01ad4a290 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3310,6 +3310,13 @@ private boolean isRemoteStoreEnabled() { return (remoteStore != null && shardRouting.primary()); } + public boolean isRemoteTxlogEnabledOnPrimary() { + return indexSettings != null + && indexSettings.isSegRepEnabled() + && indexSettings.isRemoteStoreEnabled() + && indexSettings.isRemoteTranslogStoreEnabled(); + } + /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. 
The provided diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 4db70324a2608..a9173c4ed4611 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -42,7 +42,6 @@ import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.support.replication.TransportWriteAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.metadata.IndexMetadata; @@ -240,7 +239,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false - && TransportWriteAction.IS_REMOTE_TXLOG_ENABLED.apply(indexShard); + && indexShard.isRemoteTxlogEnabledOnPrimary(); final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex ? indexShard.fetchStartSeqNoFromLastCommit() : indexShard.recoverLocallyUpToGlobalCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 3a3e440f644e3..2fcebde2b7ef3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -47,7 +47,6 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.action.support.replication.TransportWriteAction; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; @@ -318,7 +317,7 @@ && isTargetSameHistory() assert startingSeqNo >= 0 : "startingSeqNo must be non negative. 
got: " + startingSeqNo; boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false - && TransportWriteAction.IS_REMOTE_TXLOG_ENABLED.apply(shard); + && shard.isRemoteTxlogEnabledOnPrimary(); if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { sendFileStep.whenComplete(r -> { From fae774c8c6fc0342041634bec0508706b812e6e5 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 1 Sep 2022 11:14:18 +0530 Subject: [PATCH 05/12] Incorporated PR review feedback Signed-off-by: Ashish Singh --- .../main/java/org/opensearch/index/shard/IndexShard.java | 7 ++----- .../indices/recovery/PeerRecoveryTargetService.java | 2 +- .../opensearch/indices/recovery/RecoverySourceHandler.java | 2 +- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index cbac01ad4a290..83aab1c7f9227 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3310,11 +3310,8 @@ private boolean isRemoteStoreEnabled() { return (remoteStore != null && shardRouting.primary()); } - public boolean isRemoteTxlogEnabledOnPrimary() { - return indexSettings != null - && indexSettings.isSegRepEnabled() - && indexSettings.isRemoteStoreEnabled() - && indexSettings.isRemoteTranslogStoreEnabled(); + public boolean isRemoteTranslogEnabledOnPrimary() { + return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index a9173c4ed4611..51b73c20c5c30 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -239,7 +239,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false - && indexShard.isRemoteTxlogEnabledOnPrimary(); + && indexShard.isRemoteTranslogEnabledOnPrimary(); final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex ? indexShard.fetchStartSeqNoFromLastCommit() : indexShard.recoverLocallyUpToGlobalCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 2fcebde2b7ef3..648fb75d5e89f 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -317,7 +317,7 @@ && isTargetSameHistory() assert startingSeqNo >= 0 : "startingSeqNo must be non negative. 
got: " + startingSeqNo; boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false - && shard.isRemoteTxlogEnabledOnPrimary(); + && shard.isRemoteTranslogEnabledOnPrimary(); if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { sendFileStep.whenComplete(r -> { From 94d5cf71e31ee8ba8dec9b454398675a4c2a6b68 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 5 Sep 2022 10:09:00 +0530 Subject: [PATCH 06/12] Added UT for replica shard recovery Signed-off-by: Ashish Singh --- ...overyWithRemoteTranslogOnPrimaryTests.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java new file mode 100644 index 0000000000000..a1f47269bf82d --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.common.ReplicationType; + +public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .build(); + + public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step1 - Start primary, index docs and flush + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + // Step 2 - Start replica for recovery to happen, check both has same number of docs + final IndexShard replica1 = shards.addReplica(); + shards.startAll(); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 3 - Index more docs, run segment replication, check both have same number of docs + int moreDocs = shards.indexDocs(randomIntBetween(10, 100)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 4 - Check both shard has expected number of doc count + assertDocCount(primary, numDocs + moreDocs); + assertDocCount(replica1, numDocs + moreDocs); + + // Step 5 - Start new replica, recovery happens, and check that new replica has docs upto last flush + final IndexShard replica2 = shards.addReplica(); + shards.startAll(); + assertDocCount(replica2, numDocs); + + // Step 6 - Segment replication, check all shards have same number of docs + replicateSegments(primary, shards.getReplicas()); + 
shards.assertAllEqual(numDocs + moreDocs); + } + } +} From 60dfb37000880b84eb94f46363c57a69e53f0aee Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 5 Sep 2022 11:31:12 +0530 Subject: [PATCH 07/12] Added UT for replica shard recovery Signed-off-by: Ashish Singh --- ...overyWithRemoteTranslogOnPrimaryTests.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index a1f47269bf82d..95b338c50a6e8 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -10,10 +10,15 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.translog.WriteOnlyTranslogManager; import org.opensearch.indices.replication.common.ReplicationType; +import java.util.List; + public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() @@ -56,4 +61,35 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { shards.assertAllEqual(numDocs + moreDocs); } } + + public void testNoTranslogHistoryTransferred() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + List docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary); + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + assertEquals(moreDocs, getTranslog(primary).totalOperations()); + + // Step 2 - Start replica, recovery happens, check docs recovered till last flush + final IndexShard replica = shards.addReplica(); + shards.startAll(); + assertEquals(docIdAndSeqNosAfterFlush, getDocIdAndSeqNos(replica)); + assertDocCount(replica, numDocs); + assertEquals(NRTReplicationEngine.class, replica.getEngine().getClass()); + + // Step 3 - Check replica's translog has no operations + assertEquals(WriteOnlyTranslogManager.class, replica.getEngine().translogManager().getClass()); + WriteOnlyTranslogManager replicaTranslogManager = (WriteOnlyTranslogManager) replica.getEngine().translogManager(); + assertEquals(0, replicaTranslogManager.getTranslog().totalOperations()); + + // Adding this for close to succeed + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } } From 9d813b0541044e0155b891f1139b8efec0bd34e4 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 5 Sep 2022 16:27:41 +0530 Subject: [PATCH 08/12] Added UT for checking starting seq no in replica recovery Signed-off-by: Ashish Singh --- ...overyWithRemoteTranslogOnPrimaryTests.java | 66 +++++++++++++++++++ .../index/shard/IndexShardTestCase.java | 7 +- 2 files changed, 72 
insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 95b338c50a6e8..799acf4c41b6c 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -9,16 +9,22 @@ package org.opensearch.index.shard; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.translog.WriteOnlyTranslogManager; +import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.common.ReplicationType; import java.util.List; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; + public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() @@ -92,4 +98,64 @@ public void testNoTranslogHistoryTransferred() throws Exception { shards.assertAllEqual(numDocs + moreDocs); } } + + public void testStartSequenceForReplicaRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + final IndexShard replica = shards.addReplica(); + shards.startAll(); + + allowShardFailures(); + replica.failShard("test", null); + + final ShardRouting replicaRouting = replica.routingEntry(); + final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata()) + .primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1) + .build(); + closeShards(replica); + shards.removeReplica(replica); + + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + shards.flush(); + + IndexShard newReplicaShard = newShard( + newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + false, + ShardRoutingState.INITIALIZING, + RecoverySource.PeerRecoverySource.INSTANCE + ), + replica.shardPath(), + newIndexMetadata, + null, + null, + replica.getEngineFactory(), + replica.getEngineConfigFactory(), + replica.getGlobalCheckpointSyncer(), + replica.getRetentionLeaseSyncer(), + EMPTY_EVENT_LISTENER, + null + ); + shards.addReplica(newReplicaShard); + shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) { + @Override + public IndexShard indexShard() { + IndexShard idxShard = super.indexShard(); + // verify the starting sequence number while recovering a failed shard which has a valid last commit + assertEquals(numDocs - 1, idxShard.fetchStartSeqNoFromLastCommit()); + return idxShard; + } + }); + + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + 
moreDocs); + } + } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 09eca006d600a..35e0205c3b665 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -853,7 +853,12 @@ protected final void recoverUnstartedReplica( } replica.prepareForIndexRecovery(); final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); - final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + IndexShard indexShard = recoveryTarget.indexShard(); + boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false + && indexShard.isRemoteTranslogEnabledOnPrimary(); + final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex + ? indexShard.fetchStartSeqNoFromLastCommit() + : indexShard.recoverLocallyUpToGlobalCheckpoint(); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode, From 6c4f93ba4546e8d9b778f0332231ea023452737d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 5 Sep 2022 17:17:02 +0530 Subject: [PATCH 09/12] Fixed UT for checking starting seq no in replica recovery Signed-off-by: Ashish Singh --- ...icaRecoveryWithRemoteTranslogOnPrimaryTests.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 799acf4c41b6c..5d317693e02df 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.shard; +import org.junit.Assert; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; @@ -17,10 +18,12 @@ import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.WriteOnlyTranslogManager; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.common.ReplicationType; +import java.io.IOException; import java.util.List; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; @@ -148,7 +151,15 @@ public void testStartSequenceForReplicaRecovery() throws Exception { public IndexShard indexShard() { IndexShard idxShard = super.indexShard(); // verify the starting sequence number while recovering a failed shard which has a valid last commit - assertEquals(numDocs - 1, idxShard.fetchStartSeqNoFromLastCommit()); + long startingSeqNo = -1; + try { + startingSeqNo = Long.parseLong( + idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO) + ); + } catch (IOException e) { + Assert.fail(); + } + assertEquals(numDocs - 1, startingSeqNo); return idxShard; } }); From 4eda9b0a6cf621049632d8096c5540c64586a66e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: 
Thu, 15 Sep 2022 11:58:45 +0530 Subject: [PATCH 10/12] Incorporated PR review feedback Signed-off-by: Ashish Singh --- .../org/opensearch/index/shard/IndexShard.java | 14 +++++++++++--- .../recovery/PeerRecoveryTargetService.java | 15 ++++++--------- .../indices/recovery/RecoverySourceHandler.java | 2 +- .../index/shard/IndexShardTestCase.java | 8 ++------ 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 83aab1c7f9227..c2542e1a2907c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -163,8 +163,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1792,12 +1792,20 @@ public long recoverLocallyUpToGlobalCheckpoint() { } } + public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) { + if (localTranslog) { + return recoverLocallyUpToGlobalCheckpoint(); + } else { + return recoverLocallyUptoLastCommit(); + } + } + /** * The method figures out the sequence number basis the last commit. * * @return the starting sequence number from which the recovery should start. */ - public long fetchStartSeqNoFromLastCommit() { + public long recoverLocallyUptoLastCommit() { long seqNo; assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; if (state != IndexShardState.RECOVERING) { @@ -3310,7 +3318,7 @@ private boolean isRemoteStoreEnabled() { return (remoteStore != null && shardRouting.primary()); } - public boolean isRemoteTranslogEnabledOnPrimary() { + public boolean isRemoteTranslogEnabled() { return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 51b73c20c5c30..324d246adc88b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -238,11 +238,8 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false - && indexShard.isRemoteTranslogEnabledOnPrimary(); - final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex - ? 
indexShard.fetchStartSeqNoFromLastCommit() - : indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; startRequest = getStartRecoveryRequest( @@ -250,7 +247,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi clusterService.localNode(), recoveryTarget, startingSeqNo, - !isRecoveringReplicaWithRemoteTxLogEnabledIndex + remoteTranslogEnabled == false ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; @@ -297,7 +294,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( * @param recoveryTarget the target of the recovery * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. * This is the first operation after the local checkpoint of the safe commit if exists. - * @param validateTranslog should the recovery request validate translog consistency with snapshot store metadata. + * @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata. * @return a start recovery request */ public static StartRecoveryRequest getStartRecoveryRequest( @@ -305,7 +302,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( DiscoveryNode localNode, RecoveryTarget recoveryTarget, long startingSeqNo, - boolean validateTranslog + boolean verifyTranslog ) { final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); @@ -313,7 +310,7 @@ public static StartRecoveryRequest getStartRecoveryRequest( Store.MetadataSnapshot metadataSnapshot; try { metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - if (validateTranslog) { + if (verifyTranslog) { // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene // index. try { diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 648fb75d5e89f..665e79722770e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -317,7 +317,7 @@ && isTargetSameHistory() assert startingSeqNo >= 0 : "startingSeqNo must be non negative. 
got: " + startingSeqNo; boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false - && shard.isRemoteTranslogEnabledOnPrimary(); + && shard.isRemoteTranslogEnabled(); if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { sendFileStep.whenComplete(r -> { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 35e0205c3b665..054aa6ad6eacc 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,7 +134,6 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -854,11 +853,8 @@ protected final void recoverUnstartedReplica( replica.prepareForIndexRecovery(); final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); IndexShard indexShard = recoveryTarget.indexShard(); - boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false - && indexShard.isRemoteTranslogEnabledOnPrimary(); - final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex - ? indexShard.fetchStartSeqNoFromLastCommit() - : indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode, From 3bacd4c2bf6ac57bc4e002a6d7a8f13abb42f87f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 16 Sep 2022 01:19:08 +0530 Subject: [PATCH 11/12] Incorporates PR feedback Signed-off-by: Ashish Singh --- .../org/opensearch/index/shard/IndexShard.java | 4 ++-- .../recovery/PeerRecoveryTargetServiceTests.java | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index c2542e1a2907c..08159c44eb312 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1703,7 +1703,7 @@ public void prepareForIndexRecovery() { * @return a sequence number that an operation-based peer recovery can start with. * This is the first operation after the local checkpoint of the safe commit if exists. */ - public long recoverLocallyUpToGlobalCheckpoint() { + private long recoverLocallyUpToGlobalCheckpoint() { assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); @@ -1805,7 +1805,7 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) { * * @return the starting sequence number from which the recovery should start. 
      */
-    public long recoverLocallyUptoLastCommit() {
+    private long recoverLocallyUptoLastCommit() {
         long seqNo;
         assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
         if (state != IndexShardState.RECOVERING) {
diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 2a88345346e52..a50089831b3e9 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -211,7 +211,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         IndexShard shard = newShard(false);
         shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, localNode));
         shard.prepareForIndexRecovery();
-        assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        assertThat(shard.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
         assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
@@ -239,7 +239,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         );
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         replica.prepareForIndexRecovery();
-        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
+        assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(globalCheckpoint + 1));
         assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal));
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal));
         assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
@@ -254,7 +254,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         replica.prepareForIndexRecovery();
-        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
         assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
@@ -276,10 +276,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         replica.prepareForIndexRecovery();
         if (safeCommit.isPresent()) {
-            assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
+            assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1));
             assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
         } else {
-            assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+            assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO));
             assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         }
         assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));
@@ -322,7 +322,7 @@ public void testClosedIndexSkipsLocalRecovery() throws Exception {
         );
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         replica.prepareForIndexRecovery();
-        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
+        assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1));
         assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
         assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
@@ -349,7 +349,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception {
         shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
         shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode));
         shard.prepareForIndexRecovery();
-        long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint();
+        long startingSeqNo = shard.recoverLocallyAndFetchStartSeqNo(true);
         shard.store().markStoreCorrupted(new IOException("simulated"));
         RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null);
         StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo);

From b2ada8e815fb74ab58c538047deb988502156424 Mon Sep 17 00:00:00 2001
From: Ashish Singh
Date: Fri, 16 Sep 2022 02:13:05 +0530
Subject: [PATCH 12/12] Incorporates PR feedback

Signed-off-by: Ashish Singh
---
 CHANGELOG.md | 3 +-
 .../opensearch/index/shard/IndexShard.java | 35 ++++++++++---------
 .../recovery/PeerRecoveryTargetService.java | 10 ++++--
 .../index/shard/IndexShardTestCase.java | 2 +-
 4 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7839d43209a71..0e792bae085a6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
 - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
 - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
+- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
 
 ### Deprecated
 
@@ -91,4 +92,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 
 
 [Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD
-[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
\ No newline at end of file
+[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 08159c44eb312..9185ef0d440ce 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -203,6 +203,7 @@
 import java.util.stream.StreamSupport;
 
 import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
+import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
 import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 /**
@@ -1704,12 +1705,7 @@ public void prepareForIndexRecovery() {
      * This is the first operation after the local checkpoint of the safe commit if exists.
      */
     private long recoverLocallyUpToGlobalCheckpoint() {
-        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
-        if (state != IndexShardState.RECOVERING) {
-            throw new IndexShardNotRecoveringException(shardId, state);
-        }
-        recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
-        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+        validateLocalRecoveryState();
         final Optional safeCommit;
         final long globalCheckpoint;
         try {
@@ -1806,21 +1802,17 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
      * @return the starting sequence number from which the recovery should start.
      */
     private long recoverLocallyUptoLastCommit() {
+        assert isRemoteTranslogEnabled() : "Remote translog store is not enabled";
         long seqNo;
-        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
-        if (state != IndexShardState.RECOVERING) {
-            throw new IndexShardNotRecoveringException(shardId, state);
-        }
-        recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
-        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+        validateLocalRecoveryState();
 
         try {
-            seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO));
+            seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
         } catch (org.apache.lucene.index.IndexNotFoundException e) {
-            logger.trace("skip local recovery as no index commit found");
+            logger.error("skip local recovery as no index commit found", e);
             return UNASSIGNED_SEQ_NO;
         } catch (Exception e) {
-            logger.debug("skip local recovery as failed to find the safe commit", e);
+            logger.error("skip local recovery as failed to find the safe commit", e);
             return UNASSIGNED_SEQ_NO;
         }
@@ -1829,12 +1821,21 @@ private long recoverLocallyUptoLastCommit() {
             recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
             recoveryState.getTranslog().totalLocal(0);
         } catch (Exception e) {
-            logger.debug("check index failed during fetch seqNo", e);
+            logger.error("check index failed during fetch seqNo", e);
             return UNASSIGNED_SEQ_NO;
         }
         return seqNo;
     }
 
+    private void validateLocalRecoveryState() {
+        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
+        if (state != IndexShardState.RECOVERING) {
+            throw new IndexShardNotRecoveringException(shardId, state);
+        }
+        recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
+        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+    }
+
     public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
         getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
     }
@@ -2041,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
     private boolean assertSequenceNumbersInCommit() throws IOException {
         final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
         assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
-        assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
+        assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
         assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
         assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
             + userData.get(Engine.HISTORY_UUID_KEY)
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
index 324d246adc88b..b5702431ed4bf 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str
         threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request));
     }
 
+    /**
+     * Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see
+     * github issue on it.
+     * @param recoveryId recovery id
+     * @param preExistingRequest start recovery request
+     */
     private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
         final String actionName;
         final TransportRequest requestToSend;
@@ -239,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
             logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
             indexShard.prepareForIndexRecovery();
             boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
-            final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
+            final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
             assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
             startRequest = getStartRecoveryRequest(
@@ -247,7 +253,7 @@
                 clusterService.localNode(),
                 recoveryTarget,
                 startingSeqNo,
-                remoteTranslogEnabled == false
+                !remoteTranslogEnabled
             );
             requestToSend = startRequest;
             actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 054aa6ad6eacc..1fcdfd79c544e 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -854,7 +854,7 @@ protected final void recoverUnstartedReplica(
         final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
         IndexShard indexShard = recoveryTarget.indexShard();
         boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
-        final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(remoteTranslogEnabled == false);
+        final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
         final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
             logger,
             rNode,
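
For readers following the patch series, the overall decision is easier to see outside the diff context: a replica of a remote-translog-enabled index no longer replays its local translog up to the global checkpoint; it derives the starting sequence number from the last commit and lets the recovery source decide what to send. The following is a minimal standalone sketch of that decision, not code from the PR; ShardView, startingSeqNo, and the constants below are simplified, hypothetical stand-ins for the IndexShard and SequenceNumbers APIs touched above.

```java
import java.util.Map;
import java.util.Optional;

// Standalone sketch (not OpenSearch code): how a recovery target could choose its
// starting sequence number when the translog lives in a remote store.
public class StartingSeqNoSketch {

    static final long UNASSIGNED_SEQ_NO = -2L;     // stand-in for SequenceNumbers.UNASSIGNED_SEQ_NO
    static final String MAX_SEQ_NO = "max_seq_no"; // stand-in for the commit user-data key

    /** Minimal stand-in for the shard state this decision needs. */
    interface ShardView {
        boolean isPrimary();
        boolean isRemoteTranslogEnabled();
        Optional<Map<String, String>> lastCommitUserData(); // empty if no commit exists yet
        long recoverLocallyUpToGlobalCheckpoint();          // the usual local translog replay path
    }

    /**
     * For a replica of a remote-translog-enabled index, the local translog is not the
     * source of truth, so the start point is taken from the last commit's max sequence
     * number; every other shard keeps the existing local recovery behaviour.
     */
    static long startingSeqNo(ShardView shard) {
        boolean remoteTranslogReplica = shard.isPrimary() == false && shard.isRemoteTranslogEnabled();
        if (remoteTranslogReplica == false) {
            return shard.recoverLocallyUpToGlobalCheckpoint();
        }
        return shard.lastCommitUserData()
            .map(userData -> Long.parseLong(userData.get(MAX_SEQ_NO)))
            // No commit on disk: signal the source to fall back to a full file-based recovery.
            .orElse(UNASSIGNED_SEQ_NO);
    }
}
```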