Skip to content

Commit

Permalink
Allow for setting the total shards per node in the Allocate ILM action (
Browse files Browse the repository at this point in the history
elastic#76134)

This adds a new optional field to the allocate ILM action called "total_shards_per_node". If present, the
value of this field is set as the value of "index.routing.allocation.total_shards_per_node" before the allocation
takes place.
Relates to elastic#44070
  • Loading branch information
masseyke committed Aug 20, 2021
1 parent bd038d7 commit f6b4df8
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 37 deletions.
11 changes: 9 additions & 2 deletions docs/reference/ilm/actions/ilm-allocate.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ see <<shard-allocation-filtering>>.
(Optional, integer)
Number of replicas to assign to the index.

`total_shards_per_node`::
(Optional, integer)
The maximum number of shards for the index on a single {es} node. A value of `-1` is
interpreted as unlimited. See <<allocation-total-shards, total shards>>.

`include`::
(Optional, object)
Assigns an index to nodes that have at least _one_ of the specified custom attributes.
Expand All @@ -48,7 +53,8 @@ Assigns an index to nodes that have _all_ of the specified custom attributes.
==== Example

The allocate action in the following policy changes the index's number of replicas to `2`.
The index allocation rules are not changed.
No more than 200 shards for the index will be placed on any single node. Otherwise the index
allocation rules are not changed.

[source,console]
--------------------------------------------------
Expand All @@ -59,7 +65,8 @@ PUT _ilm/policy/my_policy
"warm": {
"actions": {
"allocate" : {
"number_of_replicas" : 2
"number_of_replicas" : 2,
"total_shards_per_node" : 200
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -29,22 +31,26 @@ public class AllocateAction implements LifecycleAction {

public static final String NAME = "allocate";
public static final ParseField NUMBER_OF_REPLICAS_FIELD = new ParseField("number_of_replicas");
public static final ParseField TOTAL_SHARDS_PER_NODE_FIELD = new ParseField("total_shards_per_node");
public static final ParseField INCLUDE_FIELD = new ParseField("include");
public static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
public static final ParseField REQUIRE_FIELD = new ParseField("require");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<AllocateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new AllocateAction((Integer) a[0], (Map<String, String>) a[1], (Map<String, String>) a[2], (Map<String, String>) a[3]));
a -> new AllocateAction((Integer) a[0], (Integer) a[1], (Map<String, String>) a[2], (Map<String, String>) a[3],
(Map<String, String>) a[4]));

static {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), NUMBER_OF_REPLICAS_FIELD);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), TOTAL_SHARDS_PER_NODE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), INCLUDE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), EXCLUDE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.mapStrings(), REQUIRE_FIELD);
}

