Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster primary balance contraint for rebalancing with buffer #12656

Merged
merged 22 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8aed71b
Add cluster primary balance contraint for rebalancing with buffer
Arpit-Bandejiya Mar 14, 2024
a2eaddd
Fix spotless check
Arpit-Bandejiya Mar 14, 2024
761adc3
Tuning constraints
Arpit-Bandejiya Mar 14, 2024
f842b41
Fix the spotless checks
Arpit-Bandejiya Mar 14, 2024
55ed855
Add random allocation strategy
Arpit-Bandejiya Mar 11, 2024
32df4e6
Fix spotless errors
Arpit-Bandejiya Mar 14, 2024
938ac4d
Testing the allocation changes for all tests
Arpit-Bandejiya Mar 14, 2024
6cda5cd
Enable the prefer_primary to true
Arpit-Bandejiya Mar 18, 2024
ece3b26
Add changelog.md
Arpit-Bandejiya Mar 18, 2024
80dbcb9
Fix from main upsteam
Arpit-Bandejiya Mar 18, 2024
3d4d865
Add UTs
Arpit-Bandejiya Mar 19, 2024
d7657af
Fix spotless check
Arpit-Bandejiya Mar 19, 2024
566c7ef
Fix UTs
Arpit-Bandejiya Mar 21, 2024
0ca76bf
refactoring code
Arpit-Bandejiya Mar 22, 2024
b32f3bf
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
Arpit-Bandejiya Mar 22, 2024
462f286
address comments
Arpit-Bandejiya Mar 24, 2024
60ccd7b
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
Arpit-Bandejiya Mar 24, 2024
c2baa39
Address comments
Arpit-Bandejiya Mar 27, 2024
0f9edb0
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
Arpit-Bandejiya Mar 27, 2024
b28f5e8
Address the comments
Arpit-Bandejiya Mar 28, 2024
e823b85
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
Arpit-Bandejiya Apr 1, 2024
9f94ba5
Merge branch 'main' into primary-rebalacing
Arpit-Bandejiya Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
);
}

public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance)
.put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance)
.put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer)
)
);
}

/**
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
* balance per index and across all indices is maintained.
Expand Down Expand Up @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance();
verifyPrimaryBalance(0.0f);
}

/**
Expand Down Expand Up @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
verifyPerIndexPrimaryBalance();
}

/**
* Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting
* removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes
* when the PREFER_PRIMARY_SHARD_REBALANCE is set to true
*/
public void testAllocationAndRebalanceWithDisruption() throws Exception {
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
internalCluster().startClusterManagerOnlyNode();
final int maxReplicaCount = 2;
final int maxShardCount = 2;
// Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
// and preventing primary relocations
final int nodeCount = randomIntBetween(5, 10);
final int numberOfIndices = randomIntBetween(1, 10);
final float buffer = randomIntBetween(1, 4) * 0.10f;

logger.info("--> Creating {} nodes", nodeCount);
final List<String> nodeNames = new ArrayList<>();
for (int i = 0; i < nodeCount; i++) {
nodeNames.add(internalCluster().startNode());
}
setAllocationRelocationStrategy(true, true, buffer);

int shardCount, replicaCount;
ClusterState state;
for (int i = 0; i < numberOfIndices; i++) {
shardCount = randomIntBetween(1, maxShardCount);
replicaCount = randomIntBetween(1, maxReplicaCount);
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
ensureGreen(TimeValue.timeValueSeconds(60));
if (logger.isTraceEnabled()) {
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
}
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

final int additionalNodeCount = randomIntBetween(1, 5);
logger.info("--> Adding {} nodes", additionalNodeCount);

internalCluster().startNodes(additionalNodeCount);
ensureGreen(TimeValue.timeValueSeconds(60));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);

int nodeCountToStop = additionalNodeCount;
while (nodeCountToStop > 0) {
internalCluster().stopRandomDataNode();
// give replica a chance to promote as primary before terminating node containing the replica
ensureGreen(TimeValue.timeValueSeconds(60));
nodeCountToStop--;
}
state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info("--> Cluster state post nodes stop {}", state);
logger.info(ShardAllocations.printShardDistribution(state));
verifyPerIndexPrimaryBalance();
verifyPrimaryBalance(buffer);
}

/**
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
* @throws Exception exception
Expand Down Expand Up @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
}, 60, TimeUnit.SECONDS);
}

private void verifyPrimaryBalance() throws Exception {
private void verifyPrimaryBalance(float buffer) throws Exception {
assertBusy(() -> {
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
RoutingNodes nodes = currentState.getRoutingNodes();
Expand All @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.size();
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer)));
}
}, 60, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class AllocationConstraints {

public AllocationConstraints() {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class ConstraintTypes {
*/
public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint";

/**
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices
*/
public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.rebalance.constraint";

/**
* Defines an index constraint which is breached when a node contains more than avg number of shards for an index
*/
Expand Down Expand Up @@ -70,14 +75,14 @@ public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerN
}

/**
* Defines a predicate which returns true when a node contains more than average number of primary shards. This
* constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT}
* is assigned to node resulting in lesser chances of node being selected as allocation target
* Defines a predicate which returns true when a node contains more than average number of primary shards with added buffer. This
* constraint is used in weight calculation during allocation/rebalance both. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT}
* is assigned to node resulting in lesser chances of node being selected as allocation/rebalance target
*/
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached(float buffer) {
return (params) -> {
int primaryShardCount = params.getNode().numPrimaryShards();
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode());
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer));
return primaryShardCount >= allowedPrimaryShardCount;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached;

/**
* Constraints applied during rebalancing round; specify conditions which, if breached, reduce the
Expand All @@ -27,9 +29,13 @@ public class RebalanceConstraints {

private Map<String, Constraint> constraints;

public RebalanceConstraints() {
public RebalanceConstraints(RebalanceParameter rebalanceParameter) {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(
CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer()))
);
}

public void updateRebalanceConstraint(String constraint, boolean enable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

/**
* RebalanceConstraint Params
*/
public class RebalanceParameter {
private float preferPrimaryBalanceBuffer;

public RebalanceParameter(float preferPrimaryBalanceBuffer) {
this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer;
}

public float getPreferPrimaryBalanceBuffer() {
return preferPrimaryBalanceBuffer;
}
}
Loading
Loading