Skip to content

Commit

Permalink
Optimising AwarenessAllocationDecider for hashmap.get call
Browse files Browse the repository at this point in the history
Signed-off-by: RS146BIJAY <[email protected]>
  • Loading branch information
RS146BIJAY committed Jul 16, 2024
1 parent 29a3e2c commit bc1dd06
Show file tree
Hide file tree
Showing 5 changed files with 1,200 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,74 @@ public void testSimpleAwareness() throws Exception {
}, 10, TimeUnit.SECONDS);
}

/**
 * Exercises awareness allocation on the {@code zone} attribute with
 * {@code cluster.routing.allocation.awareness.zone_optimised} switched on.
 *
 * Two nodes are started in zone {@code a}, two indices are created (a random subset of them is then
 * closed), and a third node is started in zone {@code b}. Once the cluster is green with no relocations,
 * the test asserts that the number of shard copies located on the third node equals the total number of
 * primaries of both indices.
 *
 * @throws Exception if the busy-wait assertion does not succeed within its timeout
 */
public void testSimpleAwarenessWithZoneOptimised() throws Exception {
    Settings commonSettings = Settings.builder()
        .put("cluster.routing.allocation.awareness.attributes", "zone")
        .put("cluster.routing.allocation.awareness.zone_optimised", true)
        .build();

    logger.info("--> starting 2 nodes in the same zone");
    internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());

    createIndex("test1");
    createIndex("test2");

    NumShards test1 = getNumShards("test1");
    NumShards test2 = getNumShards("test2");
    // expected number of shard copies on the node that will be started in the other zone
    final int totalPrimaries = test1.numPrimaries + test2.numPrimaries;

    ensureGreen();

    final List<String> indicesToClose = randomSubsetOf(Arrays.asList("test1", "test2"));
    indicesToClose.forEach(indexToClose -> assertAcked(client().admin().indices().prepareClose(indexToClose).get()));

    logger.info("--> starting 1 node in a different zone");
    final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());

    // On slow machines the initial relocation might be delayed
    assertBusy(() -> {
        logger.info("--> waiting for no relocation");
        ClusterHealthResponse clusterHealth = client().admin()
            .cluster()
            .prepareHealth()
            .setIndices("test1", "test2")
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForGreenStatus()
            .setWaitForNodes("3")
            .setWaitForNoRelocatingShards(true)
            .get();

        assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false));

        logger.info("--> checking current state");
        ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

        // check that closed indices are effectively closed
        final List<String> notClosedIndices = indicesToClose.stream()
            .filter(index -> clusterState.metadata().index(index).getState() != State.CLOSE)
            .collect(Collectors.toList());
        assertThat("Some indices not closed", notClosedIndices, empty());

        // count shard copies per node name, then verify node3 holds as many copies as there are primaries
        final Map<String, Integer> counts = new HashMap<>();
        for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
            for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
                for (ShardRouting shardRouting : indexShardRoutingTable) {
                    counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
                }
            }
        }
        assertThat(counts.get(node3), equalTo(totalPrimaries));
    }, 10, TimeUnit.SECONDS);
}

