diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 0d982b92cd80e..0568bdca60d06 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -47,6 +47,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService; @@ -785,6 +786,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } public void testPressureServiceStats() throws Exception { + if (segmentReplicationWithRemoteEnabled()) { + logger.info("Skipping testPressureServiceStats as segment replication with remote store is enabled."); + return; + } final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); final String replicaNode = internalCluster().startNode(); @@ -874,6 +879,10 @@ public void testPressureServiceStats() throws Exception { * @throws Exception when issue is encountered */ public void testScrollCreatedOnReplica() throws Exception { + if (segmentReplicationWithRemoteEnabled()) { + logger.info("Skipping testScrollCreatedOnReplica as segment replication with remote store is enabled."); + return; + } // create the cluster with one primary node containing primary shard and replica node containing replica shard final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -963,6 +972,10 @@ public void testScrollCreatedOnReplica() throws Exception { * @throws Exception when issue is encountered */ public void testScrollWithOngoingSegmentReplication() throws Exception { + if (segmentReplicationWithRemoteEnabled()) { + logger.info("Skipping testScrollWithOngoingSegmentReplication as segment replication with remote store is enabled."); + return; + } // create the cluster with one primary node containing primary shard and replica node containing replica shard final String primary = internalCluster().startNode(); prepareCreate( @@ -1249,4 +1262,8 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { waitForSearchableDocs(2, nodes); } + private boolean segmentReplicationWithRemoteEnabled() { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue() + && "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL)); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java new file mode 100644 index 0000000000000..4f5346af0c483 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationIT; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * The aim of this class is to run Segment Replication integ tests by enabling remote store specific settings. + * This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling + * remote store. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationUsingRemoteStoreIT extends SegmentReplicationIT { + + private static final String REPOSITORY_NAME = "test-remore-store-repo"; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8b542be222f25..8794347a61fca 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2237,7 +2237,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false); + syncSegmentsFromRemoteSegmentStore(false, true, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4413,7 +4413,7 @@ public void close() throws IOException { }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false); + syncSegmentsFromRemoteSegmentStore(false, true, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4463,23 +4463,15 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog()); } - /** - * Downloads segments from remote segment store. This method will download segments till - * last refresh checkpoint. - * @param overrideLocal flag to override local segment files with those in remote store - * @throws IOException if exception occurs while reading segments from remote store - */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { - syncSegmentsFromRemoteSegmentStore(overrideLocal, true); - } - /** * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise + * @param shouldCommit if the shard requires committing the changes after sync from remote. * @throws IOException if exception occurs while reading segments from remote store */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) + throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; @@ -4531,19 +4523,58 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re skippedSegments.add(file); } } + if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) { try ( ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT) ) ) { - SegmentInfos infosSnapshot = SegmentInfos.readCommit( - store.directory(), - indexInput, - Long.parseLong(segmentInfosSnapshotFilename.split("__")[1]) - ); + SegmentInfos infosSnapshot = null; + boolean canRetry = true; + while (true) { + try { + infosSnapshot = SegmentInfos.readCommit( + store.directory(), + indexInput, + Long.parseLong(segmentInfosSnapshotFilename.split("__")[1]) + ); + break; + } catch (FileNotFoundException e) { + /** + * While we're downloading the segment files from remote store, primary is continuously writing as well. + * Primary updates segmentInfo in same file on remote store during refreshes(not for commit though). + * This can lead to race conditions where the segmentInfo file downloaded is from a newer refresh + * and points to segment files which we haven't even downloaded thus resulting in FileNotFoundException. + * So to handle this, we again download the delta in segments from store and retry. + * Since, we're not downloading the files which are already downloaded, we skip the segmentInfosSnapshot during retry. + * This guarantees that all files needs to refresh the reader are available, and we don't run into race conditions during retry. + */ + if (canRetry) { + canRetry = false; // only retry once + // download diff. + ((RemoteSegmentStoreDirectory) remoteDirectory).init(); + uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory).getSegmentsUploadedToRemoteStore(); + Set segmentsToFetch = uploadedSegments.keySet(); + segmentsToFetch.removeAll(downloadedSegments); + segmentsToFetch.removeAll(skippedSegments); + for (String file : segmentsToFetch) { + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + } + logger.info("Fetching extra segments during retry: {}", segmentsToFetch); + downloadedSegments.addAll(segmentsToFetch); + } else { + break; + } + } + } + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + if (shouldCommit) { + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + } else { + finalizeReplication(infosSnapshot); + } } } } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 9393a5ac38ac2..02397bc356539 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -453,7 +453,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco remoteStore.incRef(); try { // Download segments from remote segment store - indexShard.syncSegmentsFromRemoteSegmentStore(true); + indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true); if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9f41ac6f7fd17..ddd7bcaa6bd62 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -128,7 +128,7 @@ public void init() throws IOException { * @return Map of segment filename to uploaded filename with checksum * @throws IOException if there were any failures in reading the metadata file */ - private Map readLatestMetadataFile() throws IOException { + public Map readLatestMetadataFile() throws IOException { Map segmentMetadataMap = new HashMap<>(); Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); @@ -189,6 +189,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); } + + public String getOriginalFilename() { + return originalFilename; + } } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 9b2c661569c16..1b0406bb844fa 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map missing.add(value); } else { final StoreFileMetadata fileMetadata = target.get(value.name()); - if (fileMetadata.isSame(value)) { + // match segments using checksum + if (fileMetadata.checksum().equals(value.checksum())) { identical.add(value); } else { different.add(value); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 5fb84a165c498..a08d3182fa156 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index b211d81c1c76a..5455be2a69799 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -13,7 +13,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.index.store.Store; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; @@ -79,7 +79,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java new file mode 100644 index 0000000000000..429c7c5a0f7ee --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.util.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Implementation of a {@link SegmentReplicationSource} where the source is remote store. + * + * @opensearch.internal + */ +public class RemoteStoreReplicationSource implements SegmentReplicationSource { + + private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + + private final IndexShard indexShard; + + public RemoteStoreReplicationSource(IndexShard indexShard) { + this.indexShard = indexShard; + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); + + Map metadataMap; + // TODO: Need to figure out a way to pass this information for segment metadata via remote store. + final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); + try { + metadataMap = remoteDirectory.readLatestMetadataFile() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) + ) + ); + // TODO: GET current checkpoint from remote store. + listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + } catch (IOException e) { + logger.error("Error fetching checkpoint metadata from remote store", e); + listener.onFailure(e); + } + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + try { + indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false); + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } catch (Exception e) { + logger.error("Failed to sync segments", e); + listener.onFailure(e); + } + } + + @Override + public String getDescription() { + return "remote store"; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java index 2fa74819fe4de..79b9b31e3d5c3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -10,7 +10,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException; -import org.opensearch.index.store.Store; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -38,14 +38,14 @@ public interface SegmentReplicationSource { * @param replicationId {@link long} - ID of the replication event. * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. * @param filesToFetch {@link List} List of files to fetch. - * @param store {@link Store} Reference to the local store. + * @param indexShard {@link IndexShard} Reference to the IndexShard. * @param listener {@link ActionListener} Listener that completes with the list of files copied. */ void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 1867fc59c5a56..238e316c3b585 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -38,13 +38,17 @@ public SegmentReplicationSourceFactory( } public SegmentReplicationSource get(IndexShard shard) { - return new PrimaryShardReplicationSource( - shard.recoveryState().getTargetNode(), - shard.routingEntry().allocationId().getId(), - transportService, - recoverySettings, - getPrimaryNode(shard.shardId()) - ); + if (shard.indexSettings().isSegRepWithRemoteEnabled()) { + return new RemoteStoreReplicationSource(shard); + } else { + return new PrimaryShardReplicationSource( + shard.recoveryState().getTargetNode(), + shard.routingEntry().allocationId().getId(), + transportService, + recoverySettings, + getPrimaryNode(shard.shardId()) + ); + } } private DiscoveryNode getPrimaryNode(ShardId shardId) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index d9f318e78597c..e1d9eb9fdf0a9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -209,10 +209,18 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { + // TODO: Refactor the logic so that finalize doesn't have to be invoked for remote store as source + if (source instanceof RemoteStoreReplicationSource) { + ActionListener.completeWith(listener, () -> { + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + return null; + }); + return; + } ActionListener.completeWith(listener, () -> { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0d95f40652523..b4f4759f53222 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -840,7 +840,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -910,7 +910,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { Assert.fail("Should not be reached"); @@ -950,7 +950,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { // randomly resolve the listener, indicating the source has resolved. @@ -992,7 +992,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) {} }; diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index fdd707ae88195..4d273c71e7861 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -21,7 +21,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -123,7 +122,7 @@ public void testGetSegmentFiles() { REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -151,7 +150,7 @@ public void testTransportTimeoutForGetSegmentFilesAction() { REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -176,7 +175,7 @@ public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedExcep REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index a3c016d5ba0df..e8b526e838b80 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -21,7 +21,6 @@ import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; @@ -141,7 +140,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { Assert.fail("Should not be called"); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index a029d87f4a575..8adc5f38fbff4 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -129,7 +129,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { assertEquals(1, filesToFetch.size()); @@ -180,7 +180,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -223,7 +223,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onFailure(exception); @@ -266,7 +266,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -311,7 +311,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -355,7 +355,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -406,7 +406,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java index a3adedcbdef86..f6c0331d2056a 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -9,6 +9,7 @@ package org.opensearch.index.replication; import org.opensearch.action.ActionListener; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -35,7 +36,7 @@ public abstract void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b785574ca52b2..5271634b15086 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1335,7 +1335,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { try (