
SNAPSHOTS: Allow Parallel Restore Operations #36397

Merged
26 commits
8e903ec
SNAPSHOTS: Allow Parallel Restore Operations
original-brownbear Dec 7, 2018
cee0adf
revert noisy cleanup
original-brownbear Dec 8, 2018
7ceb2b6
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 10, 2018
9d18495
Add tests for parallel restore from single snapshot
original-brownbear Dec 10, 2018
7411054
Add UUIDs and the ability to restore the same snapshot in parallel
original-brownbear Dec 11, 2018
f114c97
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
c0286da
CR: Add uuid to restore operations
original-brownbear Dec 11, 2018
d3cc957
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
8076986
Javadoc and cleanup uuid handling
original-brownbear Dec 11, 2018
f12d1fb
fix javadoc
original-brownbear Dec 11, 2018
d510c78
no need for uuid on restoreinfo
original-brownbear Dec 11, 2018
87d3bf3
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
1de0350
Use map style access to restore in progress entries
original-brownbear Dec 11, 2018
b9e3a05
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
2a8573d
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
3c6cd85
remove noisy change
original-brownbear Dec 11, 2018
d4e19f5
remove noisy change
original-brownbear Dec 11, 2018
ff0596d
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
389ceee
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
7731a9d
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 12, 2018
183ded2
CR comments
original-brownbear Dec 12, 2018
144aa26
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 12, 2018
808eac4
CR comments
original-brownbear Dec 12, 2018
5c26af3
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 13, 2018
70e29d0
CR comments
original-brownbear Dec 13, 2018
615bd69
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 13, 2018
@@ -223,11 +223,13 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener<R

@Override
public ClusterState execute(ClusterState currentState) {
- // Check if another restore process is already running - cannot run two restore processes at the
- // same time
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
- if (restoreInProgress != null && !restoreInProgress.entries().isEmpty()) {
- throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster");
+ if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) {
+ // Check if another restore process is already running - cannot run two restore processes at the
+ // same time in versions prior to 7.0
+ if (restoreInProgress != null && !restoreInProgress.entries().isEmpty()) {
+ throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster");
+ }
}
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
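The backwards-compatibility gate in this hunk can be sketched in isolation with plain Java types (a minimal sketch; `ensureCanRestore`, the integer version ids, and the string restore names are illustrative, not Elasticsearch API): only while the oldest node in the cluster predates the version that understands multiple restore entries is a second restore rejected.

```java
import java.util.*;

public class BwcGate {
    // Reject a new restore only while the oldest node in the cluster is
    // below the first version that supports parallel restores (7.0 here).
    // Versions are modeled as plain int ids for illustration.
    static void ensureCanRestore(int minNodeVersion, List<String> runningRestores) {
        final int parallelRestoreVersion = 7_00_00;
        if (minNodeVersion < parallelRestoreVersion && runningRestores.isEmpty() == false) {
            throw new IllegalStateException("Restore process is already running in this cluster");
        }
    }

    public static void main(String[] args) {
        // Mixed 6.x/7.0 cluster with a running restore: rejected.
        boolean rejected = false;
        try {
            ensureCanRestore(6_05_00, Collections.singletonList("restore-1"));
        } catch (IllegalStateException e) {
            rejected = true;
        }
        // Homogeneous 7.0 cluster: a second restore may start in parallel.
        ensureCanRestore(7_00_00, Collections.singletonList("restore-1"));
        System.out.println(rejected);
    }
}
```

Once every node is on 7.0 or later the old mutual-exclusion check is skipped entirely, which is what makes the rest of the change safe in a rolling upgrade.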
@@ -330,7 +332,14 @@ public ClusterState execute(ClusterState currentState) {

shards = shardsBuilder.build();
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(indices.keySet())), shards);
- builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry));
+ List<RestoreInProgress.Entry> newEntries;
original-brownbear (Member, Author): Here we did assume only a single restore in parallel previously; now we have to add restoreEntry to the existing list if it's there. [resolved]
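The append-or-create pattern the comment describes can be sketched with plain lists (strings stand in for RestoreInProgress.Entry; `withAddedEntry` is a hypothetical helper, not the PR's code):

```java
import java.util.*;

public class AppendEntry {
    // Previously the cluster state held at most one restore entry, so the old
    // code could build RestoreInProgress from a single entry. With parallel
    // restores, the new entry is appended to whatever entries already exist.
    static List<String> withAddedEntry(List<String> existing, String newEntry) {
        List<String> newEntries = existing != null ? new ArrayList<>(existing) : new ArrayList<>(1);
        newEntries.add(newEntry);
        return newEntries;
    }

    public static void main(String[] args) {
        // First restore: no existing entries yet.
        System.out.println(withAddedEntry(null, "restore-a"));
        // Second, parallel restore: appended to the existing list.
        System.out.println(withAddedEntry(Arrays.asList("restore-a"), "restore-b"));
    }
}
```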
+ if (restoreInProgress != null) {
+ newEntries = new ArrayList<>(restoreInProgress.entries());
+ } else {
+ newEntries = new ArrayList<>(1);
+ }
+ newEntries.add(restoreEntry);
+ builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(newEntries.toArray(new RestoreInProgress.Entry[0])));
} else {
shards = ImmutableOpenMap.of();
}
@@ -617,7 +626,7 @@ public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
for (RestoreInProgress.Entry entry : oldRestore.entries()) {
Snapshot snapshot = entry.snapshot();
Updates updates = shardChanges.get(snapshot);
Contributor: also key the shardChanges by restore UUID?

- if (updates.shards.isEmpty() == false) {
+ if (updates != null && updates.shards.isEmpty() == false) {
original-brownbear (Member, Author), Dec 8, 2018: Now having an update here doesn't necessarily mean an update for every snapshot, so we need the null check.
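Both review points can be sketched together with plain maps: shardChanges keyed by a restore UUID as the reviewer suggests, plus a null check because get() returns null for an in-progress restore with no pending updates (all names here are illustrative, not Elasticsearch's internal types):

```java
import java.util.*;

public class ShardChangesLookup {
    // Count shard-status updates across in-progress restores. shardChanges is
    // keyed by restore UUID; a restore with no pending updates is simply
    // absent from the map, hence the null check before dereferencing.
    static int appliedUpdates(Map<String, Map<Integer, String>> shardChanges, List<String> inProgressUuids) {
        int applied = 0;
        for (String restoreUuid : inProgressUuids) {
            Map<Integer, String> updates = shardChanges.get(restoreUuid);
            if (updates != null && updates.isEmpty() == false) {
                applied += updates.size();
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, String>> shardChanges = new HashMap<>();
        shardChanges.put("uuid-1", Collections.singletonMap(0, "SUCCESS"));
        // uuid-2 has no pending updates; without the null check this would NPE.
        System.out.println(appliedUpdates(shardChanges, Arrays.asList("uuid-1", "uuid-2")));
    }
}
```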

ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(entry.shards());
for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
shardsBuilder.put(shard.getKey(), shard.getValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should now only be updated if the previous entry is not completed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added that check now :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that if the respective ShardRestoreStatus is already completed, there is no need to update it here.
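The reviewer's last point can be sketched with plain collections (a sketch only: `State`, `completed()`, and `merge` are illustrative stand-ins, not the actual ShardRestoreStatus API): a completed per-shard status is final, so incoming updates for it are dropped.

```java
import java.util.*;

public class SkipCompleted {
    enum State {
        INIT, STARTED, SUCCESS, FAILURE;
        boolean completed() { return this == SUCCESS || this == FAILURE; }
    }

    // Merge incoming per-shard updates, skipping shards whose existing status
    // is already completed: a terminal status must not be overwritten.
    static Map<String, State> merge(Map<String, State> existing, Map<String, State> updates) {
        Map<String, State> result = new HashMap<>(existing);
        for (Map.Entry<String, State> e : updates.entrySet()) {
            State current = existing.get(e.getKey());
            if (current == null || current.completed() == false) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, State> existing = new HashMap<>();
        existing.put("shard-0", State.SUCCESS);
        existing.put("shard-1", State.STARTED);
        Map<String, State> updates = new HashMap<>();
        updates.put("shard-0", State.FAILURE); // dropped: shard-0 already completed
        updates.put("shard-1", State.SUCCESS); // applied: shard-1 still running
        Map<String, State> merged = merge(existing, updates);
        System.out.println(merged.get("shard-0") + " " + merged.get("shard-1"));
    }
}
```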

@@ -630,7 +639,7 @@ public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
entries.add(entry);
}
}
- return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
+ return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[0]));
} else {
return oldRestore;
}
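The diff also switches toArray(new RestoreInProgress.Entry[entries.size()]) to toArray(new RestoreInProgress.Entry[0]). Both forms yield the same array; passing a zero-length array lets toArray allocate a correctly sized result itself and is the generally preferred idiom on modern JVMs. A minimal sketch with strings (helper names are illustrative):

```java
import java.util.*;

public class ToArrayIdiom {
    // Pre-sized array: toArray fills the array the caller allocated.
    static String[] preSized(List<String> entries) {
        return entries.toArray(new String[entries.size()]);
    }

    // Zero-length array: toArray allocates the result itself; the argument
    // only conveys the element type.
    static String[] zeroSized(List<String> entries) {
        return entries.toArray(new String[0]);
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("restore-1", "restore-2");
        System.out.println(Arrays.equals(preSized(entries), zeroSized(entries)));
        System.out.println(zeroSized(entries).length);
    }
}
```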
@@ -89,6 +89,7 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.StoredScriptsIT;
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -3510,6 +3511,70 @@ public void testSnapshottingWithMissingSequenceNumbers() {
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
}

public void testParallelRestoreOperations() {
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-restore-snapshot-repo";
String snapshotName1 = "test-restore-snapshot1";
String snapshotName2 = "test-restore-snapshot2";
String absolutePath = randomRepoPath().toAbsolutePath().toString();
logger.info("Path [{}]", absolutePath);
String restoredIndexName1 = indexName1 + "-restored";
String restoredIndexName2 = indexName2 + "-restored";
String typeName = "actions";
String expectedValue = "expected";

Client client = client();
// Write a document to each index
String docId = Integer.toString(randomInt());
index(indexName1, typeName, docId, "value", expectedValue);

String docId2 = Integer.toString(randomInt());
index(indexName2, typeName, docId2, "value", expectedValue);

logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", absolutePath)
));

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards()));
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName1)
.setWaitForCompletion(false)
.setRenamePattern(indexName1)
.setRenameReplacement(restoredIndexName1)
.get();
RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName2)
.setWaitForCompletion(false)
.setRenamePattern(indexName2)
.setRenameReplacement(restoredIndexName2)
.get();
assertThat(restoreSnapshotResponse1.status(), equalTo(RestStatus.ACCEPTED));
assertThat(restoreSnapshotResponse2.status(), equalTo(RestStatus.ACCEPTED));
ensureGreen(restoredIndexName1, restoredIndexName2);
assertThat(client.prepareGet(restoredIndexName1, typeName, docId).get().isExists(), equalTo(true));
assertThat(client.prepareGet(restoredIndexName2, typeName, docId2).get().isExists(), equalTo(true));
}

@TestLogging("org.elasticsearch.snapshots:TRACE")
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
final Client client = client();