private final Integer numberOfReplicas;
private final Integer totalShardsPerNode;
private final Map<String, String> include;
private final Map<String, String> exclude;
private final Map<String, String> require;
Expand All @@ -53,7 +59,8 @@ public static AllocateAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public AllocateAction(Integer numberOfReplicas, Map<String, String> include, Map<String, String> exclude, Map<String, String> require) {
public AllocateAction(Integer numberOfReplicas, Integer totalShardsPerNode, Map<String, String> include, Map<String, String> exclude,
Map<String, String> require) {
if (include == null) {
this.include = Collections.emptyMap();
} else {
Expand All @@ -78,18 +85,27 @@ public AllocateAction(Integer numberOfReplicas, Map<String, String> include, Map
throw new IllegalArgumentException("[" + NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0");
}
this.numberOfReplicas = numberOfReplicas;
if (totalShardsPerNode != null && totalShardsPerNode < -1) {
throw new IllegalArgumentException("[" + TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName() + "] must be >= -1");
}
this.totalShardsPerNode = totalShardsPerNode;
}

@SuppressWarnings("unchecked")
public AllocateAction(StreamInput in) throws IOException {
this(in.readOptionalVInt(), (Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
(Map<String, String>) in.readGenericValue());
this(in.readOptionalVInt(), in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readOptionalInt() : null,
(Map<String, String>) in.readGenericValue(), (Map<String, String>) in.readGenericValue(),
(Map<String, String>) in.readGenericValue());
}

public Integer getNumberOfReplicas() {
return numberOfReplicas;
}

public Integer getTotalShardsPerNode() {
return totalShardsPerNode;
}

public Map<String, String> getInclude() {
return include;
}
Expand All @@ -105,6 +121,9 @@ public Map<String, String> getRequire() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVInt(numberOfReplicas);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalInt(totalShardsPerNode);
}
out.writeGenericValue(include);
out.writeGenericValue(exclude);
out.writeGenericValue(require);
Expand All @@ -121,6 +140,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (numberOfReplicas != null) {
builder.field(NUMBER_OF_REPLICAS_FIELD.getPreferredName(), numberOfReplicas);
}
if (totalShardsPerNode != null) {
builder.field(TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName(), totalShardsPerNode);
}
builder.field(INCLUDE_FIELD.getPreferredName(), include);
builder.field(EXCLUDE_FIELD.getPreferredName(), exclude);
builder.field(REQUIRE_FIELD.getPreferredName(), require);
Expand All @@ -145,14 +167,17 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
include.forEach((key, value) -> newSettings.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + key, value));
exclude.forEach((key, value) -> newSettings.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
require.forEach((key, value) -> newSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
if (totalShardsPerNode != null) {
newSettings.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), totalShardsPerNode);
}
UpdateSettingsStep allocateStep = new UpdateSettingsStep(allocateKey, allocationRoutedKey, client, newSettings.build());
AllocationRoutedStep routedCheckStep = new AllocationRoutedStep(allocationRoutedKey, nextStepKey);
return Arrays.asList(allocateStep, routedCheckStep);
}

@Override
public int hashCode() {
return Objects.hash(numberOfReplicas, include, exclude, require);
return Objects.hash(numberOfReplicas, totalShardsPerNode, include, exclude, require);
}

@Override
Expand All @@ -165,6 +190,7 @@ public boolean equals(Object obj) {
}
AllocateAction other = (AllocateAction) obj;
return Objects.equals(numberOfReplicas, other.numberOfReplicas) &&
Objects.equals(totalShardsPerNode, other.totalShardsPerNode) &&
Objects.equals(include, other.include) &&
Objects.equals(exclude, other.exclude) &&
Objects.equals(require, other.require);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -19,6 +20,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.hamcrest.Matchers.equalTo;

public class AllocateActionTests extends AbstractActionTestCase<AllocateAction> {
Expand Down Expand Up @@ -56,7 +58,8 @@ static AllocateAction randomInstance() {
requires = randomBoolean() ? null : Collections.emptyMap();
}
Integer numberOfReplicas = randomBoolean() ? null : randomIntBetween(0, 10);
return new AllocateAction(numberOfReplicas, includes, excludes, requires);
Integer totalShardsPerNode = randomBoolean() ? null : randomIntBetween(-1, 300);
return new AllocateAction(numberOfReplicas, totalShardsPerNode, includes, excludes, requires);
}


Expand All @@ -71,6 +74,7 @@ protected AllocateAction mutateInstance(AllocateAction instance) {
Map<String, String> exclude = instance.getExclude();
Map<String, String> require = instance.getRequire();
Integer numberOfReplicas = instance.getNumberOfReplicas();
Integer totalShardsPerNode = instance.getTotalShardsPerNode();
switch (randomIntBetween(0, 3)) {
case 0:
include = new HashMap<>(include);
Expand All @@ -90,15 +94,15 @@ protected AllocateAction mutateInstance(AllocateAction instance) {
default:
throw new AssertionError("Illegal randomisation branch");
}
return new AllocateAction(numberOfReplicas, include, exclude, require);
return new AllocateAction(numberOfReplicas, totalShardsPerNode, include, exclude, require);
}

public void testAllMapsNullOrEmpty() {
Map<String, String> include = randomBoolean() ? null : Collections.emptyMap();
Map<String, String> exclude = randomBoolean() ? null : Collections.emptyMap();
Map<String, String> require = randomBoolean() ? null : Collections.emptyMap();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> new AllocateAction(null, include, exclude, require));
() -> new AllocateAction(null, null, include, exclude, require));
assertEquals("At least one of " + AllocateAction.INCLUDE_FIELD.getPreferredName() + ", "
+ AllocateAction.EXCLUDE_FIELD.getPreferredName() + " or " + AllocateAction.REQUIRE_FIELD.getPreferredName()
+ "must contain attributes for action " + AllocateAction.NAME, exception.getMessage());
Expand All @@ -109,10 +113,19 @@ public void testInvalidNumberOfReplicas() {
Map<String, String> exclude = randomBoolean() ? null : Collections.emptyMap();
Map<String, String> require = randomBoolean() ? null : Collections.emptyMap();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> new AllocateAction(randomIntBetween(-1000, -1), include, exclude, require));
() -> new AllocateAction(randomIntBetween(-1000, -1), randomIntBetween(0, 300), include, exclude, require));
assertEquals("[" + AllocateAction.NUMBER_OF_REPLICAS_FIELD.getPreferredName() + "] must be >= 0", exception.getMessage());
}

