Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNAPSHOTS: Allow Parallel Restore Operations #36397

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8e903ec
SNAPSHOTS: Allow Parallel Restore Operations
original-brownbear Dec 7, 2018
cee0adf
revert noisy cleanup
original-brownbear Dec 8, 2018
7ceb2b6
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 10, 2018
9d18495
Add tests for parallel restore from single snapshot
original-brownbear Dec 10, 2018
7411054
Add UUIDs and the ability to restore the same snapshot in parallel
original-brownbear Dec 11, 2018
f114c97
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
c0286da
CR: Add uuid to restore operations
original-brownbear Dec 11, 2018
d3cc957
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
8076986
Javadoc and cleanup uuid handling
original-brownbear Dec 11, 2018
f12d1fb
fix javadoc
original-brownbear Dec 11, 2018
d510c78
no need for uuid on restoreinfo
original-brownbear Dec 11, 2018
87d3bf3
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
1de0350
Use map style access to restore in progress entries
original-brownbear Dec 11, 2018
b9e3a05
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
2a8573d
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
3c6cd85
remove noisy change
original-brownbear Dec 11, 2018
d4e19f5
remove noisy change
original-brownbear Dec 11, 2018
ff0596d
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
389ceee
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 11, 2018
7731a9d
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 12, 2018
183ded2
CR comments
original-brownbear Dec 12, 2018
144aa26
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 12, 2018
808eac4
CR comments
original-brownbear Dec 12, 2018
5c26af3
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 13, 2018
70e29d0
CR comments
original-brownbear Dec 13, 2018
615bd69
Merge remote-tracking branch 'elastic/master' into parallel-restores
original-brownbear Dec 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ Response:
"repository" : "my_repository",
"snapshot" : "my_snapshot",
"index" : "index1",
"version" : "{version}"
"version" : "{version}",
"restoreUUID": "PDh1ZAOaRbiGIVtCvZOMww"
},
"target" : {
"id" : "ryqJ5lO5S4-lSFbGntkEkg",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ protected void masterOperation(final RestoreSnapshotRequest request, final Clust
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
String uuid = restoreCompletionResponse.getUuid();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lookup of in progress restores works by String uuid now :)


ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot);
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
Expand Down
113 changes: 81 additions & 32 deletions server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
Expand All @@ -33,36 +34,33 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

