Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not cancel ongoing recovery for noop copy on broken node #48265

Merged
merged 14 commits into from
Nov 1, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet());
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
}
}
Expand Down Expand Up @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(),
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
allocationStatus);
allocationStatus, currInfo.getFailedNodeIds());
ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource());
changes.unassignedInfoUpdated(shard, newInfo);
shard = updatedShard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
Expand All @@ -38,8 +39,10 @@
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

/**
* Holds additional information as to why the shard is in unassigned state.
Expand Down Expand Up @@ -213,6 +216,7 @@ public String value() {
private final String message;
private final Exception failure;
private final int failedAllocations;
private final Set<String> failedNodeIds;
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard

/**
Expand All @@ -223,7 +227,7 @@ public String value() {
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
AllocationStatus.NO_ATTEMPT, Collections.emptySet());
}

/**
Expand All @@ -234,9 +238,11 @@ public UnassignedInfo(Reason reason, String message) {
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
* @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.
* @param lastAllocationStatus the result of the last allocation attempt for this shard
* @param failedNodeIds a set of nodeIds that previously failed to allocate this shard
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations,
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus) {
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus,
Set<String> failedNodeIds) {
this.reason = Objects.requireNonNull(reason);
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
Expand All @@ -245,10 +251,12 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti
this.failure = failure;
this.failedAllocations = failedAllocations;
this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus);
this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds);
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) :
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving";
assert failedAllocations >= failedNodeIds.size() : "failedAllocations: " + failedAllocations + " failedNodeIds: " + failedNodeIds;
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}

public UnassignedInfo(StreamInput in) throws IOException {
Expand All @@ -262,6 +270,11 @@ public UnassignedInfo(StreamInput in) throws IOException {
this.failure = in.readException();
this.failedAllocations = in.readVInt();
this.lastAllocationStatus = AllocationStatus.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString));
} else {
this.failedNodeIds = Collections.emptySet();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -273,6 +286,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeException(failure);
out.writeVInt(failedAllocations);
lastAllocationStatus.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeCollection(failedNodeIds, StreamOutput::writeString);
}
}

/**
Expand Down Expand Up @@ -347,6 +363,13 @@ public AllocationStatus getLastAllocationStatus() {
return lastAllocationStatus;
}

/**
* Returns a set of nodeId that previously failed to allocate this shard
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
*/
public Set<String> getFailedNodeIds() {
return failedNodeIds;
}

/**
* Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings.
* Only relevant if shard is effectively delayed (see {@link #isDelayed()})
Expand Down Expand Up @@ -432,6 +455,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("details", details);
}
builder.field("allocation_status", lastAllocationStatus.value());
builder.field("failed_nodes", failedNodeIds);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -459,13 +483,16 @@ public boolean equals(Object o) {
if (reason != that.reason) {
return false;
}
if (message != null ? !message.equals(that.message) : that.message != null) {
if (Objects.equals(message, that.message) == false) {
return false;
}
if (lastAllocationStatus != that.lastAllocationStatus) {
return false;
}
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
if (Objects.equals(failure, that.failure) == false) {
return false;
}
return failedNodeIds.equals(that.failedNodeIds);
}

@Override
Expand All @@ -477,6 +504,7 @@ public int hashCode() {
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
result = 31 * result + lastAllocationStatus.hashCode();
result = 31 * result + failedNodeIds.hashCode();
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -194,10 +196,18 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
shardToFail.shardId(), shardToFail, failedShard);
}
int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0;
final Set<String> failedNodeIds;
if (failedShard.unassignedInfo() != null) {
failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1);
failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds());
failedNodeIds.add(failedShard.currentNodeId());
} else {
failedNodeIds = Set.of(failedShard.currentNodeId());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}
String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message,
failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false,
AllocationStatus.NO_ATTEMPT);
AllocationStatus.NO_ATTEMPT, failedNodeIds);
if (failedShardEntry.markAsStale()) {
allocation.removeAllocationId(failedShard);
}
Expand Down Expand Up @@ -289,8 +299,8 @@ private void removeDelayMarkers(RoutingAllocation allocation) {
if (newComputedLeftDelayNanos == 0) {
unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()),
shardRouting.recoverySource(), allocation.changes());
unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(),
unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes());
}
}
}
Expand All @@ -308,7 +318,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) {
UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(),
unassignedInfo.getLastAllocationStatus()), shardRouting.recoverySource(), allocation.changes());
unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes());
}
}

Expand Down Expand Up @@ -421,7 +431,8 @@ private void disassociateDeadNodes(RoutingAllocation allocation) {
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT,
Collections.emptySet());
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) {
unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(),
unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO),
unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO,
unassignedInfo.getFailedNodeIds()),
shardRouting.recoverySource(), allocation.changes());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;

/**
Expand Down Expand Up @@ -139,7 +140,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
", " + shardRouting.unassignedInfo().getMessage();
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage,
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false,
shardRouting.unassignedInfo().getLastAllocationStatus());
shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet());
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
continue;
}

MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false);
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
Expand All @@ -110,7 +111,7 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f
"existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+
nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT);
UnassignedInfo.AllocationStatus.NO_ATTEMPT, Collections.emptySet());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
// don't cancel shard in the loop as it will cause a ConcurrentModificationException
shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo,
metaData.getIndexSafe(shard.index()), allocation.changes()));
Expand Down Expand Up @@ -186,7 +187,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
return AllocateUnassignedDecision.NOT_TAKEN;
}

MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain);
MatchingNodes matchingNodes = findMatchingNodes(
unassignedShard, allocation, false, primaryNode, primaryStore, shardStores, explain);
assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";

List<NodeAllocationResult> nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions);
Expand Down Expand Up @@ -297,14 +299,18 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore
return nodeFilesStore.storeFilesMetaData();
}

private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean ignorePreviousFailedNodes,
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
boolean explain) {
Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
if (ignorePreviousFailedNodes
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
&& shard.unassignedInfo() != null && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
// we don't have any files at all, it is an empty index
if (storeFilesMetaData.isEmpty()) {
Expand Down
Loading