public void testInvalidTotalShardsPerNode() {
Map<String, String> include = randomAllocationRoutingMap(1, 5);
Map<String, String> exclude = randomBoolean() ? null : Collections.emptyMap();
Map<String, String> require = randomBoolean() ? null : Collections.emptyMap();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> new AllocateAction(randomIntBetween(0, 300), randomIntBetween(-1000, -2), include, exclude, require));
assertEquals("[" + AllocateAction.TOTAL_SHARDS_PER_NODE_FIELD.getPreferredName() + "] must be >= -1", exception.getMessage());
}

public static Map<String, String> randomAllocationRoutingMap(int minEntries, int maxEntries) {
Map<String, String> map = new HashMap<>();
int numIncludes = randomIntBetween(minEntries, maxEntries);
Expand Down Expand Up @@ -150,10 +163,32 @@ public void testToSteps() {
(key, value) -> expectedSettings.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey() + key, value));
action.getRequire().forEach(
(key, value) -> expectedSettings.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + key, value));
if (action.getTotalShardsPerNode() != null) {
expectedSettings.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), action.getTotalShardsPerNode());
}

assertThat(firstStep.getSettings(), equalTo(expectedSettings.build()));
AllocationRoutedStep secondStep = (AllocationRoutedStep) steps.get(1);
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(nextStepKey, secondStep.getNextStepKey());
}

public void testTotalNumberOfShards() throws Exception {
Integer totalShardsPerNode = randomIntBetween(-1, 1000);
Integer numberOfReplicas = randomIntBetween(0, 4);
AllocateAction action = new AllocateAction(numberOfReplicas, totalShardsPerNode, null, null, null);
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
List<Step> steps = action.toSteps(null, phase, nextStepKey);
UpdateSettingsStep firstStep = (UpdateSettingsStep) steps.get(0);
assertEquals(totalShardsPerNode, firstStep.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), null));

