Skip to content

Commit

Permalink
Snapshot V2 delete changes
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
Anshu Agarwal committed Aug 14, 2024
1 parent fe2aaaf commit afcc37a
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexService;
Expand All @@ -33,6 +35,8 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotRestoreException;
Expand Down Expand Up @@ -806,4 +810,82 @@ public void testCreateSnapshotV2() throws Exception {

}

/**
 * Creates two snapshots in a shallow-copy enabled FS repository on a cluster started with
 * {@code snapshot.snapshot_v2} set to {@code true}, then deletes the second snapshot and
 * verifies that the delete request is acknowledged.
 *
 * @throws Exception if cluster setup, indexing, or any snapshot operation fails
 */
public void testDeleteShallowCopyV2() throws Exception {
    final Settings snapshotSettings = Settings.builder().put("snapshot.snapshot_v2", true).build();
    internalCluster().startClusterManagerOnlyNode(snapshotSettings);
    internalCluster().startDataOnlyNode(snapshotSettings);
    internalCluster().startDataOnlyNode(snapshotSettings);

    final String indexName1 = "testindex1";
    final String indexName2 = "testindex2";
    final String indexName3 = "testindex3";
    final String snapshotRepoName = "test-create-snapshot-repo";
    final String snapshotName1 = "test-create-snapshot1";
    final String snapshotName2 = "test-create-snapshot2";
    final Path absolutePath1 = randomRepoPath().toAbsolutePath();
    logger.info("Snapshot Path [{}]", absolutePath1);

    final Client client = client();

    assertAcked(
        client.admin()
            .cluster()
            .preparePutRepository(snapshotRepoName)
            .setType(FsRepository.TYPE)
            .setSettings(
                Settings.builder()
                    .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
                    .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
                    .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
                    .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
            )
    );

    createIndex(indexName1, getIndexSettings(20, 0).build());
    createIndex(indexName2, getIndexSettings(15, 0).build());

    final int numDocsInIndex1 = 10;
    final int numDocsInIndex2 = 20;
    indexDocuments(client, indexName1, numDocsInIndex1);
    indexDocuments(client, indexName2, numDocsInIndex2);
    ensureGreen(indexName1, indexName2);

    // Wait for completion: the response only carries a SnapshotInfo when the request waits,
    // and the assertions below dereference it immediately.
    final CreateSnapshotResponse createSnapshotResponse = client.admin()
        .cluster()
        .prepareCreateSnapshot(snapshotRepoName, snapshotName1)
        .setWaitForCompletion(true)
        .get();
    SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
    assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
    assertThat(snapshotInfo.successfulShards(), greaterThan(0));
    assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
    assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

    // Change the cluster contents between the two snapshots: more docs plus a brand-new index.
    indexDocuments(client, indexName2, 10);
    indexDocuments(client, indexName1, 10);
    createIndex(indexName3);
    indexDocuments(client, indexName3, 10);

    final CreateSnapshotResponse createSnapshotResponse2 = client.admin()
        .cluster()
        .prepareCreateSnapshot(snapshotRepoName, snapshotName2)
        .setWaitForCompletion(true)
        .get();
    snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
    assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
    assertThat(snapshotInfo.successfulShards(), greaterThan(0));
    assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
    assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));

    // The snapshot name is already supplied to prepareDeleteSnapshot; no separate setSnapshots call is needed.
    final AcknowledgedResponse deleteResponse = client.admin()
        .cluster()
        .prepareDeleteSnapshot(snapshotRepoName, snapshotName2)
        .get();
    assertTrue(deleteResponse.isAcknowledged());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ default void deleteSnapshotsAndReleaseLockFiles(
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
boolean isSnapshotV2,
ActionListener<RepositoryData> listener
) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ public void deleteSnapshotsAndReleaseLockFiles(
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
boolean isSnapshotV2,
ActionListener<RepositoryData> listener
) {
if (isReadOnly()) {
Expand All @@ -918,6 +919,7 @@ protected void doRun() throws Exception {
repositoryData,
repositoryMetaVersion,
remoteStoreLockManagerFactory,
isSnapshotV2,
listener
);
}
Expand All @@ -942,6 +944,7 @@ public void deleteSnapshots(
repositoryStateId,
repositoryMetaVersion,
null, // Passing null since no remote store lock files need to be cleaned up.
false,
listener
);
}
Expand Down Expand Up @@ -1016,6 +1019,7 @@ private void doDeleteShardSnapshots(
RepositoryData repositoryData,
Version repoMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
boolean isSnapshotV2,
ActionListener<RepositoryData> listener
) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
Expand Down Expand Up @@ -1051,10 +1055,14 @@ private void doDeleteShardSnapshots(
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
int groupSize = 2;
if (isSnapshotV2) {
groupSize = 1;
}
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
2
groupSize
);
cleanupUnlinkedRootAndIndicesBlobs(
snapshotIds,
Expand All @@ -1064,13 +1072,16 @@ private void doDeleteShardSnapshots(
remoteStoreLockManagerFactory,
afterCleanupsListener
);
asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
snapshotIds,
writeShardMetaDataAndComputeDeletesStep.result(),
remoteStoreLockManagerFactory,
afterCleanupsListener
);
if (!isSnapshotV2) {
asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
snapshotIds,
writeShardMetaDataAndComputeDeletesStep.result(),
remoteStoreLockManagerFactory,
afterCleanupsListener
);
}

}, listener::onFailure);
}

