Skip to content

Commit

Permalink
Limit retries of failed allocations per index (#18467)
Browse files Browse the repository at this point in the history
Today if a shard fails during initialization phase due to misconfiguration, broken disks,
missing analyzers, not installed plugins etc. elasticsaerch keeps on trying to initialize
or rather allocate that shard. Yet, in the worst case scenario this ends in an endless
allocation loop. To prevent this loop and all it's sideeffects like spamming log files over
and over again this commit adds an allocation decider that stops allocating a shard that
failed more than N times in a row to allocate. The number or retries can be configured via
`index.allocation.max_retry` and it's default is set to `5`. Once the setting is updated
shards with less failures than the number set per index will be allowed to allocate again.

Internally we maintain a counter on the UnassignedInfo that is reset to `0` once the shards
has been started.

Relates to #18417
  • Loading branch information
s1monw committed May 20, 2016
1 parent a2ff002 commit 35e7058
Show file tree
Hide file tree
Showing 33 changed files with 705 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ protected void masterOperation(final ClusterAllocationExplainRequest request, fi
final ActionListener<ClusterAllocationExplainResponse> listener) {
final RoutingNodes routingNodes = state.getRoutingNodes();
final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state,
clusterInfoService.getClusterInfo(), System.nanoTime());
clusterInfoService.getClusterInfo(), System.nanoTime(), false);

