Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Upload translog checkpoint as object metadata to translog #13637

Merged
merged 15 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679))
- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.InputStreamWithMetadata;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -143,9 +143,9 @@

@ExperimentalApi
@Override
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
return new InputStreamWithMetadata(s3RetryingInputStream, s3RetryingInputStream.getMetadata());

Check warning on line 148 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L148

Added line #L148 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@
return extendedStats;
}

@Override
public boolean isBlobMetadataEnabled() {
return true;

Check warning on line 249 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L249

Added line #L249 was not covered by tests
}

public ObjectCannedACL getCannedACL() {
return cannedACL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.concurrent.ExecutionException;
Expand All @@ -60,6 +61,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase {

@Override
Expand Down Expand Up @@ -139,6 +141,7 @@ public void testCreateCloneIndex() {
}

public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
asyncUploadMockFsRepo = false;
Version version = VersionUtils.randomIndexCompatibleVersion(random());
int numPrimaryShards = 1;
prepareCreate("source").setSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;
import org.junit.Before;

import java.util.Arrays;
import java.util.Map;
Expand All @@ -61,12 +63,18 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteShrinkIndexIT extends RemoteStoreBaseIntegTestCase {
@Override
protected boolean forbidPrivateIndexSettings() {
return false;
}

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
Expand All @@ -84,6 +92,7 @@ public void testCreateShrinkIndexToN() {
int[] shardSplits = randomFrom(possibleShardSplits);
assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);

internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get();
for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.io.IOException;
Expand All @@ -86,6 +87,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteSplitIndexIT extends RemoteStoreBaseIntegTestCase {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -40,6 +41,11 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,5 +512,6 @@ private void assertCustomIndexMetadata(String index) {
logger.info("---> Asserting custom index metadata");
IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index);
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY));
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";
static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2";
Expand All @@ -39,7 +42,7 @@ public Settings indexSettings(int shards, int replicas) {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

protected void restore(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.junit.Before;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -50,7 +49,7 @@ public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
Expand All @@ -48,13 +51,15 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand All @@ -74,6 +79,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected Path segmentRepoPath;
protected Path translogRepoPath;
protected boolean clusterSettingsSuppliedByTest = false;
protected boolean asyncUploadMockFsRepo = randomBoolean();
private boolean metadataSupportedType = randomBoolean();
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
Expand Down Expand Up @@ -129,6 +136,19 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
return indexingStats;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
if (!clusterSettingsSuppliedByTest && asyncUploadMockFsRepo) {
if (metadataSupportedType) {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsMetadataSupportedRepositoryPlugin.class))
.collect(Collectors.toList());
} else {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}
}
return super.nodePlugins();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
Expand All @@ -138,10 +158,27 @@ protected Settings nodeSettings(int nodeOrdinal) {
if (clusterSettingsSuppliedByTest) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
} else {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.build();
if (asyncUploadMockFsRepo) {
String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE;
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
repoType,
REPOSITORY_2_NAME,
translogRepoPath,
repoType
)
)
.build();
} else {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.build();
}
}
}

Expand Down Expand Up @@ -221,6 +258,8 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFiel
@After
public void teardown() {
clusterSettingsSuppliedByTest = false;
asyncUploadMockFsRepo = randomBoolean();
metadataSupportedType = randomBoolean();
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -68,6 +69,11 @@ public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {
static final Setting<String> MOCK_SETTING = Setting.simpleString("mock-setting");
static final String[] EXCLUDED_NODES = { "ex-1", "ex-2" };

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand All @@ -37,7 +38,7 @@ public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -48,14 +47,14 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
Expand All @@ -81,7 +80,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -797,25 +796,8 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
// Test local only translog files which are not uploaded to remote store (no metadata present in remote)
// Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE.
public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
clusterSettingsSuppliedByTest = true;

// Overriding settings to use AsyncMultiStreamBlobContainer
Settings settings = Settings.builder()
.put(super.nodeSettings(1))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
MockFsRepositoryPlugin.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
MockFsRepositoryPlugin.TYPE
)
)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
String dataNode = internalCluster().startDataOnlyNode(settings);
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();

// 1. Create index with 0 replica
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
Expand Down
Loading
Loading