/**
* Meta data about restore processes that are currently executing
*/
public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom {
public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom, Iterable<RestoreInProgress.Entry> {

/**
* Fallback UUID used for restore operations that were started before v7.0 and don't have a uuid in the cluster state.
*/
public static final String BWC_UUID = new UUID(0, 0).toString();

public static final String TYPE = "restore";

private final List<Entry> entries;
private final ImmutableOpenMap<String, Entry> entries;

/**
* Constructs new restore metadata
*
* @param entries list of currently running restore processes
* @param entries map of currently running restore processes keyed by their restore uuid
*/
public RestoreInProgress(Entry... entries) {
this.entries = Arrays.asList(entries);
}

/**
* Returns list of currently running restore processes
*
* @return list of currently running restore processes
*/
public List<Entry> entries() {
return this.entries;
private RestoreInProgress(ImmutableOpenMap<String, Entry> entries) {
this.entries = entries;
}

@Override
Expand All @@ -84,20 +82,48 @@ public int hashCode() {

@Override
public String toString() {
StringBuilder builder = new StringBuilder("RestoreInProgress[");
for (int i = 0; i < entries.size(); i++) {
builder.append(entries.get(i).snapshot().getSnapshotId().getName());
if (i + 1 < entries.size()) {
builder.append(",");
}
return new StringBuilder("RestoreInProgress[").append(entries).append("]").toString();
}

public Entry get(String restoreUUID) {
return entries.get(restoreUUID);
}

public boolean isEmpty() {
return entries.isEmpty();
}

@Override
public Iterator<Entry> iterator() {
return entries.valuesIt();
}

public static final class Builder {

private final ImmutableOpenMap.Builder<String, Entry> entries = ImmutableOpenMap.builder();

public Builder() {
}

public Builder(RestoreInProgress restoreInProgress) {
entries.putAll(restoreInProgress.entries);
}

public Builder add(Entry entry) {
entries.put(entry.uuid, entry);
return this;
}

public RestoreInProgress build() {
return new RestoreInProgress(entries.build());
}
return builder.append("]").toString();
}

/**
* Restore metadata
*/
public static class Entry {
private final String uuid;
private final State state;
private final Snapshot snapshot;
private final ImmutableOpenMap<ShardId, ShardRestoreStatus> shards;
Expand All @@ -106,12 +132,14 @@ public static class Entry {
/**
* Creates new restore metadata
*
* @param uuid uuid of the restore
* @param snapshot snapshot
* @param state current state of the restore process
* @param indices list of indices being restored
* @param shards map of shards being restored to their current restore status
*/
public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
public Entry(String uuid, Snapshot snapshot, State state, List<String> indices,
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.indices = Objects.requireNonNull(indices);
Expand All @@ -120,6 +148,15 @@ public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpen
} else {
this.shards = shards;
}
this.uuid = Objects.requireNonNull(uuid);
}

/**
* Returns restore uuid
* @return restore uuid
*/
public String uuid() {
return uuid;
}

/**
Expand Down Expand Up @@ -167,15 +204,16 @@ public boolean equals(Object o) {
return false;
}
Entry entry = (Entry) o;
return snapshot.equals(entry.snapshot) &&
return uuid.equals(entry.uuid) &&
snapshot.equals(entry.snapshot) &&
state == entry.state &&
indices.equals(entry.indices) &&
shards.equals(entry.shards);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, state, indices, shards);
return Objects.hash(uuid, snapshot, state, indices, shards);
}
}

Expand Down Expand Up @@ -394,8 +432,15 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
}

public RestoreInProgress(StreamInput in) throws IOException {
Entry[] entries = new Entry[in.readVInt()];
for (int i = 0; i < entries.length; i++) {
int count = in.readVInt();
final ImmutableOpenMap.Builder<String, Entry> entriesBuilder = ImmutableOpenMap.builder(count);
for (int i = 0; i < count; i++) {
final String uuid;
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
uuid = in.readString();
} else {
uuid = BWC_UUID;
}
Snapshot snapshot = new Snapshot(in);
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
Expand All @@ -410,9 +455,9 @@ public RestoreInProgress(StreamInput in) throws IOException {
ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in);
builder.put(shardId, shardState);
}
entries[i] = new Entry(snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build());
entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build()));
}
this.entries = Arrays.asList(entries);
this.entries = entriesBuilder.build();
}

/**
Expand All @@ -421,7 +466,11 @@ public RestoreInProgress(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(entries.size());
for (Entry entry : entries) {
for (ObjectCursor<Entry> v : entries.values()) {
Entry entry = v.value;
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeString(entry.uuid);
}
entry.snapshot().writeTo(out);
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
Expand All @@ -442,8 +491,8 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
for (Entry entry : entries) {
toXContent(entry, builder, params);
for (ObjectCursor<Entry> entry : entries.values()) {
toXContent(entry.value, builder, params);
}
builder.endArray();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -208,22 +209,33 @@ public String toString() {
* recovery from a snapshot
*/
public static class SnapshotRecoverySource extends RecoverySource {
private final String restoreUUID;
private final Snapshot snapshot;
private final String index;
private final Version version;

public SnapshotRecoverySource(Snapshot snapshot, Version version, String index) {
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(index);
}

SnapshotRecoverySource(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
restoreUUID = in.readString();
} else {
restoreUUID = RestoreInProgress.BWC_UUID;
}
snapshot = new Snapshot(in);
version = Version.readVersion(in);
index = in.readString();
}

public String restoreUUID() {
return restoreUUID;
}

public Snapshot snapshot() {
return snapshot;
}
Expand All @@ -238,6 +250,9 @@ public Version version() {

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write this before snapshot field?

out.writeString(restoreUUID);
}
snapshot.writeTo(out);
Version.writeVersion(version, out);
out.writeString(index);
Expand All @@ -253,12 +268,13 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
builder.field("repository", snapshot.getRepository())
.field("snapshot", snapshot.getSnapshotId().getName())
.field("version", version.toString())
.field("index", index);
.field("index", index)
.field("restoreUUID", restoreUUID);
}

@Override
public String toString() {
return "snapshot recovery from " + snapshot.toString();
return "snapshot recovery [" + restoreUUID + "] from " + snapshot;
}

@Override
Expand All @@ -271,12 +287,13 @@ public boolean equals(Object o) {
}

SnapshotRecoverySource that = (SnapshotRecoverySource) o;
return snapshot.equals(that.snapshot) && index.equals(that.index) && version.equals(that.version);
return restoreUUID.equals(that.restoreUUID) && snapshot.equals(that.snapshot)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also adapt hashCode

&& index.equals(that.index) && version.equals(that.version);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, index, version);
return Objects.hash(restoreUUID, snapshot, index, version);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.snapshots.Snapshot;

/**
* This {@link AllocationDecider} prevents shards that have failed to be
Expand All @@ -46,25 +45,24 @@ public Decision canAllocate(final ShardRouting shardRouting, final RoutingAlloca
return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
}

final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource;
final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);

if (restoresInProgress != null) {
for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
if (restoreInProgress.snapshot().equals(snapshot)) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
break;
RestoreInProgress.Entry restoreInProgress = restoresInProgress.get(source.restoreUUID());
if (restoreInProgress != null) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
}
}
return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
"manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
"allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
"allocation of an empty primary shard",
source.snapshot(), shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
}

@Override
Expand Down
Loading