Skip to content

Commit

Permalink
Limit retries of failed allocations per index
Browse files Browse the repository at this point in the history
Today if a shard fails during initialization phase due to misconfiguration, broken disks,
missing analyzers, not installed plugins etc. elasticsaerch keeps on trying to initialize
or rather allocate that shard. Yet, in the worst case scenario this ends in an endless
allocation loop. To prevent this loop and all it's sideeffects like spamming log files over
and over again this commit adds an allocation decider that stops allocating a shard that
failed more than N times in a row to allocate. The number or retries can be configured via
`index.allocation.max_retry` and it's default is set to `5`. Once the setting is updated
shards with less failures than the number set per index will be allowed to allocate again.

Internally we maintain a counter on the UnassignedInfo that is reset to `0` once the shards
has been started.

Relates to elastic#18417
  • Loading branch information
s1monw committed May 19, 2016
1 parent d77c299 commit 394b280
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class ClusterModule extends AbstractModule {
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);
public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
Collections.unmodifiableList(Arrays.asList(
MaxRetryAllocationDecider.class,
SameShardAllocationDecider.class,
FilterAllocationDecider.class,
ReplicaAfterPrimaryActiveAllocationDecider.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public final class UnassignedInfo implements ToXContent, Writeable {
public static final Setting<TimeValue> INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING =
Setting.timeSetting("index.unassigned.node_left.delayed_timeout", DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, Property.Dynamic,
Property.IndexScope);

/**
* Reason why the shard is in unassigned state.
* <p>
Expand Down Expand Up @@ -103,7 +102,11 @@ public enum Reason {
/**
* A better replica location is identified and causes the existing replica allocation to be cancelled.
*/
REALLOCATED_REPLICA;
REALLOCATED_REPLICA,
/**
* Unassigned as a result of a failed primary while the replica was initializing.
*/
PRIMARY_FAILED;
}

private final Reason reason;
Expand All @@ -112,6 +115,7 @@ public enum Reason {
private final long lastComputedLeftDelayNanos; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
private final String message;
private final Throwable failure;
private final int failedAllocations;

/**
* creates an UnassingedInfo object based **current** time
Expand All @@ -120,7 +124,7 @@ public enum Reason {
* @param message more information about cause.
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, System.nanoTime(), System.currentTimeMillis());
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis());
}

/**
Expand All @@ -130,13 +134,16 @@ public UnassignedInfo(Reason reason, String message) {
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) {
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis) {
this.reason = reason;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
this.lastComputedLeftDelayNanos = 0L;
this.message = message;
this.failure = failure;
this.failedAllocations = failedAllocations;
assert failedAllocations > 0 && reason == Reason.ALLOCATION_FAILED || failedAllocations == 0 && reason != Reason.ALLOCATION_FAILED:
"failedAllocations: " + 0 + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}

Expand All @@ -147,17 +154,19 @@ public UnassignedInfo(UnassignedInfo unassignedInfo, long newComputedLeftDelayNa
this.lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
this.message = unassignedInfo.message;
this.failure = unassignedInfo.failure;
this.failedAllocations = unassignedInfo.failedAllocations;
}

public UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
// This means that in master failover situations, elapsed delay time is forgotten.
// This means that in master fail-over situations, elapsed delay time is forgotten.
this.unassignedTimeNanos = System.nanoTime();
this.lastComputedLeftDelayNanos = 0L;
this.message = in.readOptionalString();
this.failure = in.readThrowable();
this.failedAllocations = in.readVInt();
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -166,12 +175,18 @@ public void writeTo(StreamOutput out) throws IOException {
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
out.writeOptionalString(message);
out.writeThrowable(failure);
out.writeVInt(failedAllocations);
}

public UnassignedInfo readFrom(StreamInput in) throws IOException {
return new UnassignedInfo(in);
}

/**
* Retruns the number of previously failed allocations of this shard.
*/
public int getNumFailedAllocations() {return failedAllocations;}

/**
* The reason why the shard is unassigned.
*/
Expand Down Expand Up @@ -325,7 +340,11 @@ public String shortSummary() {
StringBuilder sb = new StringBuilder();
sb.append("[reason=").append(reason).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)).append("]");
if (failedAllocations > 0) {
sb.append(", failed_attemps[").append(failedAllocations).append("]");
}
String details = getDetails();

if (details != null) {
sb.append(", details[").append(details).append("]");
}
Expand All @@ -342,6 +361,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject("unassigned_info");
builder.field("reason", reason);
builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis));
if (failedAllocations > 0) {
builder.field("failed_attemps", failedAllocations);
}
String details = getDetails();
if (details != null) {
builder.field("details", details);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis
List<FailedRerouteAllocation.FailedShard> orderedFailedShards = new ArrayList<>(failedShards);
orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.shard.primary()));
for (FailedRerouteAllocation.FailedShard failedShard : orderedFailedShards) {
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
System.nanoTime(), System.currentTimeMillis()));
failedAllocations + 1, System.nanoTime(), System.currentTimeMillis()));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
Expand Down Expand Up @@ -437,7 +439,7 @@ private boolean deassociateDeadNodes(RoutingAllocation allocation) {
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
allocation.getCurrentNanoTime(), System.currentTimeMillis());
0, allocation.getCurrentNanoTime(), System.currentTimeMillis());
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
Expand All @@ -457,8 +459,8 @@ private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, S
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
// we need to move the unassigned info back to treat it as if it was index creation
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
shardRouting.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis());
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis());
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