public void testAwarenessZones() {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public static boolean isDedicatedSearchNode(Settings settings) {
private final Map<String, String> attributes;
private final Version version;
private final SortedSet<DiscoveryNodeRole> roles;
// Value of the "zone" entry of `attributes`, looked up once in the constructors and kept so that
// callers can read it without a map lookup.
private final String zoneValue;
// Whether `attributes` contains a "zone" key; computed once in the constructors alongside zoneValue.
private final boolean hasZoneAttribute;

/**
* Creates a new {@link DiscoveryNode}
Expand Down Expand Up @@ -268,6 +270,8 @@ public DiscoveryNode(
this.version = version;
}
this.attributes = Collections.unmodifiableMap(attributes);
this.zoneValue = this.attributes.get("zone");
this.hasZoneAttribute = attributes.containsKey("zone");
// verify that no node roles are being provided as attributes
Predicate<Map<String, String>> predicate = (attrs) -> {
boolean success = true;
Expand Down Expand Up @@ -329,6 +333,9 @@ public DiscoveryNode(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
this.attributes.put(in.readString(), in.readString());
}

this.zoneValue = this.attributes.get("zone");
this.hasZoneAttribute = attributes.containsKey("zone");
int rolesSize = in.readVInt();
final Set<DiscoveryNodeRole> roles = new HashSet<>(rolesSize);
for (int i = 0; i < rolesSize; i++) {
Expand Down Expand Up @@ -458,6 +465,14 @@ public boolean isRemoteClusterClient() {
return roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}

/**
 * Returns the value of this node's {@code zone} attribute as captured when the node was constructed.
 *
 * @return the cached {@code zone} attribute value
 */
public String getZoneValue() {
    return this.zoneValue;
}

/**
 * Tells whether this node's attributes contained a {@code zone} key when the node was constructed.
 *
 * @return the cached result of the {@code zone} key lookup
 */
public boolean hasZoneAttribute() {
    return this.hasZoneAttribute;
}

/**
* Returns whether the node is dedicated to provide search capability.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,23 @@ public class AwarenessAllocationDecider extends AllocationDecider {
Property.NodeScope
);

/**
 * Boolean setting {@code cluster.routing.allocation.awareness.zone_optimised}; defaults to {@code false}
 * and is registered as dynamic and node-scoped.
 * When enabled, this decider reads the zone value and zone-presence flag cached on the node
 * instead of looking the awareness attribute up in the node's attribute map.
 */
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED = Setting.boolSetting(
"cluster.routing.allocation.awareness.zone_optimised",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// Awareness attribute names; initialised from CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING
// and replaced through the registered settings-update consumer.
private volatile List<String> awarenessAttributes;

// Current value of CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED; replaced through its settings-update consumer.
private volatile boolean isAllocationZoneOptimised;
// Forced values per awareness attribute name, populated from CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.
private volatile Map<String, List<String>> forcedAwarenessAttributes;

public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);
this.isAllocationZoneOptimised = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED, this::setAllocationZoneOptimised);
setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
Expand All @@ -140,6 +150,10 @@ private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

/**
 * Settings-update consumer for the zone-optimised flag; stores the new value on this decider.
 *
 * @param zoneOptimised the updated value of the zone-optimised setting
 */
private void setAllocationZoneOptimised(boolean zoneOptimised) {
    isAllocationZoneOptimised = zoneOptimised;
}

Check warning on line 155 in server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java#L154-L155

Added lines #L154 - L155 were not covered by tests

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true);
Expand All @@ -164,7 +178,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (node.node().getAttributes().containsKey(awarenessAttribute) == false) {
if (isAwarenessAttributeAssociatedWithNode(node, awarenessAttribute) == false) {
return allocation.decision(
Decision.NO,
NAME,
Expand All @@ -175,36 +189,10 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
);
}

int currentNodeCount = getCurrentNodeCountForAttribute(shardRouting, node, allocation, moveToNode, awarenessAttribute);

// build attr_value -> nodes map
Set<String> nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute);

// build the count of shards per attribute value
Map<String, Integer> shardPerAttribute = new HashMap<>();
for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) {
if (assignedShard.started() || assignedShard.initializing()) {
// Note: this also counts relocation targets as that will be the new location of the shard.
// Relocation sources should not be counted as the shard is moving away
RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId());
shardPerAttribute.merge(routingNode.node().getAttributes().get(awarenessAttribute), 1, Integer::sum);
}
}

if (moveToNode) {
if (shardRouting.assignedToNode()) {
String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
if (node.nodeId().equals(nodeId) == false) {
// we work on different nodes, move counts around
shardPerAttribute.compute(
allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute),
(k, v) -> (v == null) ? 0 : v - 1
);
shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum);
}
} else {
shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum);
}
}

int numberOfAttributes = nodesPerAttribute.size();
List<String> fullValues = forcedAwarenessAttributes.get(awarenessAttribute);

Expand All @@ -216,9 +204,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
}
numberOfAttributes = attributesSet.size();
}
// TODO should we remove ones that are not part of full list?

