Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add global primary shard balance constraint during allocation #6643

Merged
merged 3 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,50 @@ public void enablePreferPrimaryBalance() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), "true")
)
.setPersistentSettings(Settings.builder().put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), "true"))
);
}

/**
* This test verifies that primary shard balance is attained during allocation, both per index and across all
* indices in the cluster.
* @throws Exception
*/
public void testGlobalPrimaryAllocation() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a test case here for allocation/rebalance post failover with these constraints? To simulate rebalance when a shard comes back full of replicas.

Copy link
Member Author

@dreamer-89 dreamer-89 Mar 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-balancing is only performed at the per-index level today, so allocation in a multi-index failover scenario wouldn't be predictable. The per-index allocation/rebalancing scenario is covered by existing ITs. Global re-balancing is tracked in #6642.

internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 1;
final int maxShardCount = 1;
final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
final int numberOfIndices = randomIntBetween(5, 10);

final List<String> nodeNames = new ArrayList<>();
logger.info("--> Creating {} nodes", nodeCount);
for (int i = 0; i < nodeCount; i++) {
nodeNames.add(internalCluster().startNode());
}
enablePreferPrimaryBalance();
int shardCount, replicaCount;
ClusterState state;
for (int i = 0; i < numberOfIndices; i++) {
shardCount = randomIntBetween(1, maxShardCount);
replicaCount = randomIntBetween(0, maxReplicaCount);
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount);
ensureGreen(TimeValue.timeValueSeconds(60));
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance();
}

/**
* This test verifies the happy path where primary shard allocation is balanced when multiple indices are created.
*
* This test in general passes without primary shard balance as well due to nature of allocation algorithm which
* assigns all primary shards first followed by replica copies.
*/
public void testBalancedPrimaryAllocation() throws Exception {
public void testPerIndexPrimaryAllocation() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 2;
final int maxShardCount = 5;
Expand Down Expand Up @@ -213,4 +244,24 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}
}, 60, TimeUnit.SECONDS);
}

/**
 * Asserts that no routing node holds more started primary shards than the rounded-up cluster-wide average.
 *
 * The average is the total number of active primaries over all indices divided by the number of routing nodes.
 * The whole check is wrapped in {@code assertBusy} with a 60 second limit.
 *
 * @throws Exception propagated from {@code assertBusy}
 */
private void verifyPrimaryBalance() throws Exception {
    assertBusy(() -> {
        final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
        final RoutingNodes nodes = currentState.getRoutingNodes();
        int totalPrimaryShards = 0;
        for (ObjectObjectCursor<String, IndexRoutingTable> index : currentState.getRoutingTable().indicesRouting()) {
            totalPrimaryShards += index.value.primaryShardsActive();
        }
        // Reuse the routing nodes obtained above instead of requesting them from the cluster state again.
        final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / nodes.size());
        for (RoutingNode node : nodes) {
            // Count the matches directly; no need to materialize a list only to read its size.
            final long primaryCount = node.shardsWithState(STARTED).stream().filter(ShardRouting::primary).count();
            assertTrue(
                "node [" + node.nodeId() + "] holds " + primaryCount + " started primaries, expected at most " + avgPrimaryShardsPerNode,
                primaryCount <= avgPrimaryShardsPerNode
            );
        }
    }, 60, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,29 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID;
import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.isPrimaryShardsPerIndexPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isIndexShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached;

/**
* Allocation constraints specify conditions which, if breached, reduce the
* priority of a node for receiving unassigned shard allocations.
* Allocation constraints specify conditions which, if breached, reduce the priority of a node for receiving unassigned
* shard allocations. Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by
* this constraint.
*
* @opensearch.internal
*/
public class AllocationConstraints {

/**
*
* This constraint is only applied for unassigned shards to avoid overloading a newly added node.
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this constraint.
*/
public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.breach.constraint";
private Map<String, Constraint> constraints;

public AllocationConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
new Constraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, isIndexShardsPerNodeBreached())
);
this.constraints.putIfAbsent(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached())
);
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached()));
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand All @@ -51,26 +43,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

/**
 * Builds the predicate that limits how many shards of one index may sit on a single node.
 *
 * With the current weight function, a node holding far fewer shards than its peers (for example a single newly
 * added or replaced node) gets a much lower weight than the others, so allocations tend to pile up on it. That
 * breaks index level balance by placing all shards of an index on one node, often creating a hotspot there.
 *
 * The predicate reports a breach once the node already holds at least the rounded-up average number of shards
 * per node for the index in question.
 *
 * @return predicate over {@link Constraint.ConstraintParams} that is true when the limit is reached
 */
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
    return constraintParams -> constraintParams.getNode()
        .numShards(constraintParams.getIndex()) >= (int) Math.ceil(
            constraintParams.getBalancer().avgShardsPerNode(constraintParams.getIndex())
        );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;

import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;