totalShardsPerNode = null;
action = new AllocateAction(numberOfReplicas, totalShardsPerNode, null, null, null);
steps = action.toSteps(null, phase, nextStepKey);
firstStep = (UpdateSettingsStep) steps.get(0);
assertEquals(null, firstStep.getSettings().get(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void testReadStepKeys() {

Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("forcemerge", new ForceMergeAction(5, null));
actions.put("allocate", new AllocateAction(1, null, null, null));
actions.put("allocate", new AllocateAction(1, 20, null, null, null));
PhaseExecutionInfo pei = new PhaseExecutionInfo("policy", new Phase("wonky", TimeValue.ZERO, actions), 1, 1);
String phaseDef = Strings.toString(pei);
logger.info("--> phaseDef: {}", phaseDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
public class TimeseriesLifecycleTypeTests extends ESTestCase {

private static final AllocateAction TEST_ALLOCATE_ACTION =
new AllocateAction(2, Collections.singletonMap("node", "node1"),null, null);
new AllocateAction(2, 20, Collections.singletonMap("node", "node1"),null, null);
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
private static final WaitForSnapshotAction TEST_WAIT_FOR_SNAPSHOT_ACTION = new WaitForSnapshotAction("policy");
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1, null);
Expand Down Expand Up @@ -640,7 +640,7 @@ public void testShouldMigrateDataToTiers() {
{
// the allocate action only specifies the number of replicas
Map<String, LifecycleAction> actions = new HashMap<>();
actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), new AllocateAction(2, null, null, null));
actions.put(TEST_ALLOCATE_ACTION.getWriteableName(), new AllocateAction(2, 20, null, null, null));
Phase phase = new Phase(WARM_PHASE, TimeValue.ZERO, actions);
assertThat(TimeseriesLifecycleType.shouldInjectMigrateStepForPhase(phase), is(true));
}
Expand Down Expand Up @@ -878,7 +878,8 @@ private ConcurrentMap<String, LifecycleAction> convertActionNamesToActions(Strin
return Arrays.asList(availableActionNames).stream().map(n -> {
switch (n) {
case AllocateAction.NAME:
return new AllocateAction(null, Collections.singletonMap("foo", "bar"), Collections.emptyMap(), Collections.emptyMap());
return new AllocateAction(null, null, Collections.singletonMap("foo", "bar"), Collections.emptyMap(),
Collections.emptyMap());
case DeleteAction.NAME:
return new DeleteAction();
case ForceMergeAction.NAME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ public void testMigrateToDataTiersAction() throws Exception {
Map<String, LifecycleAction> warmActions = new HashMap<>();
warmActions.put(SetPriorityAction.NAME, new SetPriorityAction(50));
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null));
warmActions.put(AllocateAction.NAME, new AllocateAction(null, singletonMap("data", "warm"), null, null));
warmActions.put(AllocateAction.NAME, new AllocateAction(null, null, singletonMap("data", "warm"), null, null));
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1, null));
Map<String, LifecycleAction> coldActions = new HashMap<>();
coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0));
coldActions.put(AllocateAction.NAME, new AllocateAction(0, null, null, singletonMap("data", "cold")));
coldActions.put(AllocateAction.NAME, new AllocateAction(0, null, null, null, singletonMap("data", "cold")));

createPolicy(client(), policy,
new Phase("hot", TimeValue.ZERO, hotActions),
Expand Down Expand Up @@ -221,11 +221,11 @@ public void testMigrationDryRun() throws Exception {
Map<String, LifecycleAction> warmActions = new HashMap<>();
warmActions.put(SetPriorityAction.NAME, new SetPriorityAction(50));
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null));
warmActions.put(AllocateAction.NAME, new AllocateAction(null, singletonMap("data", "warm"), null, null));
warmActions.put(AllocateAction.NAME, new AllocateAction(null, null, singletonMap("data", "warm"), null, null));
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1, null));
Map<String, LifecycleAction> coldActions = new HashMap<>();
coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0));
coldActions.put(AllocateAction.NAME, new AllocateAction(0, null, null, singletonMap("data", "cold")));
coldActions.put(AllocateAction.NAME, new AllocateAction(0, null, null, null, singletonMap("data", "cold")));

createPolicy(client(), policy,
new Phase("hot", TimeValue.ZERO, hotActions),
Expand Down
Loading

0 comments on commit f6b4df8

Please sign in to comment.