Skip to content

Commit

Permalink
fix snapshot status
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <[email protected]>
  • Loading branch information
bugmakerrrrrr committed Apr 15, 2024
1 parent 6bc04b4 commit 6045092
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
Expand Down Expand Up @@ -52,11 +53,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -127,21 +130,24 @@ public void testCreateSearchableSnapshot() throws Exception {

public void testSnapshottingSearchableSnapshots() throws Exception {
final String repoName = "test-repo";
final String initSnapName = "initial-snapshot";
final String indexName = "test-idx";
final String repeatSnapNamePrefix = "test-repeated-snap-";
final String repeatIndexNamePrefix = indexName + "-copy-";
final Client client = client();

// create an index, add data, snapshot it, then delete it
internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, "initial-snapshot", repoName, indexName);
takeSnapshot(client, initSnapName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

// restore the index as a searchable snapshot
internalCluster().ensureAtLeastNumSearchNodes(1);
client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "initial-snapshot")
.prepareRestoreSnapshot(repoName, initSnapName)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy-0")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
Expand All @@ -154,7 +160,7 @@ public void testSnapshottingSearchableSnapshots() throws Exception {

// Test that the searchable snapshot index can continue to be snapshotted and restored
for (int i = 0; i < 4; i++) {
final String repeatedSnapshotName = "test-repeated-snap-" + i;
final String repeatedSnapshotName = repeatSnapNamePrefix + i;
takeSnapshot(client, repeatedSnapshotName, repoName);
deleteIndicesAndEnsureGreen(client, "_all");
client.admin()
Expand All @@ -176,21 +182,34 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
final Map<String, List<String>> snapshotInfoMap = response.getSnapshots()
.stream()
.collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices));
assertEquals(
Map.of(
"initial-snapshot",
List.of("test-idx"),
"test-repeated-snap-0",
List.of("test-idx-copy-0"),
"test-repeated-snap-1",
List.of("test-idx-copy-1"),
"test-repeated-snap-2",
List.of("test-idx-copy-2"),
"test-repeated-snap-3",
List.of("test-idx-copy-3")
),
snapshotInfoMap
);
final Map<String, List<String>> expect = new HashMap<>();
expect.put(initSnapName, List.of(indexName));
IntStream.range(0, 4).forEach(i -> expect.put(repeatSnapNamePrefix + i, List.of(repeatIndexNamePrefix + i)));
assertEquals(expect, snapshotInfoMap);

String[] snapNames = new String[5];
IntStream.range(0, 4).forEach(i -> snapNames[i] = repeatSnapNamePrefix + i);
snapNames[4] = initSnapName;
SnapshotsStatusResponse snapshotsStatusResponse = client.admin()
.cluster()
.prepareSnapshotStatus(repoName)
.addSnapshots(snapNames)
.execute()
.actionGet();
snapshotsStatusResponse.getSnapshots().forEach(s -> {
String snapName = s.getSnapshot().getSnapshotId().getName();
assertEquals(1, s.getIndices().size());
assertEquals(1, s.getShards().size());
if (snapName.equals("initial-snapshot")) {
assertNotNull(s.getIndices().get("test-idx"));
assertTrue(s.getShards().get(0).getStats().getTotalFileCount() > 0);
} else {
assertTrue(snapName.startsWith(repeatSnapNamePrefix));
assertEquals(1, s.getIndices().size());
assertNotNull(s.getIndices().get(repeatIndexNamePrefix + snapName.substring(repeatSnapNamePrefix.length())));
assertEquals(0L, s.getShards().get(0).getStats().getTotalFileCount());
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2787,9 +2787,12 @@ public void snapshotShard(
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {
if (store.indexSettings().isRemoteSnapshot()) {
// If the source of the data is another remote snapshot (i.e. searchable snapshot) then no need to snapshot the shard
indexCommitPointFiles = List.of();
} else if (filesFromSegmentInfos == null) {
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
indexCommitPointFiles = new ArrayList<>();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,53 +276,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexS
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
if (isRemoteSnapshot(shardId)) {
// If the source of the data is another remote snapshot (i.e. searchable snapshot)
// then no need to snapshot the shard and can immediately notify success.
notifySuccessfulSnapshotShard(snapshot, shardId, snapshotStatus.generation());
} else {
snapshot(
shardId,
snapshot,
indexId,
entry.userMetadata(),
snapshotStatus,
entry.version(),
entry.remoteStoreIndexShallowCopy(),
new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
snapshot(
shardId,
snapshot,
indexId,
entry.userMetadata(),
snapshotStatus,
entry.version(),
entry.remoteStoreIndexShallowCopy(),
new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}

@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
}
);
}
}
);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
}
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(repository, blobContainer, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData);
return null;
} catch (AssertionError e) {
return e;
Expand All @@ -167,7 +167,7 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe
assertTrue(indexGenerations.length <= 2);
}

private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData)
private static void assertShardIndexGenerations(BlobContainer repoRoot, RepositoryData repositoryData)
throws IOException {
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final BlobContainer indicesContainer = repoRoot.children().get("indices");
Expand All @@ -176,22 +176,16 @@ private static void assertShardIndexGenerations(BlobStoreRepository repository,
if (gens.isEmpty() == false) {
final BlobContainer indexContainer = indicesContainer.children().get(index.getId());
final Map<String, BlobContainer> shardContainers = indexContainer.children();
if (isRemoteSnapshot(repository, repositoryData, index)) {
// If the source of the data is another snapshot (i.e. searchable snapshot)
// then assert that there is no shard data (because it exists in the source snapshot)
assertThat(shardContainers, anEmptyMap());
} else {
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
}
}
Expand Down

0 comments on commit 6045092

Please sign in to comment.