Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Write shard level metadata blob when snapshotting searchable snapshot #14492

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
Expand Down Expand Up @@ -53,11 +54,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -132,21 +135,24 @@ public void testCreateSearchableSnapshot() throws Exception {

public void testSnapshottingSearchableSnapshots() throws Exception {
final String repoName = "test-repo";
final String initSnapName = "initial-snapshot";
final String indexName = "test-idx";
final String repeatSnapNamePrefix = "test-repeated-snap-";
final String repeatIndexNamePrefix = indexName + "-copy-";
final Client client = client();

// create an index, add data, snapshot it, then delete it
internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, "initial-snapshot", repoName, indexName);
takeSnapshot(client, initSnapName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

// restore the index as a searchable snapshot
internalCluster().ensureAtLeastNumSearchNodes(1);
client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "initial-snapshot")
.prepareRestoreSnapshot(repoName, initSnapName)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy-0")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
Expand All @@ -159,7 +165,7 @@ public void testSnapshottingSearchableSnapshots() throws Exception {

// Test that the searchable snapshot index can continue to be snapshotted and restored
for (int i = 0; i < 4; i++) {
final String repeatedSnapshotName = "test-repeated-snap-" + i;
final String repeatedSnapshotName = repeatSnapNamePrefix + i;
takeSnapshot(client, repeatedSnapshotName, repoName);
deleteIndicesAndEnsureGreen(client, "_all");
client.admin()
Expand All @@ -181,21 +187,34 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
final Map<String, List<String>> snapshotInfoMap = response.getSnapshots()
.stream()
.collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices));
assertEquals(
Map.of(
"initial-snapshot",
List.of("test-idx"),
"test-repeated-snap-0",
List.of("test-idx-copy-0"),
"test-repeated-snap-1",
List.of("test-idx-copy-1"),
"test-repeated-snap-2",
List.of("test-idx-copy-2"),
"test-repeated-snap-3",
List.of("test-idx-copy-3")
),
snapshotInfoMap
);
final Map<String, List<String>> expect = new HashMap<>();
expect.put(initSnapName, List.of(indexName));
IntStream.range(0, 4).forEach(i -> expect.put(repeatSnapNamePrefix + i, List.of(repeatIndexNamePrefix + i)));
assertEquals(expect, snapshotInfoMap);

String[] snapNames = new String[5];
IntStream.range(0, 4).forEach(i -> snapNames[i] = repeatSnapNamePrefix + i);
snapNames[4] = initSnapName;
SnapshotsStatusResponse snapshotsStatusResponse = client.admin()
.cluster()
.prepareSnapshotStatus(repoName)
.addSnapshots(snapNames)
.execute()
.actionGet();
snapshotsStatusResponse.getSnapshots().forEach(s -> {
String snapName = s.getSnapshot().getSnapshotId().getName();
assertEquals(1, s.getIndices().size());
assertEquals(1, s.getShards().size());
if (snapName.equals("initial-snapshot")) {
assertNotNull(s.getIndices().get("test-idx"));
assertTrue(s.getShards().get(0).getStats().getTotalFileCount() > 0);
} else {
assertTrue(snapName.startsWith(repeatSnapNamePrefix));
assertEquals(1, s.getIndices().size());
assertNotNull(s.getIndices().get(repeatIndexNamePrefix + snapName.substring(repeatSnapNamePrefix.length())));
assertEquals(0L, s.getShards().get(0).getStats().getTotalFileCount());
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2888,9 +2888,12 @@
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {
if (store.indexSettings().isRemoteSnapshot()) {
// If the source of the data is another remote snapshot (i.e. a searchable snapshot), there are no files to upload for this shard
indexCommitPointFiles = List.of();

Check warning on line 2893 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L2893

Added line #L2893 was not covered by tests
} else if (filesFromSegmentInfos == null) {
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
indexCommitPointFiles = new ArrayList<>();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,58 +277,52 @@
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
if (isRemoteSnapshot(shardId)) {
// If the source of the data is another remote snapshot (i.e. searchable snapshot)
// then no need to snapshot the shard and can immediately notify success.
notifySuccessfulSnapshotShard(snapshot, shardId, snapshotStatus.generation());
} else {
assert SnapshotsService.useShardGenerations(entry.version())
|| ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null
: "Found non-null, non-numeric shard generation ["
+ snapshotStatus.generation()
+ "] for snapshot with old-format compatibility";
snapshot(
shardId,
snapshot,
indexId,
entry.userMetadata(),
snapshotStatus,
entry.version(),
entry.remoteStoreIndexShallowCopy(),
new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
assert SnapshotsService.useShardGenerations(entry.version())
|| ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null
: "Found non-null, non-numeric shard generation ["
+ snapshotStatus.generation()

Check warning on line 283 in server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java#L283

Added line #L283 was not covered by tests
+ "] for snapshot with old-format compatibility";
snapshot(
shardId,
snapshot,
indexId,
entry.userMetadata(),
snapshotStatus,
entry.version(),
entry.remoteStoreIndexShallowCopy(),
new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(

Check warning on line 300 in server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java#L299-L300

Added lines #L299 - L300 were not covered by tests
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),

Check warning on line 303 in server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java#L303

Added line #L303 was not covered by tests
lastSnapshotStatus,
snapshotStatus.generation()

Check warning on line 305 in server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java#L305

Added line #L305 was not covered by tests
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}

@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);

Check warning on line 316 in server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java#L315-L316

Added lines #L315 - L316 were not covered by tests
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
}
);
}
}
);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@

import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress;
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
Expand Down Expand Up @@ -143,7 +142,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
}
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(repository, blobContainer, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData);
return null;
} catch (AssertionError e) {
return e;
Expand All @@ -167,31 +166,24 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe
assertTrue(indexGenerations.length <= 2);
}

private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData)
throws IOException {
private static void assertShardIndexGenerations(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final BlobContainer indicesContainer = repoRoot.children().get("indices");
for (IndexId index : shardGenerations.indices()) {
final List<String> gens = shardGenerations.getGens(index);
if (gens.isEmpty() == false) {
final BlobContainer indexContainer = indicesContainer.children().get(index.getId());
final Map<String, BlobContainer> shardContainers = indexContainer.children();
if (isRemoteSnapshot(repository, repositoryData, index)) {
// If the source of the data is another snapshot (i.e. searchable snapshot)
// then assert that there is no shard data (because it exists in the source snapshot)
assertThat(shardContainers, anEmptyMap());
} else {
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
}
}
Expand Down
Loading