Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffer for allocation constraints and a switch to disable index b… #14515

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Add allocation constraints buffer and setting to enable index balance constraint ([#14515](https://github.com/opensearch-project/OpenSearch/pull/14515))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
public class AllocationConstraints {
private Map<String, Constraint> constraints;

public AllocationConstraints() {
public AllocationConstraints(AllocationParameter allocationParameter) {
this.constraints = new HashMap<>();
this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
this.constraints.put(
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
new Constraint(isIndexShardsPerNodeBreached(allocationParameter.getShardBalanceBuffer()))
);
this.constraints.put(
INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceIndexBuffer()))
);
this.constraints.put(
CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceShardBuffer()))
);
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand All @@ -43,4 +52,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

public Constraint getAllocationConstraint(String name) {
return constraints.get(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

/**
* Allocation Constraint Parameters
*/
public class AllocationParameter {
private final float primaryBalanceShardBuffer;

private final float primaryBalanceIndexBuffer;

private final float shardBalanceBuffer;

public AllocationParameter(float shardBalanceBuffer, float primaryBalanceIndexBuffer, float primaryBalanceShardBuffer) {
this.shardBalanceBuffer = shardBalanceBuffer;
this.primaryBalanceShardBuffer = primaryBalanceShardBuffer;
this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer;
}

public float getPrimaryBalanceShardBuffer() {
return primaryBalanceShardBuffer;
}

public float getPrimaryBalanceIndexBuffer() {
return primaryBalanceIndexBuffer;
}

public float getShardBalanceBuffer() {
return shardBalanceBuffer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public void setEnable(boolean enable) {
this.enable = enable;
}

public boolean isEnable() {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
return enable;
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class ConstraintTypes {
* This constraint is breached when balancer attempts to allocate more than
* average shards per index per node.
*/
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached(float buffer) {
return (params) -> {
int currIndexShardsOnNode = params.getNode().numShards(params.getIndex());
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()) * (1 + buffer));
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
};
}
Expand All @@ -66,11 +66,14 @@ public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreache
* {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected
* as allocation or rebalancing target
*/
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached(float buffer) {
return (params) -> {
int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex());
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount;
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(
params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()) * (1 + buffer)
);

return perIndexPrimaryShardCount >= perIndexAllowedPrimaryShardCount;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class RebalanceConstraints {

public RebalanceConstraints(RebalanceParameter rebalanceParameter) {
this.constraints = new HashMap<>();
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached(0.0f)));
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
this.constraints.put(
CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
import org.opensearch.cluster.routing.allocation.AllocationParameter;
import org.opensearch.cluster.routing.allocation.ConstraintTypes;
import org.opensearch.cluster.routing.allocation.MoveDecision;
import org.opensearch.cluster.routing.allocation.RebalanceConstraints;
Expand All @@ -62,7 +63,6 @@
import java.util.Set;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID;

Expand Down Expand Up @@ -135,7 +135,7 @@
);

/**
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()}
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached(float)}
* and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation
* {@link LocalShardsBalancer#allocateUnassigned()} and shard re-balance/relocation to a different node via {@link LocalShardsBalancer#balance()} .
*/
Expand All @@ -147,13 +147,44 @@
Property.NodeScope
);

public static final Setting<Boolean> SHARDS_PER_INDEX_BALANCE = Setting.boolSetting(
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
"cluster.routing.allocation.balance.constraint.index.enable",
true,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Boolean> PREFER_PRIMARY_SHARD_REBALANCE = Setting.boolSetting(
"cluster.routing.allocation.rebalance.primary.enable",
false,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.primary.shard.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.primary.index.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> SHARDS_PER_INDEX_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.constraint.index.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.rebalance.primary.buffer",
0.10f,
Expand All @@ -166,7 +197,11 @@
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile boolean shardsPerIndexBalance;
private volatile boolean preferPrimaryShardRebalance;
private volatile float primaryBalanceShardBuffer;
private volatile float primaryBalanceIndexBuffer;
private volatile float shardsPerIndexBalanceBuffer;
private volatile float preferPrimaryShardRebalanceBuffer;
private volatile float indexBalanceFactor;
private volatile float shardBalanceFactor;
Expand All @@ -179,19 +214,28 @@

@Inject
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
setThreshold(THRESHOLD_SETTING.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings));
setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings));
setPrimaryBalanceShardBuffer(PRIMARY_SHARD_BALANCE_BUFFER.get(settings));
setPrimaryBalanceIndexBuffer(PRIMARY_INDEX_BALANCE_BUFFER.get(settings));
setShardsPerIndexBalanceBuffer(SHARDS_PER_INDEX_BALANCE_BUFFER.get(settings));
setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setShardsPerIndexBalance(SHARDS_PER_INDEX_BALANCE.get(settings));
updateWeightFunction();

clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE, this::setShardsPerIndexBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_BALANCE_BUFFER, this::setPrimaryBalanceShardBuffer);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_INDEX_BALANCE_BUFFER, this::setPrimaryBalanceIndexBuffer);
clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE_BUFFER, this::setShardsPerIndexBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
Expand Down Expand Up @@ -228,6 +272,7 @@

private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) {
this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer;
updateWeightFunction();
}

private void updateIndexBalanceFactor(float indexBalanceFactor) {
Expand All @@ -246,7 +291,18 @@
}

private void updateWeightFunction() {
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer);
weightFunction = new WeightFunction(
this.indexBalanceFactor,
this.shardBalanceFactor,
this.shardsPerIndexBalanceBuffer,
this.primaryBalanceIndexBuffer,
this.primaryBalanceShardBuffer,
this.preferPrimaryShardRebalanceBuffer
);
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance);
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
}

