Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snapshot V2] Support pinned timestamp in delete flow #15256

Merged
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
9142a25
Initial Commit to support centralize snapshot creation and implicit l…
Aug 6, 2024
d9bbc65
Fix deserilization error
Aug 6, 2024
f72702f
Fix gradle spotless check
Aug 6, 2024
8e85231
Fix listener
Aug 6, 2024
8a507ad
Merge branch 'main' into snapshot-pinned-timestamp
Aug 7, 2024
2d404e8
Fix test
Aug 7, 2024
90c860c
Fix snapshot generation
Aug 8, 2024
193da65
Modify cluster setting name
Aug 14, 2024
fe2aaaf
Add more tests
Aug 14, 2024
afcc37a
Snapshot V2 delete changes
Aug 14, 2024
14c08ae
Merge branch 'main' into snapshot-pinned-timestamp
Aug 15, 2024
ec17028
Merge branch 'main' into snapshot-pinned-timestamp
Aug 20, 2024
6504169
Uncomment pin timestamp code
Aug 20, 2024
ebfb9d9
Merge branch 'snapshot-pinned-timestamp' into snapshot-pinned-timesta…
Aug 20, 2024
c91f1b5
Add code to unpin timestamp on deletion
Aug 20, 2024
626c2fa
Modify log messages
Aug 21, 2024
be65f6d
Add spotless check failure fix
Aug 21, 2024
62452ee
Fix completion listener for snapshot v2
Aug 21, 2024
00031ec
Elevate cluster state update priority for repository metadata update …
Aug 21, 2024
a59e980
Add test for missing snapshot
Aug 22, 2024
0c636ef
Add more integ tests
Aug 22, 2024
623f994
Add priority as IMMEDIATE for cluster state repo update task only for…
Aug 23, 2024
2e4795b
Fix build error
Aug 23, 2024
a6090a7
Fix spotless error
Aug 23, 2024
b5d012f
Add repository setting for snapshot v2
Aug 23, 2024
1394d8c
Merge branch 'main' into snapshot-pinned-timestamp
Aug 23, 2024
0b1a52e
Merge branch 'main' into snapshot-pinned-timestamp-delete
Aug 23, 2024
51c1cdf
Add integ test on multiple snapshots deletion in one delete request
Aug 26, 2024
80bf6cc
Address review comments
Aug 26, 2024
b0cbc08
Add integ test to verify snapshot creation if shallow copy repo setti…
Aug 26, 2024
38af0f6
Fix spotless vilation error
Aug 26, 2024
73376a8
Address review comment
Aug 26, 2024
39b57e3
Address review comments
Aug 26, 2024
e1eecbd
Add min version check for backward compatibility
Aug 26, 2024
e986243
Merge branch 'snapshot-pinned-timestamp' into snapshot-pinned-timesta…
Aug 27, 2024
ea71855
Move integ tests to DeleteSnapshotIT
Aug 27, 2024
9694dec
Merge branch 'main' into snapshot-pinned-timestamp-delete
Aug 28, 2024
14e7539
Refactor code
Aug 28, 2024
f4b4122
refactor code
Aug 28, 2024
cb03f59
refactor code
Aug 28, 2024
1f966fd
fix tests
Aug 28, 2024
b97707a
fix build failure
Aug 28, 2024
a94642c
Refactor code
Aug 28, 2024
27f7554
Add support to cleanup remote store segments for deleted index
Aug 30, 2024
f1adb8e
refactor code
Aug 30, 2024
32ed74f
refactor code
Aug 31, 2024
2221437
Merge branch 'main' into snapshot-pinned-timestamp-delete
Sep 2, 2024
4998174
Fix tests
Sep 2, 2024
52573e0
remove pinned timestamp after updating repository data
Sep 2, 2024
6d5fd21
refactor code
Sep 2, 2024
e36786a
Add tests for deleted index cleanup
Sep 2, 2024
2b987b0
Address review comments
Sep 2, 2024
4b7376b
Refactor code
Sep 2, 2024
5fa4237
modify java doc
Sep 2, 2024
88c3be8
fix test
Sep 2, 2024
8db5790
modify integ test for deleted index
Sep 3, 2024
1f49456
Merge branch 'main' into snapshot-pinned-timestamp-delete
Sep 3, 2024
3fca63c
remove merge conflict
Sep 3, 2024
f7e2609
Add cleanup for deleted index if there are snapshots referencing it
Sep 3, 2024
d075b6b
fix integ test
Sep 3, 2024
f621384
Merge branch 'main' into snapshot-pinned-timestamp-delete
Sep 3, 2024
de457b6
fix spotless failure
Sep 3, 2024
1f38f23
address review comments
Sep 3, 2024
8fe6a24
address review comments
Sep 3, 2024
5b33e85
refactor code
Sep 3, 2024
54bee31
fix precommit failure
Sep 3, 2024
c5ef40f
Merge branch 'main' into snapshot-pinned-timestamp-delete
Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
Expand All @@ -38,6 +40,8 @@
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -370,6 +374,259 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
}, 30, TimeUnit.SECONDS);
}