Expand Down
84 changes: 72 additions & 12 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -617,6 +618,15 @@ private void updateSnapshotPinnedTimestamp(
// );
}

/**
 * Placeholder for removing the pinned timestamp associated with a snapshot.
 * <p>
 * The body is currently empty: nothing is removed and {@code listener} is never completed,
 * so a caller that waits on the listener would never be notified.
 * NOTE(review): the parameter is named {@code timestampToPin} although this method is about
 * removal; presumably it is the timestamp to unpin. Confirm when implementing.
 *
 * @param repositoryData repository data for the repository holding the snapshot (unused for now)
 * @param snapshot       snapshot whose pinned timestamp should be removed (unused for now)
 * @param timestampToPin timestamp value associated with the snapshot (unused for now)
 * @param listener       listener intended to receive the outcome (never invoked for now)
 */
private void removeSnapshotPinnedTimestamp(
RepositoryData repositoryData,
Snapshot snapshot,
long timestampToPin,
ActionListener<RepositoryData> listener
) {
// TODO: not implemented yet; intentionally a no-op.

}

private static void ensureSnapshotNameNotRunning(
List<SnapshotsInProgress.Entry> runningSnapshots,
String repositoryName,
Expand Down Expand Up @@ -2462,18 +2472,68 @@ private void deleteSnapshotsFromRepository(
// the flag. This can be improved by recording in RepositoryData whether any shallow snapshots were ever present
// in this repository.
// SEE https://github.com/opensearch-project/OpenSearch/issues/8610
final boolean cleanupRemoteStoreLockFiles = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings());
if (cleanupRemoteStoreLockFiles) {
repository.deleteSnapshotsAndReleaseLockFiles(
snapshotIds,
repositoryData.getGenId(),
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
remoteStoreLockManagerFactory,
ActionListener.wrap(updatedRepoData -> {
logger.info("snapshots {} deleted", snapshotIds);
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
}, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData))
);
final boolean remoteStoreShallowCopyEnabled = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings());
if (remoteStoreShallowCopyEnabled) {
List<SnapshotId> snapshotsWithPinnedTimestamp = Collections.synchronizedList(new ArrayList<>());
List<SnapshotId> snapshotsWithLockFiles = Collections.synchronizedList(new ArrayList<>());

CountDownLatch latch = new CountDownLatch(1);

threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
for (SnapshotId snapshotId : snapshotIds) {
try {
SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
if (snapshotInfo.getPinnedTimestamp() > 0) {
snapshotsWithPinnedTimestamp.add(snapshotId);
} else {
snapshotsWithLockFiles.add(snapshotId);
}
} catch (Exception e) {
logger.warn("Failed to get snapshot info for {}", snapshotId, e);
removeSnapshotDeletionFromClusterState(deleteEntry, e, repositoryData);
}
}
} finally {
latch.countDown();
}
});
try {
latch.await();
if (snapshotsWithLockFiles.size() > 0) {
repository.deleteSnapshotsAndReleaseLockFiles(
snapshotIds,
repositoryData.getGenId(),
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
remoteStoreLockManagerFactory,
false,
ActionListener.wrap(updatedRepoData -> {
logger.info("snapshots {} deleted", snapshotsWithLockFiles);
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
}, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData))
);
}
if (snapshotsWithPinnedTimestamp.size() > 0) {
repository.deleteSnapshotsAndReleaseLockFiles(
snapshotIds,
repositoryData.getGenId(),
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
null,
true,
ActionListener.wrap(updatedRepoData -> {
logger.info("snapshots {} deleted", snapshotsWithPinnedTimestamp);
// removeSnapshotPinnedTimestamp();
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
}, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData))
);
}

} catch (InterruptedException e) {
logger.error("Interrupted while waiting for snapshot info processing", e);
Thread.currentThread().interrupt();
removeSnapshotDeletionFromClusterState(deleteEntry, e, repositoryData);
}

} else {
repository.deleteSnapshots(
snapshotIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,16 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito
.stream()
.noneMatch(shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) {
final Map<String, BlobMetadata> shardPathContents = shardContainer.listBlobs();

assertTrue(
shardPathContents.containsKey(
String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID())
)
|| shardPathContents.containsKey(
if (snapshotInfo.getPinnedTimestamp() == 0) {
assertTrue(
shardPathContents.containsKey(
String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID())
)
|| shardPathContents.containsKey(
String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID())
)
);
);
}

assertThat(
shardPathContents.keySet()
Expand Down

0 comments on commit afcc37a

Please sign in to comment.