/**
Expand All @@ -255,14 +311,32 @@
*/
private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
this.preferPrimaryShardBalance = preferPrimaryShardBalance;
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
updateWeightFunction();
}

private void setShardsPerIndexBalance(boolean shardsPerIndexBalance) {
this.shardsPerIndexBalance = shardsPerIndexBalance;
updateWeightFunction();
}

private void setPrimaryBalanceShardBuffer(float primaryBalanceShardBuffer) {
this.primaryBalanceShardBuffer = primaryBalanceShardBuffer;
updateWeightFunction();
}

private void setPrimaryBalanceIndexBuffer(float primaryBalanceIndexBuffer) {
this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer;
updateWeightFunction();
}

private void setShardsPerIndexBalanceBuffer(float shardsPerIndexBalanceBuffer) {
this.shardsPerIndexBalanceBuffer = shardsPerIndexBalanceBuffer;
updateWeightFunction();
}

private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) {
this.preferPrimaryShardRebalance = preferPrimaryShardRebalance;
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance);
updateWeightFunction();
}

private void setThreshold(float threshold) {
Expand Down Expand Up @@ -375,6 +449,10 @@
return preferPrimaryShardBalance;
}

public AllocationConstraints getAllocationParam() {
return weightFunction.constraints;
}

/**
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
* Currently this function has 3 properties:
Expand Down Expand Up @@ -410,20 +488,34 @@
private AllocationConstraints constraints;
private RebalanceConstraints rebalanceConstraints;

WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
WeightFunction(
float indexBalance,
float shardBalance,
float shardsPerIndexBalanceBuffer,
float primaryBalanceIndexBuffer,
float primaryBalanceShardBuffer,
float preferPrimaryBalanceBuffer
) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
if (sum > 0.0f) {
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
} else if (sum == 0.0f) {
theta0 = 0;
theta1 = 0;

Check warning on line 505 in server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java#L504-L505

Added lines #L504 - L505 were not covered by tests
} else {
throw new IllegalArgumentException("Balance factors must sum to a value >= 0 but was: " + sum);

Check warning on line 507 in server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java#L507

Added line #L507 was not covered by tests
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
AllocationParameter allocationParameter = new AllocationParameter(
shardsPerIndexBalanceBuffer,
primaryBalanceIndexBuffer,
primaryBalanceShardBuffer
);
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints();
this.constraints = new AllocationConstraints(allocationParameter);
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
// Enable index shard per node breach constraint
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true);
}

public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE,
BalancedShardsAllocator.PRIMARY_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_BUFFER,
BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE,
BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
Expand Down
Loading
Loading