/**
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
* re-balancing target used in {@link RebalanceConstraints}
Expand All @@ -23,45 +24,22 @@
*/
public class Constraint implements Predicate<Constraint.ConstraintParams> {

public final static long CONSTRAINT_WEIGHT = 1000000L;

private String name;

private boolean enable;
private Predicate<ConstraintParams> predicate;

public Constraint(String name, Predicate<ConstraintParams> constraintPredicate) {
this.name = name;
public Constraint(Predicate<ConstraintParams> constraintPredicate) {
this.predicate = constraintPredicate;
this.enable = false;
}

/**
 * Evaluates the wrapped predicate, but only while this constraint is enabled.
 *
 * @param constraintParams parameters handed to the wrapped predicate
 * @return {@code false} when the constraint is disabled, otherwise the result of the wrapped predicate
 */
@Override
public boolean test(ConstraintParams constraintParams) {
    if (!this.enable) {
        return false;
    }
    return predicate.test(constraintParams);
}

/**
 * @return the name this constraint was created with
 */
public String getName() {
    return this.name;
}

/**
 * Switches this constraint on or off; {@link #test} only consults the wrapped predicate while it is on.
 *
 * @param enable {@code true} to enable the constraint, {@code false} to disable it
 */
public void setEnable(boolean enable) {
this.enable = enable;
}

/**
 * Two constraints are equal when they are of the same class and carry equal names.
 *
 * @param o object to compare with
 * @return {@code true} if {@code o} is a constraint of the same class with an equal name
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    return name.equals(((Constraint) o).name);
}

/**
 * @return hash code computed from the constraint name only
 */
@Override
public int hashCode() {
    return Objects.hash(this.name);
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import java.util.function.Predicate;

/**
 * Holds the identifiers, the shared weight and the predicates of the shard allocation and rebalancing constraints.
 *
 * @opensearch.internal
 */
public class ConstraintTypes {
    /**
     * High weight assigned to a node when a constraint is breached.
     */
    public static final long CONSTRAINT_WEIGHT = 1000000L;

    /**
     * Id of the per index constraint that is breached when a node contains more than the average number of primary
     * shards for an index.
     */
    public static final String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint";

    /**
     * Id of the cluster level constraint on the number of primary shards, across all indices, held by a node.
     */
    public static final String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint";

    /**
     * Id of the per index constraint on the number of shards of an index held by a node.
     */
    public static final String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint";

    /**
     * Builds the predicate that limits how many shards of one index may sit on a single node.
     *
     * With the current weight function, a node holding far fewer shards than its peers (for example a single newly
     * added or replaced node) gets a much lower weight than the others, so allocations tend to pile up on it. That
     * breaks index level balance by placing all shards of an index on one node, often creating a hotspot there.
     *
     * The predicate reports a breach once the node already holds at least the rounded-up average number of shards
     * per node for the index in question.
     *
     * @return predicate that is true when the per index shard limit of the node is reached
     */
    public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
        return constraintParams -> constraintParams.getNode()
            .numShards(constraintParams.getIndex()) >= (int) Math.ceil(
                constraintParams.getBalancer().avgShardsPerNode(constraintParams.getIndex())
            );
    }

    /**
     * Builds the predicate that is true when, for a given index, a node holds strictly more primary shards than the
     * rounded-up average number of primaries per node for that index. It is used in weight calculation during
     * allocation and rebalancing; when breached, the high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} is
     * assigned to the node, lowering its chances of being picked as allocation or rebalancing target.
     *
     * @return predicate that is true when the per index primary shard limit of the node is exceeded
     */
    public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached() {
        return constraintParams -> constraintParams.getNode()
            .numPrimaryShards(constraintParams.getIndex()) > (int) Math.ceil(
                constraintParams.getBalancer().avgPrimaryShardsPerNode(constraintParams.getIndex())
            );
    }

    /**
     * Builds the predicate that is true when a node already holds at least the rounded-up average number of primary
     * shards per node, counted across all indices. It is used in weight calculation during allocation only; when
     * breached, the high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to the node, lowering its
     * chances of being picked as allocation target.
     *
     * @return predicate that is true when the cluster wide primary shard limit of the node is reached
     */
    public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached() {
        return constraintParams -> constraintParams.getNode().numPrimaryShards() >= (int) Math.ceil(
            constraintParams.getBalancer().avgPrimaryShardsPerNode()
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;

/**
* Constraints applied during rebalancing round; specify conditions which, if breached, reduce the
Expand All @@ -24,15 +24,12 @@
* @opensearch.internal
*/
public class RebalanceConstraints {
public final static String PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID = PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey();

private Map<String, Constraint> constraints;

public RebalanceConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached())
);
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
}

public void updateRebalanceConstraint(String constraint, boolean enable) {
Expand All @@ -43,16 +40,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

/**
 * Builds the predicate used when primary balance is preferred: it is true when a node holds strictly more primary
 * shards of an index than the rounded-up average number of primaries per node for that index. A breached node
 * receives a higher weight, lowering its chances of being the target of unassigned shard allocation or rebalancing.
 *
 * @return predicate that is true when the per index primary shard limit of the node is exceeded
 */
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerIndexPerNodeBreached() {
    return constraintParams -> constraintParams.getNode()
        .numPrimaryShards(constraintParams.getIndex()) > (int) Math.ceil(
            constraintParams.getBalancer().avgPrimaryShardsPerNode(constraintParams.getIndex())
        );
}
}
Loading