Skip to content

Commit

Permalink
Add Remote store as a segment replication source
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed May 22, 2023
1 parent 63834d9 commit 37e6e79
Show file tree
Hide file tree
Showing 18 changed files with 286 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand Down Expand Up @@ -785,6 +786,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
if (segmentReplicationWithRemoteEnabled()) {
logger.info("Skipping testPressureServiceStats as segment replication with remote store is enabled.");
return;
}
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
Expand Down Expand Up @@ -874,6 +879,10 @@ public void testPressureServiceStats() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
if (segmentReplicationWithRemoteEnabled()) {
logger.info("Skipping testScrollCreatedOnReplica as segment replication with remote store is enabled.");
return;
}
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -963,6 +972,10 @@ public void testScrollCreatedOnReplica() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollWithOngoingSegmentReplication() throws Exception {
if (segmentReplicationWithRemoteEnabled()) {
logger.info("Skipping testScrollWithOngoingSegmentReplication as segment replication with remote store is enabled.");
return;
}
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
prepareCreate(
Expand Down Expand Up @@ -1249,4 +1262,8 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
waitForSearchableDocs(2, nodes);
}

/**
 * Tells whether this test class is running with remote store enabled on the index
 * and the segment replication experimental feature flag switched on.
 *
 * @return {@code true} only when the remote-store index setting resolves to true and the
 *         {@link FeatureFlags#SEGMENT_REPLICATION_EXPERIMENTAL} flag equals "true" (case-insensitive)
 */
private boolean segmentReplicationWithRemoteEnabled() {
    final boolean remoteStoreEnabled = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue();
    if (remoteStoreEnabled == false) {
        // The feature flag is only consulted once the index setting is known to be on.
        return false;
    }
    final String experimentalFlag = featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
    return "true".equalsIgnoreCase(experimentalFlag);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* The aim of this class is to run Segment Replication integ tests by enabling remote store specific settings.
* This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
* remote store.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
69 changes: 50 additions & 19 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2237,7 +2237,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -4413,7 +4413,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -4463,23 +4463,15 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @param shouldCommit if the shard requires committing the changes after sync from remote.
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
Expand Down Expand Up @@ -4531,19 +4523,58 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
skippedSegments.add(file);
}
}

if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
SegmentInfos infosSnapshot = null;
boolean canRetry = true;
while (true) {
try {
infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
break;
} catch (FileNotFoundException e) {
/**
* While we are downloading the segment files from the remote store, the primary keeps writing as well.
* The primary updates the segmentInfo in the same file on the remote store during refreshes (though not for commits).
* This can lead to a race condition where the downloaded segmentInfo file is from a newer refresh
* and points to segment files which we have not downloaded yet, resulting in a FileNotFoundException.
* To handle this, we download the delta in segments from the remote store again and retry.
* Since we do not re-download files which are already downloaded, the segmentInfosSnapshot is skipped during the retry.
* This guarantees that all files needed to refresh the reader are available, and we don't run into race conditions during the retry.
*/
if (canRetry) {
canRetry = false; // only retry once
// download diff.
((RemoteSegmentStoreDirectory) remoteDirectory).init();
uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory).getSegmentsUploadedToRemoteStore();
Set<String> segmentsToFetch = uploadedSegments.keySet();
segmentsToFetch.removeAll(downloadedSegments);
segmentsToFetch.removeAll(skippedSegments);
for (String file : segmentsToFetch) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
}
logger.info("Fetching extra segments during retry: {}", segmentsToFetch);
downloadedSegments.addAll(segmentsToFetch);
} else {
break;
}
}
}

long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
if (shouldCommit) {
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
finalizeReplication(infosSnapshot);
}
}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void init() throws IOException {
* @return Map of segment filename to uploaded filename with checksum
* @throws IOException if there were any failures in reading the metadata file
*/
private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
public Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();

Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
Expand Down Expand Up @@ -189,6 +189,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
}

/**
 * Accessor for the {@code originalFilename} field of this metadata entry.
 *
 * @return the value currently held in {@code originalFilename}
 */
public String getOriginalFilename() {
    return this.originalFilename;
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata>
missing.add(value);
} else {
final StoreFileMetadata fileMetadata = target.get(value.name());
if (fileMetadata.isSame(value)) {
// match segments using checksum
if (fileMetadata.checksum().equals(value.checksum())) {
identical.add(value);
} else {
different.add(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.store.Store;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
Expand Down Expand Up @@ -79,7 +79,7 @@ public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
Store store,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Implementation of a {@link SegmentReplicationSource} where the source is remote store.
*
* @opensearch.internal
*/
public class RemoteStoreReplicationSource implements SegmentReplicationSource {

private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class);

private final IndexShard indexShard;

public RemoteStoreReplicationSource(IndexShard indexShard) {
this.indexShard = indexShard;
}

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate();

Map<String, StoreFileMetadata> metadataMap;
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
try {
metadataMap = remoteDirectory.readLatestMetadataFile()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
)
)
);
// TODO: GET current checkpoint from remote store.
listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null));
} catch (IOException e) {
logger.error("Error fetching checkpoint metadata from remote store", e);
listener.onFailure(e);
}
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
try {
indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false);
listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
} catch (Exception e) {
logger.error("Failed to sync segments", e);
listener.onFailure(e);
}
}

@Override
public String getDescription() {
return "remote store";
}
}
Loading

0 comments on commit 37e6e79

Please sign in to comment.