From 0e9151cd99c04ed50b41952865ff1fce6dea484b Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 14 Aug 2024 11:23:45 +0530 Subject: [PATCH 01/20] Make balanced shards allocator timebound Signed-off-by: Rishab Nahata --- .../allocator/BalancedShardsAllocator.java | 46 +++++++++++++++++-- .../allocator/LocalShardsBalancer.java | 29 +++++++++++- .../common/settings/ClusterSettings.java | 1 + 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 212583d1fb14f..d200c0506aabe 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import java.util.HashMap; import java.util.HashSet; @@ -87,6 +88,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class); + public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); public static final Setting INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.index", @@ -169,6 +171,23 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", + TimeValue.timeValueSeconds(20), + TimeValue.MINUS_ONE, + timeValue -> { + if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) { + throw new IllegalArgumentException( + "Setting [" + + "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout" + + "] should be more than 20s or -1ms to disable timeout" + ); + } + }, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -181,6 +200,8 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float threshold; private volatile boolean ignoreThrottleInRestore; + private volatile TimeValue allocatorTimeout; + private long startTime; public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -197,6 +218,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -206,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); + clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); } /** @@ -284,6 +307,20 @@ private void setThreshold(float threshold) { this.threshold = threshold; } + private void setAllocatorTimeout(TimeValue allocatorTimeout) { + this.allocatorTimeout = allocatorTimeout; + } + + private boolean allocatorTimedOut(long currentTime) { + if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { + if (logger.isTraceEnabled()) { + logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks"); + } + return false; + } + return currentTime - this.startTime > allocatorTimeout.nanos(); + } + @Override public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { @@ -298,8 +335,10 @@ public void allocate(RoutingAllocation allocation) { threshold, preferPrimaryShardBalance, preferPrimaryShardRebalance, - ignoreThrottleInRestore + ignoreThrottleInRestore, + this::allocatorTimedOut ); + this.startTime = System.nanoTime(); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); @@ -321,7 +360,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f threshold, preferPrimaryShardBalance, preferPrimaryShardRebalance, - ignoreThrottleInRestore + ignoreThrottleInRestore, + this::allocatorTimedOut ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -585,7 +625,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, null); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 7e4ae58548c55..6d945a31f19d2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -71,8 +72,10 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; + private final Function timedOutFunc; private int totalShardCount = 0; + public LocalShardsBalancer( Logger logger, RoutingAllocation allocation, @@ -81,7 +84,8 @@ public LocalShardsBalancer( float threshold, boolean preferPrimaryBalance, boolean preferPrimaryRebalance, - boolean ignoreThrottleInRestore + boolean ignoreThrottleInRestore, + Function timedOutFunc ) { this.logger = logger; this.allocation = allocation; @@ -99,6 +103,7 @@ public LocalShardsBalancer( this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; this.ignoreThrottleInRestore = ignoreThrottleInRestore; + this.timedOutFunc = timedOutFunc; } /** @@ -572,6 +577,15 @@ void moveShards() { return; } + // Terminate if the time allocated to the balanced shards allocator has elapsed + if (timedOutFunc.apply(System.nanoTime())) { + logger.info( + "Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed" + + ". Skipping shard iteration" + ); + return; + } + ShardRouting shardRouting = it.next(); if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { @@ -799,8 +813,21 @@ void allocateUnassigned() { int secondaryLength = 0; int primaryLength = primary.length; ArrayUtil.timSort(primary, comparator); + if (logger.isTraceEnabled()) { + logger.trace("Staring allocation of [{}] unassigned shards", primaryLength); + } do { for (int i = 0; i < primaryLength; i++) { + if (timedOutFunc.apply(System.nanoTime())) { + // TODO - maybe check if we can allow wait for active shards thingy bypass this condition + logger.info("Ignoring [{}] unassigned shards for allocation as time allocated to " + + "balanced shards allocator has elapsed", (primaryLength - i)); + while (i < primaryLength - 1) { + unassigned.ignoreShard(primary[i], UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes()); + i++; + } + return; + } ShardRouting shard = primary[i]; final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); final String assignedNodeId = allocationDecision.getTargetNode() != null diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a73e5d44b7e02..f2d51f87cfa2a 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -269,6 +269,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, + BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, From 7fe10d7b27c4579cb9de3ba1edd311511aaad122 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 15 Aug 2024 22:35:38 +0530 Subject: [PATCH 02/20] Randomise nodes shards iterator for move Signed-off-by: Rishab Nahata --- .../java/org/opensearch/cluster/routing/RoutingNodes.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index ab455f52c4195..b5e74821d41e7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -1439,7 +1439,9 @@ public void remove() { */ public Iterator nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) { final Queue> queue = new ArrayDeque<>(); - for (Map.Entry entry : nodesToShards.entrySet()) { + List> nodesToShardsEntrySet = new ArrayList<>(nodesToShards.entrySet()); + Randomness.shuffle(nodesToShardsEntrySet); + for (Map.Entry entry : nodesToShardsEntrySet) { queue.add(entry.getValue().copyShards().iterator()); } if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) { From 8f558d7a71386edf5fb2e1ae1adfe64093b4b30f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 16 Aug 2024 22:42:12 +0530 Subject: [PATCH 03/20] Terminate rebalancing if time elapsed Signed-off-by: Rishab Nahata --- .../allocator/LocalShardsBalancer.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 6d945a31f19d2..e6c6a252575be 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -349,6 +349,14 @@ private void balanceByWeights() { final BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes; final float[] weights = sorter.weights; for (String index : buildWeightOrderedIndices()) { + // Terminate if the time allocated to the balanced shards allocator has elapsed + if (timedOutFunc.apply(System.nanoTime())) { + logger.info( + "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + + ". Skipping indices iteration" + ); + return; + } IndexMetadata indexMetadata = metadata.index(index); // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, @@ -373,6 +381,14 @@ private void balanceByWeights() { int lowIdx = 0; int highIdx = relevantNodes - 1; while (true) { + // break if the time allocated to the balanced shards allocator has elapsed + if (timedOutFunc.apply(System.nanoTime())) { + logger.info( + "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + + ". Skipping relevant nodes iteration" + ); + break; + } final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx]; final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx]; advance_range: if (maxNode.numShards(index) > 0) { From d591b26d90254bc6fce082e33f4e4eaece68c559 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 20 Aug 2024 13:50:46 +0530 Subject: [PATCH 04/20] Add test for allocate unassigned time out Signed-off-by: Rishab Nahata --- .../allocator/BalancedShardsAllocator.java | 2 +- .../allocator/LocalShardsBalancer.java | 9 +- ...TimeBoundBalancedShardsAllocatorTests.java | 124 ++++++++++++++++++ 3 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index d200c0506aabe..7c4b278c92de5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -311,7 +311,7 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } - private boolean allocatorTimedOut(long currentTime) { + protected boolean allocatorTimedOut(long currentTime) { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks"); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index e6c6a252575be..a7c695042c4f2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -75,7 +75,6 @@ public class LocalShardsBalancer extends ShardsBalancer { private final Function timedOutFunc; private int totalShardCount = 0; - public LocalShardsBalancer( Logger logger, RoutingAllocation allocation, @@ -836,9 +835,11 @@ void allocateUnassigned() { for (int i = 0; i < primaryLength; i++) { if (timedOutFunc.apply(System.nanoTime())) { // TODO - maybe check if we can allow wait for active shards thingy bypass this condition - logger.info("Ignoring [{}] unassigned shards for allocation as time allocated to " + - "balanced shards allocator has elapsed", (primaryLength - i)); - while (i < primaryLength - 1) { + logger.info( + "Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed", + (primaryLength - i) + ); + while (i < primaryLength) { unassigned.ignoreShard(primary[i], UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes()); i++; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java new file mode 100644 index 0000000000000..32632f66da7f0 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { + + private final DiscoveryNode node1 = newNode("node1"); + private final DiscoveryNode node2 = newNode("node2"); + private final DiscoveryNode node3 = newNode("node3"); + + public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { + Settings.Builder settings = Settings.builder(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, false); + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(numberOfIndices * (numberOfShards * (numberOfReplicas + 1)), initializingShards.size()); + assertEquals(0, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(numberOfIndices * numberOfShards, node1Recoveries + node2Recoveries + node3Recoveries); + } + + public void testAllUnassignedShardsIgnoredWhenTimedOut() { + Settings.Builder settings = Settings.builder(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, true); + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(0, initializingShards.size()); + assertEquals(numberOfIndices * (numberOfShards * (numberOfReplicas + 1)), allocation.routingNodes().unassigned().ignored().size()); + assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + } + + private RoutingAllocation buildRoutingAllocation( + AllocationDeciders deciders, + int numberOfIndices, + int numberOfShards, + int numberOfReplicas + ) { + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, null, System.nanoTime()); + } + + private RoutingTable buildRoutingTable(Metadata metadata) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for (Map.Entry entry : metadata.getIndices().entrySet()) { + routingTableBuilder.addAsNew(entry.getValue()); + } + return routingTableBuilder.build(); + } + + private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int numberOfShards, int numberOfReplicas) { + for (int i = 0; i < numberOfIndices; i++) { + mb.put( + IndexMetadata.builder("test_" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ); + } + return mb.build(); + } + + static class TestBalancedShardsAllocator extends BalancedShardsAllocator { + private final AtomicBoolean timedOut; + + public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, boolean timedOut) { + super(settings, clusterSettings); + this.timedOut = new AtomicBoolean(timedOut); + } + + @Override + protected boolean allocatorTimedOut(long currentTime) { + return timedOut.get(); + } + } +} From 9ab5a29001a3b0e9535945224abf3e14022cd8ee Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 20 Aug 2024 13:52:47 +0530 Subject: [PATCH 05/20] Add changelog Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34cd4c2097e48..8befd884c50f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) - Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) +- Make balanced shards allocator timebound ([#15239](https://github.com/opensearch-project/OpenSearch/pull/15239)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) From 31f02cf8304cd15f5ba171efd121155d3e69290f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 21 Aug 2024 11:29:06 +0530 Subject: [PATCH 06/20] Use count down latch to mark partial shards timed out Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index 32632f66da7f0..aaa8facd166e1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { @@ -37,30 +37,39 @@ public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationT private final DiscoveryNode node3 = newNode("node3"); public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { - Settings.Builder settings = Settings.builder(); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, false); int numberOfIndices = 2; int numberOfShards = 5; int numberOfReplicas = 1; + int totalPrimaryCount = numberOfIndices * numberOfShards; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // passing total shard count for timed out latch such that no shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator( + settings.build(), + clusterSettings, + new CountDownLatch(totalShardCount) + ); RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); - assertEquals(numberOfIndices * (numberOfShards * (numberOfReplicas + 1)), initializingShards.size()); + assertEquals(totalShardCount, initializingShards.size()); assertEquals(0, allocation.routingNodes().unassigned().ignored().size()); - assertEquals(numberOfIndices * numberOfShards, node1Recoveries + node2Recoveries + node3Recoveries); + assertEquals(totalPrimaryCount, node1Recoveries + node2Recoveries + node3Recoveries); } public void testAllUnassignedShardsIgnoredWhenTimedOut() { - Settings.Builder settings = Settings.builder(); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, true); int numberOfIndices = 2; int numberOfShards = 5; int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // passing 0 for timed out latch such that all shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, new CountDownLatch(0)); RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); @@ -68,7 +77,7 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() { int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); assertEquals(0, initializingShards.size()); - assertEquals(numberOfIndices * (numberOfShards * (numberOfReplicas + 1)), allocation.routingNodes().unassigned().ignored().size()); + assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); } @@ -109,16 +118,20 @@ private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int num } static class TestBalancedShardsAllocator extends BalancedShardsAllocator { - private final AtomicBoolean timedOut; + private final CountDownLatch timedOutLatch; - public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, boolean timedOut) { + public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, CountDownLatch timedOutLatch) { super(settings, clusterSettings); - this.timedOut = new AtomicBoolean(timedOut); + this.timedOutLatch = timedOutLatch; } @Override protected boolean allocatorTimedOut(long currentTime) { - return timedOut.get(); + if (timedOutLatch.getCount() == 0) { + return true; + } + timedOutLatch.countDown(); + return false; } } } From 5eaaa60773de2f75e246cfafaf7b2a78e80db18a Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 21 Aug 2024 12:21:16 +0530 Subject: [PATCH 07/20] Add test Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index aaa8facd166e1..1c0dc02ce46f3 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -81,6 +81,57 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() { assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); } + public void testAllocatePartialPrimaryShardsUntilTimedOut() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + int shardsToAllocate = randomIntBetween(1, numberOfShards * numberOfIndices); + // passing shards to allocate for timed out latch such that only few primary shards are allocated in this reroute round + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator( + settings.build(), + clusterSettings, + new CountDownLatch(shardsToAllocate) + ); + RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(shardsToAllocate, initializingShards.size()); + assertEquals(totalShardCount - shardsToAllocate, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(shardsToAllocate, node1Recoveries + node2Recoveries + node3Recoveries); + } + + public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder(); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + int shardsToAllocate = randomIntBetween(numberOfShards * numberOfIndices, totalShardCount); + // passing shards to allocate for timed out latch such that all primary shards and few replica shards are allocated in this reroute + // round + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator( + settings.build(), + clusterSettings, + new CountDownLatch(shardsToAllocate) + ); + RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(shardsToAllocate, initializingShards.size()); + assertEquals(totalShardCount - shardsToAllocate, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(numberOfShards * numberOfIndices, node1Recoveries + node2Recoveries + node3Recoveries); + } + private RoutingAllocation buildRoutingAllocation( AllocationDeciders deciders, int numberOfIndices, From c67486af8e1a94432dd531c1d466541d52373e81 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 21 Aug 2024 13:43:07 +0530 Subject: [PATCH 08/20] Add test for move Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 165 ++++++++++++++---- .../cluster/OpenSearchAllocationTestCase.java | 11 ++ 2 files changed, 141 insertions(+), 35 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index 1c0dc02ce46f3..1c32716d7f138 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -22,19 +22,21 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; + public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { - private final DiscoveryNode node1 = newNode("node1"); - private final DiscoveryNode node2 = newNode("node2"); - private final DiscoveryNode node3 = newNode("node3"); + private final DiscoveryNode node1 = newNode("node1", "node1", Collections.singletonMap("zone", "1a")); + private final DiscoveryNode node2 = newNode("node2", "node2", Collections.singletonMap("zone", "1b")); + private final DiscoveryNode node3 = newNode("node3", "node3", Collections.singletonMap("zone", "1c")); public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { int numberOfIndices = 2; @@ -43,14 +45,23 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOut() { int totalPrimaryCount = numberOfIndices * numberOfShards; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Settings.Builder settings = Settings.builder(); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); // passing total shard count for timed out latch such that no shard times out - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator( - settings.build(), - clusterSettings, - new CountDownLatch(totalShardCount) + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(totalShardCount)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() ); - RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -67,10 +78,23 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() { int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Settings.Builder settings = Settings.builder(); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); // passing 0 for timed out latch such that all shard times out - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, new CountDownLatch(0)); - RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -87,15 +111,24 @@ public void testAllocatePartialPrimaryShardsUntilTimedOut() { int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Settings.Builder settings = Settings.builder(); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); int shardsToAllocate = randomIntBetween(1, numberOfShards * numberOfIndices); // passing shards to allocate for timed out latch such that only few primary shards are allocated in this reroute round - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator( - settings.build(), - clusterSettings, - new CountDownLatch(shardsToAllocate) + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() ); - RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -112,16 +145,25 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { int numberOfReplicas = 1; int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Settings.Builder settings = Settings.builder(); - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); int shardsToAllocate = randomIntBetween(numberOfShards * numberOfIndices, totalShardCount); // passing shards to allocate for timed out latch such that all primary shards and few replica shards are allocated in this reroute // round - BalancedShardsAllocator allocator = new TestBalancedShardsAllocator( - settings.build(), - clusterSettings, - new CountDownLatch(shardsToAllocate) + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() ); - RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas); allocator.allocate(allocation); List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); @@ -132,12 +174,46 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() { assertEquals(numberOfShards * numberOfIndices, node1Recoveries + node2Recoveries + node3Recoveries); } - private RoutingAllocation buildRoutingAllocation( - AllocationDeciders deciders, - int numberOfIndices, - int numberOfShards, - int numberOfReplicas - ) { + public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() { + int numberOfIndices = 3; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService(); + state = applyStartedShardsUntilNoChange(state, allocationService); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); + int shardsToMove = 10 + 1000; // such that time out is never breached + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(settings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(node1ShardCount, relocatingShards.size()); + assertEquals(node1ShardCount, allocation.routingNodes().getRelocatingShardCount()); + } + + public void testNoShardsMoveWhenExcludedAndTimeoutNotBreached() { + int numberOfIndices = 3; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); RoutingTable routingTable = buildRoutingTable(metadata); ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) @@ -145,7 +221,26 @@ private RoutingAllocation buildRoutingAllocation( .routingTable(routingTable) .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) .build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, null, System.nanoTime()); + MockAllocationService allocationService = createAllocationService(); + state = applyStartedShardsUntilNoChange(state, allocationService); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); + int shardsToMove = 0; // such that time out is never breached + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(settings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(0, relocatingShards.size()); + assertEquals(0, allocation.routingNodes().getRelocatingShardCount()); } private RoutingTable buildRoutingTable(Metadata metadata) { @@ -171,8 +266,8 @@ private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int num static class TestBalancedShardsAllocator extends BalancedShardsAllocator { private final CountDownLatch timedOutLatch; - public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, CountDownLatch timedOutLatch) { - super(settings, clusterSettings); + public TestBalancedShardsAllocator(Settings settings, CountDownLatch timedOutLatch) { + super(settings); this.timedOutLatch = timedOutLatch; } diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index 34b8c58a9c5b2..f54ba36203684 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -213,6 +214,16 @@ protected static AllocationDeciders throttleAllocationDeciders() { ); } + protected static AllocationDeciders allocationDecidersForExcludeAPI(Settings settings) { + return new AllocationDeciders( + Arrays.asList( + new TestAllocateDecision(Decision.YES), + new SameShardAllocationDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + new FilterAllocationDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) + ); + } + protected ClusterState applyStartedShardsUntilNoChange(ClusterState clusterState, AllocationService service) { ClusterState lastClusterState; do { From 4e5bcc80b1633e9734a8a85fee02bd691734ebbb Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 21 Aug 2024 14:57:38 +0530 Subject: [PATCH 09/20] Add tests for move Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 40 +++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index 1c32716d7f138..2f6f030d93f03 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -206,10 +206,9 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() { allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(node1ShardCount, relocatingShards.size()); - assertEquals(node1ShardCount, allocation.routingNodes().getRelocatingShardCount()); } - public void testNoShardsMoveWhenExcludedAndTimeoutNotBreached() { + public void testNoShardsMoveWhenExcludedAndTimeoutBreached() { int numberOfIndices = 3; int numberOfShards = 5; int numberOfReplicas = 1; @@ -240,7 +239,42 @@ public void testNoShardsMoveWhenExcludedAndTimeoutNotBreached() { allocator.allocate(allocation); List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); assertEquals(0, relocatingShards.size()); - assertEquals(0, allocation.routingNodes().getRelocatingShardCount()); + } + + public void testPartialShardsMoveWhenExcludedAndTimeoutBreached() { + int numberOfIndices = 3; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService(); + state = applyStartedShardsUntilNoChange(state, allocationService); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build(); + // since for moves, it creates an iterator over shards which interleaves between nodes, hence + // for shardsToMove=6, it will have 2 shards from node1, node2, node3 each attempting to move with only + // shards from node1 can actually move. Hence, total moves that will be executed is 2 (6/3). + int shardsToMove = 5; // such that time out is never breached + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(settings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(shardsToMove / 3, relocatingShards.size()); } private RoutingTable buildRoutingTable(Metadata metadata) { From c094e95a5ce3cbb776b149609298a6eb9143f96e Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 21 Aug 2024 15:24:54 +0530 Subject: [PATCH 10/20] Add tests for rebalance Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index 2f6f030d93f03..559f3c32dbd90 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -277,6 +277,43 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreached() { assertEquals(shardsToMove / 3, relocatingShards.size()); } + public void testClusterRebalancedWhenNotTimedOut() { + int numberOfIndices = 1; + int numberOfShards = 15; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService( + Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() + ); // such that no shards are allocated to node1 + state = applyStartedShardsUntilNoChange(state, allocationService); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + assertEquals(0, node1ShardCount); + Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); + int shardsToMove = 1000; // such that time out is never breached + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(newSettings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(totalShardCount / 3, relocatingShards.size()); + } + private RoutingTable buildRoutingTable(Metadata metadata) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (Map.Entry entry : metadata.getIndices().entrySet()) { @@ -294,6 +331,7 @@ private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int num .numberOfReplicas(numberOfReplicas) ); } + return mb.build(); } From 30aa1ca7d4356f1bcc22c924bdfb041216d70947 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 21 Aug 2024 20:31:35 +0530 Subject: [PATCH 11/20] Minor test fix Signed-off-by: Rishab Nahata --- .../allocation/TimeBoundBalancedShardsAllocatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index 559f3c32dbd90..d75b80dd0919f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -262,7 +262,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreached() { // since for moves, it creates an iterator over shards which interleaves between nodes, hence // for shardsToMove=6, it will have 2 shards from node1, node2, node3 each attempting to move with only // shards from node1 can actually move. Hence, total moves that will be executed is 2 (6/3). - int shardsToMove = 5; // such that time out is never breached + int shardsToMove = 6; // such that time out is never breached BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove)); RoutingAllocation allocation = new RoutingAllocation( allocationDecidersForExcludeAPI(settings), From 825d796a5988c82e57975cf6d148ad6797c72e7f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 22 Aug 2024 02:28:13 +0530 Subject: [PATCH 12/20] Add tests Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java index d75b80dd0919f..c507b52a4c5b3 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java @@ -22,8 +22,14 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -314,6 +320,94 @@ public void testClusterRebalancedWhenNotTimedOut() { assertEquals(totalShardCount / 3, relocatingShards.size()); } + public void testClusterNotRebalancedWhenTimedOut() { + int numberOfIndices = 1; + int numberOfShards = 15; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService( + Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() + ); // such that no shards are allocated to node1 + state = applyStartedShardsUntilNoChange(state, allocationService); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + assertEquals(0, node1ShardCount); + Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); + int shardsToMove = 0; // such that it never balances anything + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove)); + RoutingAllocation allocation = new RoutingAllocation( + allocationDecidersForExcludeAPI(newSettings), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(0, relocatingShards.size()); + } + + public void testClusterPartialRebalancedWhenTimedOut() { + int numberOfIndices = 1; + int numberOfShards = 15; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) + .build(); + MockAllocationService allocationService = createAllocationService( + Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build() + ); // such that no shards are allocated to node1 + state = applyStartedShardsUntilNoChange(state, allocationService); + int node1ShardCount = state.getRoutingNodes().node("node1").size(); + // check all shards allocated + assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size()); + assertEquals(0, node1ShardCount); + Settings newSettings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "").build(); + + // making custom set of allocation deciders such that it never attempts to move shards but always attempts to rebalance + List allocationDeciders = Arrays.asList(new AllocationDecider() { + @Override + public Decision canMoveAnyShard(RoutingAllocation allocation) { + return Decision.NO; + } + }, new AllocationDecider() { + @Override + public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + return Decision.YES; + } + }, new SameShardAllocationDecider(newSettings, new ClusterSettings(newSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + int shardsToMove = 3; // such that it only partially balances few shards + // adding +1 as during rebalance we do per index timeout check and then per node check + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(newSettings, new CountDownLatch(shardsToMove + 1)); + RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(allocationDeciders), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + allocator.allocate(allocation); + List relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING); + assertEquals(3, relocatingShards.size()); + } + private RoutingTable buildRoutingTable(Metadata metadata) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (Map.Entry entry : metadata.getIndices().entrySet()) { From b1b5b99028919cb52fa04a0ae8b1ea6203c77f61 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 27 Aug 2024 12:53:53 +0530 Subject: [PATCH 13/20] Minor changes Signed-off-by: Rishab Nahata --- .../allocation/allocator/BalancedShardsAllocator.java | 2 +- .../allocation/allocator/LocalShardsBalancer.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 7c4b278c92de5..283cd8af5472e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -361,7 +361,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f preferPrimaryShardBalance, preferPrimaryShardRebalance, ignoreThrottleInRestore, - this::allocatorTimedOut + null // as we don't need to check if timed out or not while just understanding ShardAllocationDecision ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index a7c695042c4f2..5b95e1442fc57 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -349,7 +349,7 @@ private void balanceByWeights() { final float[] weights = sorter.weights; for (String index : buildWeightOrderedIndices()) { // Terminate if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { logger.info( "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping indices iteration" @@ -381,12 +381,12 @@ private void balanceByWeights() { int highIdx = relevantNodes - 1; while (true) { // break if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { logger.info( "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping relevant nodes iteration" ); - break; + return; } final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx]; final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx]; @@ -593,7 +593,7 @@ void moveShards() { } // Terminate if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { logger.info( "Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping shard iteration" @@ -833,7 +833,7 @@ void allocateUnassigned() { } do { for (int i = 0; i < primaryLength; i++) { - if (timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { // TODO - maybe check if we can allow wait for active shards thingy bypass this condition logger.info( "Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed", From 87a5cfc1fa729b257042857261bf8a8dd7742507 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 27 Aug 2024 13:04:47 +0530 Subject: [PATCH 14/20] Trigger Build Signed-off-by: Rishab Nahata From fc9a8ff5e411b8135849362e61dd636de44ad9fa Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 27 Aug 2024 14:27:13 +0530 Subject: [PATCH 15/20] Add timer test Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) rename server/src/test/java/org/opensearch/cluster/routing/allocation/{ => allocator}/TimeBoundBalancedShardsAllocatorTests.java (93%) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java similarity index 93% rename from server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java rename to server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index c507b52a4c5b3..f77971b39883b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.cluster.routing.allocation; +package org.opensearch.cluster.routing.allocation.allocator; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; @@ -21,7 +21,7 @@ import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; @@ -37,6 +37,7 @@ import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING; public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { @@ -408,6 +409,35 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca assertEquals(3, relocatingShards.size()); } + public void testAllocatorNeverTimedOutIfValueIsMinusOne() { + Settings build = Settings.builder().put("cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", "-1").build(); + BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); + assertFalse(allocator.allocatorTimedOut(randomLong())); + } + + public void testAllocatorTimeout() { + String settingKey = "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout"; + // Valid setting with timeout = 20s + Settings build = Settings.builder().put(settingKey, "20s").build(); + assertEquals(20, ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Valid setting with timeout > 20s + build = Settings.builder().put(settingKey, "30000ms").build(); + assertEquals(30, ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds()); + + // Invalid setting with timeout < 20s + Settings lessThan20sSetting = Settings.builder().put(settingKey, "10s").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting) + ); + assertEquals("Setting [" + settingKey + "] should be more than 20s or -1ms to disable timeout", iae.getMessage()); + + // Valid setting with timeout = -1 + build = Settings.builder().put(settingKey, "-1").build(); + assertEquals(-1, ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); + } + private RoutingTable buildRoutingTable(Metadata metadata) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (Map.Entry entry : metadata.getIndices().entrySet()) { From 9a101b9e4e382d3d010e4ae81f5cfaf8cd4f1ef3 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 27 Aug 2024 16:14:13 +0530 Subject: [PATCH 16/20] Trigger Build Signed-off-by: Rishab Nahata From 72a10b410c174614a06b8a36acc5dbbfc9a7e707 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 28 Aug 2024 20:12:21 +0530 Subject: [PATCH 17/20] Remove null Signed-off-by: Rishab Nahata --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 283cd8af5472e..5110efb9a15e3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -361,7 +361,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f preferPrimaryShardBalance, preferPrimaryShardRebalance, ignoreThrottleInRestore, - null // as we don't need to check if timed out or not while just understanding ShardAllocationDecision + x -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -625,7 +625,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, null); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, x -> false); } } From c18d44c6736d49ef1b11d9a6913ea34595ee5348 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 29 Aug 2024 15:04:24 +0530 Subject: [PATCH 18/20] Fix test Signed-off-by: Rishab Nahata --- .../allocator/BalancedShardsAllocator.java | 2 +- .../decider/DiskThresholdDeciderTests.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5110efb9a15e3..11b94f98212b7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -173,7 +173,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { public static final Setting ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", - TimeValue.timeValueSeconds(20), + TimeValue.MINUS_ONE, TimeValue.MINUS_ONE, timeValue -> { if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 2e24640fe858d..e0701f9ba9c7f 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -530,6 +530,8 @@ public void testDiskThresholdWithAbsoluteSizes() { // Primary should initialize, even though both nodes are over the limit initialize assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + // below checks are unnecessary as the primary shard is always assigned to node2 as BSA always picks up that node + // first as both node1 and node2 have equal weight as both of them contain zero shards. String nodeWithPrimary, nodeWithoutPrimary; if (clusterState.getRoutingNodes().node("node1").size() == 1) { nodeWithPrimary = "node1"; @@ -679,10 +681,12 @@ public void testDiskThresholdWithAbsoluteSizes() { clusterState = startInitializingShardsAndReroute(strategy, clusterState); logShardStates(clusterState); - // primary shard already has been relocated away - assertThat(clusterState.getRoutingNodes().node(nodeWithPrimary).size(), equalTo(0)); - // node with increased space still has its shard - assertThat(clusterState.getRoutingNodes().node(nodeWithoutPrimary).size(), equalTo(1)); + // primary shard already has been relocated away - this is a wrong expectation as we don't really move + // primary first unless explicitly set by setting. This is caught with PR + // https://github.com/opensearch-project/OpenSearch/pull/14761/ + // as it randomises nodes to check for potential moves + // assertThat(clusterState.getRoutingNodes().node(nodeWithPrimary).size(), equalTo(0)); + // assertThat(clusterState.getRoutingNodes().node(nodeWithoutPrimary).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1)); From 3ba16156366408c25e7e24d1607eda93140108e0 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 29 Aug 2024 15:05:39 +0530 Subject: [PATCH 19/20] Fix PR number Signed-off-by: Rishab Nahata --- .../routing/allocation/decider/DiskThresholdDeciderTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index e0701f9ba9c7f..94e91c3f7c3c1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -683,7 +683,7 @@ public void testDiskThresholdWithAbsoluteSizes() { logShardStates(clusterState); // primary shard already has been relocated away - this is a wrong expectation as we don't really move // primary first unless explicitly set by setting. This is caught with PR - // https://github.com/opensearch-project/OpenSearch/pull/14761/ + // https://github.com/opensearch-project/OpenSearch/pull/15239/ // as it randomises nodes to check for potential moves // assertThat(clusterState.getRoutingNodes().node(nodeWithPrimary).size(), equalTo(0)); // assertThat(clusterState.getRoutingNodes().node(nodeWithoutPrimary).size(), equalTo(1)); From 75145b7a20c6748af275b13d3df8f540a9bf4ed8 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 29 Aug 2024 17:25:53 +0530 Subject: [PATCH 20/20] Refactor Signed-off-by: Rishab Nahata --- .../allocator/BalancedShardsAllocator.java | 8 ++++---- .../allocation/allocator/LocalShardsBalancer.java | 14 +++++++------- .../TimeBoundBalancedShardsAllocatorTests.java | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 11b94f98212b7..a5193ca602f04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -311,14 +311,14 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } - protected boolean allocatorTimedOut(long currentTime) { + protected boolean allocatorTimedOut() { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks"); } return false; } - return currentTime - this.startTime > allocatorTimeout.nanos(); + return System.nanoTime() - this.startTime > allocatorTimeout.nanos(); } @Override @@ -361,7 +361,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f preferPrimaryShardBalance, preferPrimaryShardRebalance, ignoreThrottleInRestore, - x -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision + () -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -625,7 +625,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, x -> false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, () -> false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 5b95e1442fc57..adb8ee2cf7e85 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -41,7 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -72,7 +72,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; - private final Function timedOutFunc; + private final Supplier timedOutFunc; private int totalShardCount = 0; public LocalShardsBalancer( @@ -84,7 +84,7 @@ public LocalShardsBalancer( boolean preferPrimaryBalance, boolean preferPrimaryRebalance, boolean ignoreThrottleInRestore, - Function timedOutFunc + Supplier timedOutFunc ) { this.logger = logger; this.allocation = allocation; @@ -349,7 +349,7 @@ private void balanceByWeights() { final float[] weights = sorter.weights; for (String index : buildWeightOrderedIndices()) { // Terminate if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { logger.info( "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping indices iteration" @@ -381,7 +381,7 @@ private void balanceByWeights() { int highIdx = relevantNodes - 1; while (true) { // break if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { logger.info( "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping relevant nodes iteration" @@ -593,7 +593,7 @@ void moveShards() { } // Terminate if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { logger.info( "Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping shard iteration" @@ -833,7 +833,7 @@ void allocateUnassigned() { } do { for (int i = 0; i < primaryLength; i++) { - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { // TODO - maybe check if we can allow wait for active shards thingy bypass this condition logger.info( "Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed", diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index f77971b39883b..a10c305686638 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -412,7 +412,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca public void testAllocatorNeverTimedOutIfValueIsMinusOne() { Settings build = Settings.builder().put("cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", "-1").build(); BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); - assertFalse(allocator.allocatorTimedOut(randomLong())); + assertFalse(allocator.allocatorTimedOut()); } public void testAllocatorTimeout() { @@ -468,7 +468,7 @@ public TestBalancedShardsAllocator(Settings settings, CountDownLatch timedOutLat } @Override - protected boolean allocatorTimedOut(long currentTime) { + protected boolean allocatorTimedOut() { if (timedOutLatch.getCount() == 0) { return true; }