diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 11b94f98212b7..a5193ca602f04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -311,14 +311,14 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } - protected boolean allocatorTimedOut(long currentTime) { + protected boolean allocatorTimedOut() { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks"); } return false; } - return currentTime - this.startTime > allocatorTimeout.nanos(); + return System.nanoTime() - this.startTime > allocatorTimeout.nanos(); } @Override @@ -361,7 +361,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f preferPrimaryShardBalance, preferPrimaryShardRebalance, ignoreThrottleInRestore, - x -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision + () -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -625,7 +625,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, x -> false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, () -> false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 5b95e1442fc57..adb8ee2cf7e85 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -41,7 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -72,7 +72,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; - private final Function timedOutFunc; + private final Supplier timedOutFunc; private int totalShardCount = 0; public LocalShardsBalancer( @@ -84,7 +84,7 @@ public LocalShardsBalancer( boolean preferPrimaryBalance, boolean preferPrimaryRebalance, boolean ignoreThrottleInRestore, - Function timedOutFunc + Supplier timedOutFunc ) { this.logger = logger; this.allocation = allocation; @@ -349,7 +349,7 @@ private void balanceByWeights() { final float[] weights = sorter.weights; for (String index : buildWeightOrderedIndices()) { // Terminate if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { logger.info( "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping indices iteration" @@ -381,7 +381,7 @@ private void balanceByWeights() { int highIdx = relevantNodes - 1; while (true) { // break if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { logger.info( "Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping relevant nodes iteration" @@ -593,7 +593,7 @@ void moveShards() { } // Terminate if the time allocated to the balanced shards allocator has elapsed - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { logger.info( "Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed" + ". Skipping shard iteration" @@ -833,7 +833,7 @@ void allocateUnassigned() { } do { for (int i = 0; i < primaryLength; i++) { - if (timedOutFunc != null && timedOutFunc.apply(System.nanoTime())) { + if (timedOutFunc != null && timedOutFunc.get()) { // TODO - maybe check if we can allow wait for active shards thingy bypass this condition logger.info( "Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed", diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index f77971b39883b..a10c305686638 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -412,7 +412,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca public void testAllocatorNeverTimedOutIfValueIsMinusOne() { Settings build = Settings.builder().put("cluster.routing.allocation.balanced_shards_allocator.allocator_timeout", "-1").build(); BalancedShardsAllocator allocator = new BalancedShardsAllocator(build); - assertFalse(allocator.allocatorTimedOut(randomLong())); + assertFalse(allocator.allocatorTimedOut()); } public void testAllocatorTimeout() { @@ -468,7 +468,7 @@ public TestBalancedShardsAllocator(Settings settings, CountDownLatch timedOutLat } @Override - protected boolean allocatorTimedOut(long currentTime) { + protected boolean allocatorTimedOut() { if (timedOutLatch.getCount() == 0) { return true; }