/**
* An allocation decider that prevents shards from being allocated on any node if the shards allocation has been retried N times without
* success. This means if a shard has been INITIALIZING N times in a row without being moved to STARTED the shard will be ignored until
* the setting for <tt>index.allocation.max_retry</tt> is raised. The default value is <tt>5</tt>.
*/
public class MaxRetryAllocationDecider extends AllocationDecider {

public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retry", 5, 0,
Setting.Property.Dynamic, Setting.Property.IndexScope);

public static final String NAME = "max_retry";

/**
* Initializes a new {@link MaxRetryAllocationDecider}
*
* @param settings {@link Settings} used by this {@link AllocationDecider}
*/
@Inject
public MaxRetryAllocationDecider(Settings settings) {
super(settings);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
if (unassignedInfo != null && unassignedInfo.getNumFailedAllocations() > 0) {
IndexMetaData indexSafe = allocation.metaData().getIndexSafe(shardRouting.index());
int maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(indexSafe.getSettings());
if (unassignedInfo.getNumFailedAllocations() >= maxRetry) {
return allocation.decision(Decision.NO, NAME, "shard has already failed allocating ["
+ unassignedInfo.getNumFailedAllocations() + "] times");
}
}
return allocation.decision(Decision.YES, NAME, "shard has no previous failures");
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return canAllocate(shardRouting, allocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.gateway.PrimaryShardAllocator;
Expand All @@ -40,7 +41,6 @@
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.IndicesRequestCache;

import java.util.Arrays;
Expand All @@ -59,6 +59,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
public static final Predicate<String> INDEX_SETTINGS_KEY_PREDICATE = (s) -> s.startsWith(IndexMetaData.INDEX_SETTING_PREFIX);

public static final Set<Setting<?>> BUILT_IN_INDEX_SETTINGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY,
IndexSettings.INDEX_TTL_DISABLE_PURGE_SETTING,
IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING,
IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public boolean processExistingRecoveries(RoutingAllocation allocation) {
currentNode, nodeWithHighestMatch);
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
changed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,19 @@ public void testReasonOrdinalOrder() {
UnassignedInfo.Reason.NODE_LEFT,
UnassignedInfo.Reason.REROUTE_CANCELLED,
UnassignedInfo.Reason.REINITIALIZED,
UnassignedInfo.Reason.REALLOCATED_REPLICA};
UnassignedInfo.Reason.REALLOCATED_REPLICA,
UnassignedInfo.Reason.PRIMARY_FAILED};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}
assertThat(UnassignedInfo.Reason.values().length, equalTo(order.length));
}

public void testSerialization() throws Exception {
UnassignedInfo meta = new UnassignedInfo(RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()), randomBoolean() ? randomAsciiOfLength(4) : null);
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values());
UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis()):
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null);
BytesStreamOutput out = new BytesStreamOutput();
meta.writeTo(out);
out.close();
Expand All @@ -82,6 +86,7 @@ public void testSerialization() throws Exception {
assertThat(read.getUnassignedTimeInMillis(), equalTo(meta.getUnassignedTimeInMillis()));
assertThat(read.getMessage(), equalTo(meta.getMessage()));
assertThat(read.getDetails(), equalTo(meta.getDetails()));
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
}

public void testIndexCreated() {
Expand Down Expand Up @@ -273,7 +278,10 @@ public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception {
EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(random(), reasons), null);
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), reasons);
UnassignedInfo unassignedInfo = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, null, null, 1, System.nanoTime(), System.currentTimeMillis()):
new UnassignedInfo(reason, null);
unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
Expand All @@ -287,7 +295,7 @@ public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception
*/
public void testLeftDelayCalculation() throws Exception {
final long baseTime = System.nanoTime();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis());
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis());
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
unassignedInfo = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
Expand Down
Loading

0 comments on commit 394b280

Please sign in to comment.