Skip to content

Commit

Permalink
Move allocation awareness attributes to list setting (#30626)
Browse files Browse the repository at this point in the history
Allows the setting to be specified using proper array syntax, for example:
"cluster.routing.allocation.awareness.attributes": [ "foo", "bar", "baz" ]

Closes #30617
  • Loading branch information
ywelsch committed May 16, 2018
1 parent 0b82d6d commit 4720778
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -625,20 +625,20 @@ public Set<String> getAllAllocationIds() {

static class AttributesKey {

final String[] attributes;
final List<String> attributes;

AttributesKey(String[] attributes) {
AttributesKey(List<String> attributes) {
this.attributes = attributes;
}

@Override
public int hashCode() {
return Arrays.hashCode(attributes);
return attributes.hashCode();
}

@Override
public boolean equals(Object obj) {
return obj instanceof AttributesKey && Arrays.equals(attributes, ((AttributesKey) obj).attributes);
return obj instanceof AttributesKey && attributes.equals(((AttributesKey) obj).attributes);
}
}

Expand Down Expand Up @@ -702,11 +702,11 @@ private static List<ShardRouting> collectAttributeShards(AttributesKey key, Disc
return Collections.unmodifiableList(to);
}

public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes) {
return preferAttributesActiveInitializingShardsIt(attributes, nodes, shuffler.nextSeed());
}

public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int seed) {
public ShardIterator preferAttributesActiveInitializingShardsIt(List<String> attributes, DiscoveryNodes nodes, int seed) {
AttributesKey key = new AttributesKey(attributes);
AttributesRoutings activeRoutings = getActiveAttribute(key, nodes);
AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -49,7 +50,7 @@ public class OperationRouting extends AbstractComponent {
Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", false,
Setting.Property.Dynamic, Setting.Property.NodeScope);

private String[] awarenessAttributes;
private List<String> awarenessAttributes;
private boolean useAdaptiveReplicaSelection;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
Expand All @@ -65,7 +66,7 @@ void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
this.useAdaptiveReplicaSelection = useAdaptiveReplicaSelection;
}

private void setAwarenessAttributes(String[] awarenessAttributes) {
private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

Expand Down Expand Up @@ -139,7 +140,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts) {
if (preference == null || preference.isEmpty()) {
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
Expand Down Expand Up @@ -174,7 +175,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
}
// no more preference
if (index == -1 || index == preference.length() - 1) {
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
Expand Down Expand Up @@ -234,7 +235,7 @@ private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable index
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
}
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
return indexShard.activeInitializingShardsIt(routingHash);
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -34,6 +35,8 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import static java.util.Collections.emptyList;

/**
* This {@link AllocationDecider} controls shard allocation based on
* {@code awareness} key-value pairs defined in the node configuration.
Expand Down Expand Up @@ -78,13 +81,13 @@ public class AwarenessAllocationDecider extends AllocationDecider {

public static final String NAME = "awareness";

public static final Setting<String[]> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
new Setting<>("cluster.routing.allocation.awareness.attributes", "", s -> Strings.tokenizeToStringArray(s, ","), Property.Dynamic,
public static final Setting<List<String>> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING =
Setting.listSetting("cluster.routing.allocation.awareness.attributes", emptyList(), Function.identity(), Property.Dynamic,
Property.NodeScope);
public static final Setting<Settings> CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING =
Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope);

private volatile String[] awarenessAttributes;
private volatile List<String> awarenessAttributes;

private volatile Map<String, List<String>> forcedAwarenessAttributes;

Expand All @@ -109,7 +112,7 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
}

private void setAwarenessAttributes(String[] awarenessAttributes) {
private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

Expand All @@ -124,7 +127,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
}

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
if (awarenessAttributes.isEmpty()) {
return allocation.decision(Decision.YES, NAME,
"allocation awareness is not enabled, set cluster setting [%s] to enable it",
CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey());
Expand All @@ -138,7 +141,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
return allocation.decision(Decision.NO, NAME,
"node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]",
awarenessAttribute, CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(),
allocation.debugDecision() ? Strings.arrayToCommaDelimitedString(awarenessAttributes) : null);
allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null);
}

// build attr_value -> nodes map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexShardRoutingTableTests extends ESTestCase {
public void testEqualsAttributesKey() {
String[] attr1 = {"a"};
String[] attr2 = {"b"};
List<String> attr1 = Arrays.asList("a");
List<String> attr2 = Arrays.asList("b");
IndexShardRoutingTable.AttributesKey attributesKey1 = new IndexShardRoutingTable.AttributesKey(attr1);
IndexShardRoutingTable.AttributesKey attributesKey2 = new IndexShardRoutingTable.AttributesKey(attr1);
IndexShardRoutingTable.AttributesKey attributesKey3 = new IndexShardRoutingTable.AttributesKey(attr2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -223,11 +224,16 @@ public void testRandomRouting() {
}

public void testAttributePreferenceRouting() {
AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id,zone")
.build());
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always");
if (randomBoolean()) {
settings.put("cluster.routing.allocation.awareness.attributes", " rack_id, zone ");
} else {
settings.putList("cluster.routing.allocation.awareness.attributes", "rack_id", "zone");
}

AllocationService strategy = createAllocationService(settings.build());

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
Expand Down Expand Up @@ -257,15 +263,15 @@ public void testAttributePreferenceRouting() {
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

// after all are started, check routing iteration
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
ShardRouting shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node2"));

shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(Arrays.asList("rack_id"), clusterState.nodes());
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
Expand Down

0 comments on commit 4720778

Please sign in to comment.