Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate RecoveryState details for shallow snapshot restore #15353

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
Expand All @@ -34,6 +36,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -73,6 +76,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -589,6 +593,37 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// ensure recovery details are non-zero
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(restoredIndexName1).execute().actionGet();
assertEquals(1, recoveryResponse.getTotalShards());
assertEquals(1, recoveryResponse.getSuccessfulShards());
assertEquals(0, recoveryResponse.getFailedShards());
assertEquals(1, recoveryResponse.shardRecoveryStates().size());
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(restoredIndexName1));
assertEquals(1, recoveryResponse.shardRecoveryStates().get(restoredIndexName1).size());

RecoveryState recoveryState = recoveryResponse.shardRecoveryStates().get(restoredIndexName1).get(0);
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
assertEquals(0, recoveryState.getShardId().getId());
assertTrue(recoveryState.getPrimary());
assertEquals(RecoverySource.Type.SNAPSHOT, recoveryState.getRecoverySource().getType());
assertThat(recoveryState.getIndex().time(), greaterThanOrEqualTo(0L));

// ensure populated file details
assertTrue(recoveryState.getIndex().totalFileCount() > 0);
assertTrue(recoveryState.getIndex().totalRecoverFiles() > 0);
assertTrue(recoveryState.getIndex().recoveredFileCount() > 0);
assertThat(recoveryState.getIndex().recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(recoveryState.getIndex().recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
assertFalse(recoveryState.getIndex().fileDetails().isEmpty());

// ensure populated bytes details
assertTrue(recoveryState.getIndex().recoveredBytes() > 0L);
assertTrue(recoveryState.getIndex().totalBytes() > 0L);
assertTrue(recoveryState.getIndex().totalRecoverBytes() > 0L);
assertThat(recoveryState.getIndex().recoveredBytesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(recoveryState.getIndex().recoveredBytesPercent(), lessThanOrEqualTo(100.0f));

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -5117,10 +5117,23 @@
}
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();
ltaragi marked this conversation as resolved.
Show resolved Hide resolved

try {
final Directory storeDirectory;
if (recoveryState.getStage() == RecoveryState.Stage.INDEX) {
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());

Check warning on line 5124 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5124

Added line #L5124 was not covered by tests
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());

Check warning on line 5126 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5126

Added line #L5126 was not covered by tests
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);

Check warning on line 5128 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5128

Added line #L5128 was not covered by tests
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);

Check warning on line 5130 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5130

Added line #L5130 was not covered by tests
}
}

Check warning on line 5132 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5132

Added line #L5132 was not covered by tests
} else {
storeDirectory = store.directory();
}

String segmentsNFile = copySegmentFiles(
storeDirectory,
sourceRemoteDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2896,9 +2896,9 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
RecoverySource.ExistingStoreRecoverySource.INSTANCE
);
routing = ShardRoutingHelper.newWithRestoreSource(routing, new RecoverySource.EmptyStoreRecoverySource());

target = reinitShard(target, routing);

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("from snapshot", new RecoveryState(routing, localNode, null));
target.syncSegmentsFromGivenRemoteSegmentStore(false, tempRemoteSegmentDirectory, primaryTerm, commitGeneration);
RemoteSegmentStoreDirectory remoteStoreDirectory = ((RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) target
.remoteStore()
Expand Down
Loading