public void testDeleteShallowCopyV2() throws Exception {
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
disableRepoConsistencyCheck("Remote store repository is being used in the test");

final Path remoteStoreRepoPath = randomRepoPath();
internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
String snapshotName2 = "test-create-snapshot2";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Client client = client();

assertAcked(
client.admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

createIndex(indexName1, getRemoteStoreBackedIndexSettings());
createIndex(indexName2, getRemoteStoreBackedIndexSettings());

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexRandomDocs(indexName1, numDocsInIndex1);
indexRandomDocs(indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

createIndex(indexName3, getRemoteStoreBackedIndexSettings());
indexRandomDocs(indexName3, 10);
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));

assertAcked(client().admin().indices().prepareDelete(indexName1));
Thread.sleep(100);

AcknowledgedResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotName2)
.setSnapshots(snapshotName2)
.get();
assertTrue(deleteResponse.isAcknowledged());

// test delete non-existent snapshot
assertThrows(
SnapshotMissingException.class,
() -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get()
);

}

public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();

internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
String snapshotName2 = "test-create-snapshot2";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Client client = client();

assertAcked(
client.admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

createIndex(indexName1, getRemoteStoreBackedIndexSettings());

createIndex(indexName2, getRemoteStoreBackedIndexSettings());

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexRandomDocs(indexName1, numDocsInIndex1);
indexRandomDocs(indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

createIndex(indexName3, getRemoteStoreBackedIndexSettings());
indexRandomDocs(indexName3, 10);

CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));

AcknowledgedResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotName1, snapshotName2)
.setSnapshots(snapshotName2)
.get();
assertTrue(deleteResponse.isAcknowledged());
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved

// test delete non-existent snapshot
assertThrows(
SnapshotMissingException.class,
() -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get()
);

}

public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Path absolutePath1 = randomRepoPath().toAbsolutePath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);

internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
assertAcked(
client().admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

logger.info("--> create two remote index shallow snapshots");
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, "snap1")
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo();

CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, "snap2")
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo2 = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2"));
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 1");
AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

logger.info("--> delete snapshot 2");
deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo2.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
// Delete is async. Give time for it
assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(indexPath), comparesEqualTo(0));
} catch (Exception e) {}
}, 30, TimeUnit.SECONDS);
}

private List<String> createNSnapshots(String repoName, int count) {
final List<String> snapshotNames = new ArrayList<>(count);
final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
Expand All @@ -381,4 +638,12 @@ private List<String> createNSnapshots(String repoName, int count) {
logger.info("--> created {} in [{}]", snapshotNames, repoName);
return snapshotNames;
}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Settings settings = Settings.builder()
.put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
return settings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -96,6 +97,8 @@ public final class TransportCleanupRepositoryAction extends TransportClusterMana

private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;

private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory;

@Override
protected String executor() {
return ThreadPool.Names.SAME;
Expand Down Expand Up @@ -123,6 +126,7 @@ public TransportCleanupRepositoryAction(
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
this.remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down Expand Up @@ -272,6 +276,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
remoteStoreLockManagerFactory,
remoteSegmentStoreDirectoryFactory,
ActionListener.wrap(result -> after(null, result), e -> after(e, null))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* An interface for providing basic metadata about a blob.
*
* @opensearch.internal
*/
@ExperimentalApi
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
public interface BlobMetadata {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.opensearch.common.blobstore.stream.write;

import org.opensearch.common.annotation.ExperimentalApi;

/**
* WritePriority for upload
*
* @opensearch.internal
*/
@ExperimentalApi
public enum WritePriority {
// Used for segment transfers during refresh, flush or merges
NORMAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.common.remote;

import org.opensearch.common.annotation.ExperimentalApi;

import java.util.List;

/**
* Parameters which can be used to construct a blob path
*
*/
@ExperimentalApi
public class BlobPathParameters {

private final List<String> pathTokens;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.common.remote;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.compress.Compressor;

Expand All @@ -17,6 +18,7 @@
* The abstract class which represents a {@link RemoteWriteableEntity} that can be written to a store
* @param <T> the entity to be written
*/
@ExperimentalApi
public abstract class RemoteWriteableBlobEntity<T> implements RemoteWriteableEntity<T> {

protected String blobFileName;
Expand Down
Loading
Loading