ShardRouting foundShard = null;
if (request.useAnyUnassignedShard()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
* Request to submit cluster reroute allocation commands
*/
public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteRequest> {
AllocationCommands commands = new AllocationCommands();
boolean dryRun;
boolean explain;
private AllocationCommands commands = new AllocationCommands();
private boolean dryRun;
private boolean explain;
private boolean retryFailed;

public ClusterRerouteRequest() {
}
Expand Down Expand Up @@ -81,13 +82,30 @@ public ClusterRerouteRequest explain(boolean explain) {
return this;
}

/**
* Sets the retry failed flag (defaults to <tt>false</tt>). If true, the
* request will retry allocating shards that can't currently be allocated due to too many allocation failures.
*/
public ClusterRerouteRequest setRetryFailed(boolean retryFailed) {
this.retryFailed = retryFailed;
return this;
}

/**
* Returns the current explain flag
*/
public boolean explain() {
return this.explain;
}

/**
* Returns the current retry failed flag
*/
public boolean isRetryFailed() {
return this.retryFailed;
}


/**
* Set the allocation commands to execute.
*/
Expand All @@ -96,6 +114,13 @@ public ClusterRerouteRequest commands(AllocationCommand... commands) {
return this;
}

/**
* Returns the allocation commands to execute
*/
public AllocationCommands getCommands() {
return commands;
}

/**
* Sets the source for the request.
*/
Expand Down Expand Up @@ -136,6 +161,7 @@ public void readFrom(StreamInput in) throws IOException {
commands = AllocationCommands.readFrom(in);
dryRun = in.readBoolean();
explain = in.readBoolean();
retryFailed = in.readBoolean();
readTimeout(in);
}

Expand All @@ -145,6 +171,7 @@ public void writeTo(StreamOutput out) throws IOException {
AllocationCommands.writeTo(commands, out);
out.writeBoolean(dryRun);
out.writeBoolean(explain);
out.writeBoolean(retryFailed);
writeTimeout(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ public ClusterRerouteRequestBuilder setExplain(boolean explain) {
return this;
}

/**
* Sets the retry failed flag (defaults to <tt>false</tt>). If true, the
* request will retry allocating shards that can't currently be allocated due to too many allocation failures.
*/
public ClusterRerouteRequestBuilder setRetryFailed(boolean retryFailed) {
request.setRetryFailed(retryFailed);
return this;
}

/**
* Sets the commands for the request to execute.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -68,38 +69,55 @@ protected ClusterRerouteResponse newResponse() {

@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)", new AckedClusterStateUpdateTask<ClusterRerouteResponse>(Priority.IMMEDIATE, request, listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
allocationService, request, listener));
}

private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {

@Override
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
}
private final ClusterRerouteRequest request;
private final ActionListener<ClusterRerouteResponse> listener;
private final ESLogger logger;
private final AllocationService allocationService;
private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;

@Override
public void onAckTimeout() {
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
}
ClusterRerouteResponseAckedClusterStateUpdateTask(ESLogger logger, AllocationService allocationService, ClusterRerouteRequest request,
ActionListener<ClusterRerouteResponse> listener) {
super(Priority.IMMEDIATE, request, listener);
this.request = request;
this.listener = listener;
this.logger = logger;
this.allocationService = allocationService;
}

@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
super.onFailure(source, t);
}
@Override
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
}

@Override
public void onAckTimeout() {
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
}

@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
super.onFailure(source, t);
}

@Override
public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, request.explain());
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
clusterStateToSend = newState;
explanations = routingResult.explanations();
if (request.dryRun) {
return currentState;
}
return newState;
@Override
public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.getCommands(), request.explain(),
request.isRetryFailed());
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
clusterStateToSend = newState;
explanations = routingResult.explanations();
if (request.dryRun()) {
return currentState;
}
});
return newState;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class ClusterModule extends AbstractModule {
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
Collections.unmodifiableList(Arrays.asList(
MaxRetryAllocationDecider.class,
SameShardAllocationDecider.class,
FilterAllocationDecider.class,
ReplicaAfterPrimaryActiveAllocationDecider.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public final class UnassignedInfo implements ToXContent, Writeable {
public static final Setting<TimeValue> INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING =
Setting.timeSetting("index.unassigned.node_left.delayed_timeout", DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, Property.Dynamic,
Property.IndexScope);

/**
* Reason why the shard is in unassigned state.
* <p>
Expand Down Expand Up @@ -103,7 +102,11 @@ public enum Reason {
/**
* A better replica location is identified and causes the existing replica allocation to be cancelled.
*/
REALLOCATED_REPLICA;
REALLOCATED_REPLICA,
/**
* Unassigned as a result of a failed primary while the replica was initializing.
*/
PRIMARY_FAILED;
}

private final Reason reason;
Expand All @@ -112,6 +115,7 @@ public enum Reason {
private final long lastComputedLeftDelayNanos; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
private final String message;
private final Throwable failure;
private final int failedAllocations;

/**
* creates an UnassingedInfo object based **current** time
Expand All @@ -120,7 +124,7 @@ public enum Reason {
* @param message more information about cause.
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, System.nanoTime(), System.currentTimeMillis());
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis());
}

/**
Expand All @@ -130,13 +134,16 @@ public UnassignedInfo(Reason reason, String message) {
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) {
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis) {
this.reason = reason;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
this.lastComputedLeftDelayNanos = 0L;
this.message = message;
this.failure = failure;
this.failedAllocations = failedAllocations;
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED):
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}

Expand All @@ -147,17 +154,19 @@ public UnassignedInfo(UnassignedInfo unassignedInfo, long newComputedLeftDelayNa
this.lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
this.message = unassignedInfo.message;
this.failure = unassignedInfo.failure;
this.failedAllocations = unassignedInfo.failedAllocations;
}

public UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
// This means that in master failover situations, elapsed delay time is forgotten.
// This means that in master fail-over situations, elapsed delay time is forgotten.
this.unassignedTimeNanos = System.nanoTime();
this.lastComputedLeftDelayNanos = 0L;
this.message = in.readOptionalString();
this.failure = in.readThrowable();
this.failedAllocations = in.readVInt();
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -166,12 +175,18 @@ public void writeTo(StreamOutput out) throws IOException {
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
out.writeOptionalString(message);
out.writeThrowable(failure);
out.writeVInt(failedAllocations);
}

public UnassignedInfo readFrom(StreamInput in) throws IOException {
return new UnassignedInfo(in);
}

/**
* Returns the number of previously failed allocations of this shard.
*/
public int getNumFailedAllocations() { return failedAllocations; }

/**
* The reason why the shard is unassigned.
*/
Expand Down Expand Up @@ -325,7 +340,11 @@ public String shortSummary() {
StringBuilder sb = new StringBuilder();
sb.append("[reason=").append(reason).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)).append("]");
if (failedAllocations > 0) {
sb.append(", failed_attempts[").append(failedAllocations).append("]");
}
String details = getDetails();

if (details != null) {
sb.append(", details[").append(details).append("]");
}
Expand All @@ -342,6 +361,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject("unassigned_info");
builder.field("reason", reason);
builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis));
if (failedAllocations > 0) {
builder.field("failed_attempts", failedAllocations);
}
String details = getDetails();
if (details != null) {
builder.field("details", details);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis
List<FailedRerouteAllocation.FailedShard> orderedFailedShards = new ArrayList<>(failedShards);
orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.shard.primary()));
for (FailedRerouteAllocation.FailedShard failedShard : orderedFailedShards) {
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
System.nanoTime(), System.currentTimeMillis()));
failedAllocations + 1, System.nanoTime(), System.currentTimeMillis()));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
Expand Down Expand Up @@ -257,16 +259,13 @@ private <T> String firstListElementsToCommaDelimitedString(List<T> elements, Fun
.collect(Collectors.joining(", "));
}

public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) {
return reroute(clusterState, commands, false);
}

public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain) {
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, clusterInfoService.getClusterInfo(), currentNanoTime());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime(), retryFailed);
// don't short circuit deciders, we want a full explanation
allocation.debugDecision(true);
// we ignore disable allocation, because commands are explicit
Expand Down Expand Up @@ -305,7 +304,8 @@ protected RoutingAllocation.Result reroute(ClusterState clusterState, String rea
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, clusterInfoService.getClusterInfo(), currentNanoTime());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
clusterInfoService.getClusterInfo(), currentNanoTime(), false);
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
Expand Down Expand Up @@ -437,7 +437,7 @@ private boolean deassociateDeadNodes(RoutingAllocation allocation) {
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
allocation.getCurrentNanoTime(), System.currentTimeMillis());
0, allocation.getCurrentNanoTime(), System.currentTimeMillis());
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
Expand All @@ -457,8 +457,8 @@ private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, S
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public String toString() {
private final List<FailedShard> failedShards;

public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<FailedShard> failedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime());
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
this.failedShards = failedShards;
}

Expand Down
Loading

0 comments on commit 35e7058

Please sign in to comment.