final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute));
// TODO should we remove ones that are not part of full list?
final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
if (currentNodeCount > maximumNodeCount) {
return allocation.decision(
Expand All @@ -238,4 +225,64 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout

return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
}

/**
 * Counts how many copies of the shard would sit on nodes sharing {@code node}'s value of the given
 * awareness attribute.
 *
 * Started and initializing copies are counted (relocation targets included, relocation sources excluded).
 * When {@code moveToNode} is set, the count is adjusted as if {@code shardRouting} were placed on
 * {@code node}: the copy is removed from its current location when that location has the same attribute
 * value, and then added for {@code node}.
 *
 * Attribute values are compared null-safely, so a node holding a copy but lacking the attribute does not
 * cause a {@link NullPointerException}; it simply does not match a non-null value.
 *
 * @param shardRouting       the shard being evaluated
 * @param node               the node the decision is made for
 * @param allocation         the current allocation, used to find the other copies of the shard
 * @param moveToNode         whether to account for {@code shardRouting} moving to {@code node}
 * @param awarenessAttribute the awareness attribute whose values are compared
 * @return the number of shard copies in the same attribute value as {@code node}
 */
private int getCurrentNodeCountForAttribute(
    ShardRouting shardRouting,
    RoutingNode node,
    RoutingAllocation allocation,
    boolean moveToNode,
    String awarenessAttribute
) {
    // attribute value of the node under evaluation; every other value is compared against it
    final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute);
    int currentNodeCount = 0;
    final List<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
    for (ShardRouting assignedShard : assignedShards) {
        if (assignedShard.started() || assignedShard.initializing()) {
            // Note: this also counts relocation targets as that will be the new location of the shard.
            // Relocation sources should not be counted as the shard is moving away
            RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId());
            // Increase the count when the copy lives in the same attribute value as the evaluated node.
            final String assignedAttribute = getAttributeValueForNode(routingNode, awarenessAttribute);
            if (java.util.Objects.equals(assignedAttribute, shardAttributeForNode)) {
                ++currentNodeCount;
            }
        }
    }

    if (moveToNode) {
        if (shardRouting.assignedToNode()) {
            String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId();
            if (node.nodeId().equals(nodeId) == false) {
                // we work on different nodes, move counts around
                final String sourceAttribute = getAttributeValueForNode(allocation.routingNodes().node(nodeId), awarenessAttribute);
                if (java.util.Objects.equals(sourceAttribute, shardAttributeForNode) && currentNodeCount > 0) {
                    --currentNodeCount;
                }

                ++currentNodeCount;
            }
        } else {
            ++currentNodeCount;
        }
    }

    return currentNodeCount;
}

/**
 * Tells whether {@code node} carries the given awareness attribute.
 *
 * The flag cached on the node only describes the {@code zone} attribute, so the fast path is taken
 * only when zone optimisation is enabled and the requested attribute is {@code zone}. Any other
 * attribute is looked up in the node's attribute map.
 *
 * @param node               the node to inspect
 * @param awarenessAttribute the awareness attribute name
 * @return {@code true} if the node has a value for {@code awarenessAttribute}
 */
private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String awarenessAttribute) {
    if (isAllocationZoneOptimised && "zone".equals(awarenessAttribute)) {
        return node.node().hasZoneAttribute();
    }
    return node.node().getAttributes().containsKey(awarenessAttribute);
}

/**
 * Returns {@code node}'s value for the given awareness attribute.
 *
 * The value cached on the node is the one for the {@code zone} attribute, so it is returned only when
 * zone optimisation is enabled and the requested attribute is {@code zone}. Any other attribute is
 * read from the node's attribute map, so that it never receives the zone value by mistake.
 *
 * @param node               the node to inspect
 * @param awarenessAttribute the awareness attribute name
 * @return the attribute value, or {@code null} if the attribute map has no entry for it
 */
private String getAttributeValueForNode(final RoutingNode node, final String awarenessAttribute) {
    if (isAllocationZoneOptimised && "zone".equals(awarenessAttribute)) {
        return node.node().getZoneValue();
    }
    return node.node().getAttributes().get(awarenessAttribute);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public void apply(Settings value, Settings current, Settings previous) {
new HashSet<>(
Arrays.asList(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
Expand Down
Loading

0 comments on commit bc1dd06

Please sign in to comment.