diff --git a/CHANGELOG.md b/CHANGELOG.md index ada99d6c2796b..6406993a86a58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603)) - Adds support for minimum compatible version for extensions ([#6003](https://github.com/opensearch-project/OpenSearch/pull/6003)) - Add a guardrail to limit maximum number of shard on the cluster ([#6143](https://github.com/opensearch-project/OpenSearch/pull/6143)) +- Add cancellation of in-flight SearchTasks based on resource consumption ([#5606](https://github.com/opensearch-project/OpenSearch/pull/5606)) ### Dependencies - Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index f8629e2c88b07..a63c3287ea124 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -16,7 +16,9 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.common.inject.Inject; @@ -29,9 +31,8 @@ import org.opensearch.search.backpressure.settings.NodeDuressSettings; import
org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; -import org.opensearch.search.backpressure.trackers.CpuUsageTracker; -import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; -import org.opensearch.search.backpressure.trackers.HeapUsageTracker; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; +import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancelledException; import org.opensearch.tasks.TaskId; @@ -47,6 +48,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -56,6 +58,7 @@ public class SearchBackpressureIT extends OpenSearchIntegTestCase { private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); + private static final int MOVING_AVERAGE_WINDOW_SIZE = 10; @Override protected Collection> nodePlugins() { @@ -70,6 +73,7 @@ public final void setupNodeSettings() { .put(NodeDuressSettings.SETTING_CPU_THRESHOLD.getKey(), 0.0) .put(NodeDuressSettings.SETTING_HEAP_THRESHOLD.getKey(), 0.0) .put(NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES.getKey(), 1) + .put(SearchTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) .put(SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); @@ -86,15 +90,56 @@ public final void cleanupNodeSettings() { ); } + public void testCancellationSettingsChanged() { + Settings request = Settings.builder().put(SearchTaskSettings.SETTING_CANCELLATION_RATE.getKey(), "0.05").build(); + ClusterUpdateSettingsResponse response = 
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); + assertEquals(response.getPersistentSettings().get(SearchTaskSettings.SETTING_CANCELLATION_RATE.getKey()), "0.05"); + + request = Settings.builder().put(SearchShardTaskSettings.SETTING_CANCELLATION_RATIO.getKey(), "0.7").build(); + response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get(); + assertEquals(response.getPersistentSettings().get(SearchShardTaskSettings.SETTING_CANCELLATION_RATIO.getKey()), "0.7"); + } + + public void testSearchTaskCancellationWithHighElapsedTime() throws InterruptedException { + Settings request = Settings.builder() + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(SearchTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 1000) + .build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); + + ExceptionCatchingListener listener = new ExceptionCatchingListener(); + client().execute( + TestTransportAction.ACTION, + new TestRequest<>( + RequestType.HIGH_ELAPSED_TIME, + (TaskFactory) (id, type, action, description, parentTaskId, headers) -> new SearchTask( + id, + type, + action, + descriptionSupplier(description), + parentTaskId, + headers + ) + ), + listener + ); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + + Exception caughtException = listener.getException(); + assertNotNull("SearchTask should have been cancelled with TaskCancelledException", caughtException); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("elapsed time exceeded")); + } + public void testSearchShardTaskCancellationWithHighElapsedTime() throws InterruptedException { Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - 
.put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 1000) + .put(SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 1000) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); ExceptionCatchingListener listener = new ExceptionCatchingListener(); - client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); + client().execute(TestTransportAction.ACTION, new TestRequest<>(RequestType.HIGH_ELAPSED_TIME, SearchShardTask::new), listener); assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); @@ -103,15 +148,46 @@ public void testSearchShardTaskCancellationWithHighElapsedTime() throws Interrup MatcherAssert.assertThat(caughtException.getMessage(), containsString("elapsed time exceeded")); } + public void testSearchTaskCancellationWithHighCpu() throws InterruptedException { + Settings request = Settings.builder() + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000) + .build(); + assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); + + ExceptionCatchingListener listener = new ExceptionCatchingListener(); + client().execute( + TestTransportAction.ACTION, + new TestRequest<>( + RequestType.HIGH_CPU, + (TaskFactory) (id, type, action, description, parentTaskId, headers) -> new SearchTask( + id, + type, + action, + descriptionSupplier(description), + parentTaskId, + headers + ) + ), + listener + ); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + + Exception caughtException = listener.getException(); + assertNotNull("SearchTask should have been cancelled with TaskCancelledException", caughtException); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + 
MatcherAssert.assertThat(caughtException.getMessage(), containsString("cpu usage exceeded")); + } + public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000) + .put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); ExceptionCatchingListener listener = new ExceptionCatchingListener(); - client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), listener); + client().execute(TestTransportAction.ACTION, new TestRequest<>(RequestType.HIGH_CPU, SearchShardTask::new), listener); assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); @@ -120,27 +196,82 @@ public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedExcep MatcherAssert.assertThat(caughtException.getMessage(), containsString("cpu usage exceeded")); } + public void testSearchTaskCancellationWithHighHeapUsage() throws InterruptedException { + // Before SearchBackpressureService cancels a task based on its heap usage, we need to build up the heap moving average + // To build up the heap moving average, we need to hit the same node with multiple requests and then hit the same node with a + // request having higher heap usage + String node = randomFrom(internalCluster().getNodeNames()); + Settings request = Settings.builder() + .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") + .put(SearchTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) + .put(SearchTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 1.0) + .put(SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), MOVING_AVERAGE_WINDOW_SIZE) + .build(); + 
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); + + ExceptionCatchingListener listener = new ExceptionCatchingListener(); + for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) { + client(node).execute( + TestTransportAction.ACTION, + new TestRequest<>( + RequestType.HIGH_HEAP, + (TaskFactory) (id, type, action, description, parentTaskId, headers) -> new SearchTask( + id, + type, + action, + descriptionSupplier(description), + parentTaskId, + headers + ) + ), + listener + ); + } + + listener = new ExceptionCatchingListener(); + client(node).execute( + TestTransportAction.ACTION, + new TestRequest<>( + RequestType.HIGHER_HEAP, + (TaskFactory) (id, type, action, description, parentTaskId, headers) -> new SearchTask( + id, + type, + action, + descriptionSupplier(description), + parentTaskId, + headers + ) + ), + listener + ); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + + Exception caughtException = listener.getException(); + assertNotNull("SearchTask should have been cancelled with TaskCancelledException", caughtException); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("heap usage exceeded")); + } + public void testSearchShardTaskCancellationWithHighHeapUsage() throws InterruptedException { // Before SearchBackpressureService cancels a task based on its heap usage, we need to build up the heap moving average // To build up the heap moving average, we need to hit the same node with multiple requests and then hit the same node with a // request having higher heap usage String node = randomFrom(internalCluster().getNodeNames()); - final int MOVING_AVERAGE_WINDOW_SIZE = 10; Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) - 
.put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 1.0) - .put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), MOVING_AVERAGE_WINDOW_SIZE) + .put(SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.0) + .put(SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 1.0) + .put(SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), MOVING_AVERAGE_WINDOW_SIZE) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); ExceptionCatchingListener listener = new ExceptionCatchingListener(); for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) { - client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_HEAP), listener); + client(node).execute(TestTransportAction.ACTION, new TestRequest<>(RequestType.HIGH_HEAP, SearchShardTask::new), listener); } listener = new ExceptionCatchingListener(); - client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGHER_HEAP), listener); + client(node).execute(TestTransportAction.ACTION, new TestRequest<>(RequestType.HIGHER_HEAP, SearchShardTask::new), listener); assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); @@ -154,7 +285,7 @@ public void testSearchCancellationWithBackpressureDisabled() throws InterruptedE assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); ExceptionCatchingListener listener = new ExceptionCatchingListener(); - client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); + client().execute(TestTransportAction.ACTION, new TestRequest<>(RequestType.HIGH_ELAPSED_TIME, SearchShardTask::new), listener); // waiting for the TIMEOUT * 3 time for the request to complete and the latch to countdown. 
assertTrue( "SearchShardTask should have been completed by now and countdown the latch", @@ -196,11 +327,21 @@ enum RequestType { HIGH_ELAPSED_TIME; } - public static class TestRequest extends ActionRequest { + private Supplier descriptionSupplier(String description) { + return () -> description; + } + + interface TaskFactory { + T createTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers); + } + + public static class TestRequest extends ActionRequest { private final RequestType type; + private TaskFactory taskFactory; - public TestRequest(RequestType type) { + public TestRequest(RequestType type, TaskFactory taskFactory) { this.type = type; + this.taskFactory = taskFactory; } public TestRequest(StreamInput in) throws IOException { @@ -215,7 +356,7 @@ public ActionRequestValidationException validate() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, "", parentTaskId, headers); + return taskFactory.createTask(id, type, action, "", parentTaskId, headers); } @Override @@ -252,7 +393,7 @@ public TestTransportAction(TransportService transportService, ThreadPool threadP protected void doExecute(Task task, TestRequest request, ActionListener listener) { threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> { try { - SearchShardTask searchShardTask = (SearchShardTask) task; + CancellableTask cancellableTask = (CancellableTask) task; long startTime = System.nanoTime(); // Doing a busy-wait until task cancellation or timeout. 
@@ -260,11 +401,11 @@ protected void doExecute(Task task, TestRequest request, ActionListener request) throws InterruptedException { switch (request.getType()) { case HIGH_CPU: long i = 0, j = 1, k = 1, iterations = 1000; diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index c9d0d6e2d3d47..c94f02395cf38 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -36,6 +36,7 @@ import org.opensearch.search.fetch.ShardFetchSearchRequest; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.SearchBackpressureTask; import org.opensearch.tasks.TaskId; import java.util.Map; @@ -47,7 +48,7 @@ * * @opensearch.internal */ -public class SearchShardTask extends CancellableTask { +public class SearchShardTask extends CancellableTask implements SearchBackpressureTask { // generating metadata in a lazy way since source can be quite big private final MemoizedSupplier metadataSupplier; diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 987485fe44c65..dad6c44da4f10 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -34,6 +34,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.SearchBackpressureTask; import org.opensearch.tasks.TaskId; import java.util.Map; @@ -46,7 +47,7 @@ * * @opensearch.internal */ -public class SearchTask extends CancellableTask { +public class SearchTask extends CancellableTask implements SearchBackpressureTask { // generating description in a lazy way since source can be quite big private final 
Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index e84d9ef1e8a96..887a091552b91 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -44,9 +44,7 @@ import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; -import org.opensearch.search.backpressure.trackers.CpuUsageTracker; -import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; -import org.opensearch.search.backpressure.trackers.HeapUsageTracker; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; @@ -600,18 +598,31 @@ public void apply(Settings value, Settings current, Settings previous) { // Settings related to search backpressure SearchBackpressureSettings.SETTING_MODE, - SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, - SearchBackpressureSettings.SETTING_CANCELLATION_RATE, - SearchBackpressureSettings.SETTING_CANCELLATION_BURST, + NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES, NodeDuressSettings.SETTING_CPU_THRESHOLD, NodeDuressSettings.SETTING_HEAP_THRESHOLD, + SearchTaskSettings.SETTING_CANCELLATION_RATIO, + SearchTaskSettings.SETTING_CANCELLATION_RATE, + SearchTaskSettings.SETTING_CANCELLATION_BURST, + SearchTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD, + SearchTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD, + SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, + 
SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD, + SearchTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, + SearchTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, + SearchShardTaskSettings.SETTING_CANCELLATION_RATIO, + SearchShardTaskSettings.SETTING_CANCELLATION_RATE, + SearchShardTaskSettings.SETTING_CANCELLATION_BURST, + SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD, + SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD, + SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, + SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD, + SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, - HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD, - HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD, - HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, - CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD, - ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD + SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, // deprecated + SearchBackpressureSettings.SETTING_CANCELLATION_RATE, // deprecated + SearchBackpressureSettings.SETTING_CANCELLATION_BURST // deprecated ) ) ); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ca27c639bec09..db95b64312df0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -867,7 +867,8 @@ protected Node( final SearchBackpressureService searchBackpressureService = new SearchBackpressureService( searchBackpressureSettings, taskResourceTrackingService, - threadPool + threadPool, + transportService.getTaskManager() ); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); diff --git a/server/src/main/java/org/opensearch/search/backpressure/CancellationSettingsListener.java 
b/server/src/main/java/org/opensearch/search/backpressure/CancellationSettingsListener.java new file mode 100644 index 0000000000000..f630b3b8ed987 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/CancellationSettingsListener.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure; + +/** + * Listener for callbacks related to cancellation settings + */ +public interface CancellationSettingsListener { + + void onRatioChanged(double ratio); + + void onRateChanged(double rate); + + void onBurstChanged(double burst); +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index fd13198b957da..765f2c5b6b228 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -12,14 +12,19 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.component.AbstractLifecycleComponent; -import org.opensearch.common.util.TokenBucket; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.monitor.process.ProcessProbe; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import 
org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; import org.opensearch.search.backpressure.stats.SearchShardTaskStats; +import org.opensearch.search.backpressure.stats.SearchTaskStats; import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; @@ -27,8 +32,10 @@ import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.SearchBackpressureTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; +import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener; import org.opensearch.threadpool.Scheduler; @@ -36,23 +43,24 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.DoubleSupplier; import java.util.function.LongSupplier; import java.util.stream.Collectors; +import static org.opensearch.search.backpressure.trackers.HeapUsageTracker.isHeapTrackingSupported; + /** * SearchBackpressureService is responsible for monitoring and cancelling in-flight search tasks if they are * breaching resource usage limits when the node is in duress. 
* * @opensearch.internal */ -public class SearchBackpressureService extends AbstractLifecycleComponent - implements - TaskCompletionListener, - SearchBackpressureSettings.Listener { +public class SearchBackpressureService extends AbstractLifecycleComponent implements TaskCompletionListener { private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class); private volatile Scheduler.Cancellable scheduledFuture; @@ -63,19 +71,16 @@ public class SearchBackpressureService extends AbstractLifecycleComponent private final LongSupplier timeNanosSupplier; private final List nodeDuressTrackers; - private final List taskResourceUsageTrackers; - - private final AtomicReference taskCancellationRateLimiter = new AtomicReference<>(); - private final AtomicReference taskCancellationRatioLimiter = new AtomicReference<>(); + private final Map, List> taskTrackers; - // Currently, only the state of SearchShardTask is being tracked. - // This can be generalized to Map once we start supporting cancellation of SearchTasks as well. 
- private final SearchBackpressureState state = new SearchBackpressureState(); + private final Map, SearchBackpressureState> searchBackpressureStates; + private final TaskManager taskManager; public SearchBackpressureService( SearchBackpressureSettings settings, TaskResourceTrackingService taskResourceTrackingService, - ThreadPool threadPool + ThreadPool threadPool, + TaskManager taskManager ) { this( settings, @@ -90,7 +95,25 @@ public SearchBackpressureService( () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold() ) ), - List.of(new CpuUsageTracker(settings), new HeapUsageTracker(settings), new ElapsedTimeTracker(settings, System::nanoTime)) + getTrackers( + settings.getSearchTaskSettings()::getCpuTimeNanosThreshold, + settings.getSearchTaskSettings()::getHeapVarianceThreshold, + settings.getSearchTaskSettings()::getHeapPercentThreshold, + settings.getSearchTaskSettings().getHeapMovingAverageWindowSize(), + settings.getSearchTaskSettings()::getElapsedTimeNanosThreshold, + settings.getClusterSettings(), + SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE + ), + getTrackers( + settings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold, + settings.getSearchShardTaskSettings()::getHeapVarianceThreshold, + settings.getSearchShardTaskSettings()::getHeapPercentThreshold, + settings.getSearchShardTaskSettings().getHeapMovingAverageWindowSize(), + settings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, + settings.getClusterSettings(), + SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE + ), + taskManager ); } @@ -100,24 +123,38 @@ public SearchBackpressureService( ThreadPool threadPool, LongSupplier timeNanosSupplier, List nodeDuressTrackers, - List taskResourceUsageTrackers + List searchTaskTrackers, + List searchShardTaskTrackers, + TaskManager taskManager ) { this.settings = settings; - this.settings.addListener(this); this.taskResourceTrackingService = 
taskResourceTrackingService; this.taskResourceTrackingService.addTaskCompletionListener(this); this.threadPool = threadPool; this.timeNanosSupplier = timeNanosSupplier; this.nodeDuressTrackers = nodeDuressTrackers; - this.taskResourceUsageTrackers = taskResourceUsageTrackers; - - this.taskCancellationRateLimiter.set( - new TokenBucket(timeNanosSupplier, getSettings().getCancellationRateNanos(), getSettings().getCancellationBurst()) + this.taskManager = taskManager; + + this.searchBackpressureStates = Map.of( + SearchTask.class, + new SearchBackpressureState( + timeNanosSupplier, + getSettings().getSearchTaskSettings().getCancellationRateNanos(), + getSettings().getSearchTaskSettings().getCancellationBurst(), + getSettings().getSearchTaskSettings().getCancellationRatio() + ), + SearchShardTask.class, + new SearchBackpressureState( + timeNanosSupplier, + getSettings().getSearchShardTaskSettings().getCancellationRateNanos(), + getSettings().getSearchShardTaskSettings().getCancellationBurst(), + getSettings().getSearchShardTaskSettings().getCancellationRatio() + ) ); + this.settings.getSearchTaskSettings().addListener(searchBackpressureStates.get(SearchTask.class)); + this.settings.getSearchShardTaskSettings().addListener(searchBackpressureStates.get(SearchShardTask.class)); - this.taskCancellationRatioLimiter.set( - new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) - ); + this.taskTrackers = Map.of(SearchTask.class, searchTaskTrackers, SearchShardTask.class, searchShardTaskTrackers); } void doRun() { @@ -130,18 +167,36 @@ void doRun() { return; } - // We are only targeting in-flight cancellation of SearchShardTask for now. 
- List searchShardTasks = getSearchShardTasks(); + List searchTasks = getTaskByType(SearchTask.class); + List searchShardTasks = getTaskByType(SearchShardTask.class); + List cancellableTasks = new ArrayList<>(); // Force-refresh usage stats of these tasks before making a cancellation decision. + taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0])); taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0])); - // Skip cancellation if the increase in heap usage is not due to search requests. - if (isHeapUsageDominatedBySearch(searchShardTasks) == false) { + // Check if increase in heap usage is due to SearchTasks + if (HeapUsageTracker.isHeapUsageDominatedBySearch( + searchTasks, + getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold() + )) { + cancellableTasks.addAll(searchTasks); + } + + // Check if increase in heap usage is due to SearchShardTasks + if (HeapUsageTracker.isHeapUsageDominatedBySearch( + searchShardTasks, + getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold() + )) { + cancellableTasks.addAll(searchShardTasks); + } + + // none of the task type is breaching the heap usage thresholds and hence we do not cancel any tasks + if (cancellableTasks.isEmpty()) { return; } - for (TaskCancellation taskCancellation : getTaskCancellations(searchShardTasks)) { + for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) { logger.debug( "[{} mode] cancelling task [{}] due to high resource consumption [{}]", mode.getName(), @@ -153,18 +208,34 @@ void doRun() { continue; } + Class taskType = getTaskType(taskCancellation.getTask()); + // Independently remove tokens from both token buckets. 
- boolean rateLimitReached = taskCancellationRateLimiter.get().request() == false; - boolean ratioLimitReached = taskCancellationRatioLimiter.get().request() == false; + SearchBackpressureState searchBackpressureState = searchBackpressureStates.get(taskType); + boolean rateLimitReached = searchBackpressureState.getRateLimiter().request() == false; + boolean ratioLimitReached = searchBackpressureState.getRatioLimiter().request() == false; // Stop cancelling tasks if there are no tokens in either of the two token buckets. if (rateLimitReached && ratioLimitReached) { logger.debug("task cancellation limit reached"); - state.incrementLimitReachedCount(); + searchBackpressureState.incrementLimitReachedCount(); break; } - taskCancellation.cancel(); + taskCancellation.cancelTaskAndDescendants(taskManager); + } + } + + /** + * Given a task, returns the type of the task + */ + Class getTaskType(Task task) { + if (task instanceof SearchTask) { + return SearchTask.class; + } else if (task instanceof SearchShardTask) { + return SearchShardTask.class; + } else { + throw new IllegalArgumentException("task must be instance of either SearchTask or SearchShardTask"); } } @@ -187,26 +258,16 @@ boolean isNodeInDuress() { /** * Returns true if the increase in heap usage is due to search requests. */ - boolean isHeapUsageDominatedBySearch(List searchShardTasks) { - long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); - long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold(); - if (usage < threshold) { - logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold); - return false; - } - - return true; - } /** - * Filters and returns the list of currently running SearchShardTasks. + * Filters and returns the list of currently running tasks of specified type. 
*/ - List getSearchShardTasks() { + List getTaskByType(Class type) { return taskResourceTrackingService.getResourceAwareTasks() .values() .stream() - .filter(task -> task instanceof SearchShardTask) - .map(task -> (SearchShardTask) task) + .filter(type::isInstance) + .map(type::cast) .collect(Collectors.toUnmodifiableList()); } @@ -218,18 +279,16 @@ List getSearchShardTasks() { TaskCancellation getTaskCancellation(CancellableTask task) { List reasons = new ArrayList<>(); List callbacks = new ArrayList<>(); - - for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { + Class taskType = getTaskType(task); + List trackers = taskTrackers.get(taskType); + for (TaskResourceUsageTracker tracker : trackers) { Optional reason = tracker.checkAndMaybeGetCancellationReason(task); if (reason.isPresent()) { - reasons.add(reason.get()); callbacks.add(tracker::incrementCancellations); + reasons.add(reason.get()); } } - - if (task instanceof SearchShardTask) { - callbacks.add(state::incrementCancellationCount); - } + callbacks.add(searchBackpressureStates.get(taskType)::incrementCancellationCount); return new TaskCancellation(task, reasons, callbacks); } @@ -249,8 +308,39 @@ SearchBackpressureSettings getSettings() { return settings; } - SearchBackpressureState getState() { - return state; + SearchBackpressureState getSearchBackpressureStats(Class taskType) { + return searchBackpressureStates.get(taskType); + } + + /** + * Given the threshold suppliers, returns the list of applicable trackers + */ + public static List getTrackers( + LongSupplier cpuThresholdSupplier, + DoubleSupplier heapVarianceSupplier, + DoubleSupplier heapPercentThresholdSupplier, + int heapMovingAverageWindowSize, + LongSupplier ElapsedTimeNanosSupplier, + ClusterSettings clusterSettings, + Setting windowSizeSetting + ) { + List trackers = new ArrayList<>(); + trackers.add(new CpuUsageTracker(cpuThresholdSupplier)); + if (isHeapTrackingSupported()) { + trackers.add( + new HeapUsageTracker( + 
heapVarianceSupplier, + heapPercentThresholdSupplier, + heapMovingAverageWindowSize, + clusterSettings, + windowSizeSetting + ) + ); + } else { + logger.warn("heap size couldn't be determined"); + } + trackers.add(new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime)); + return Collections.unmodifiableList(trackers); } @Override @@ -259,44 +349,27 @@ public void onTaskCompleted(Task task) { return; } - if (task instanceof SearchShardTask == false) { + if (task instanceof SearchBackpressureTask == false) { return; } - SearchShardTask searchShardTask = (SearchShardTask) task; - if (searchShardTask.isCancelled() == false) { - state.incrementCompletionCount(); + CancellableTask cancellableTask = (CancellableTask) task; + Class taskType = getTaskType(task); + if (cancellableTask.isCancelled() == false) { + searchBackpressureStates.get(taskType).incrementCompletionCount(); } List exceptions = new ArrayList<>(); - for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) { + List trackers = taskTrackers.get(taskType); + for (TaskResourceUsageTracker tracker : trackers) { try { - tracker.update(searchShardTask); + tracker.update(task); } catch (Exception e) { exceptions.add(e); } } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); - } - @Override - public void onCancellationRatioChanged() { - taskCancellationRatioLimiter.set( - new TokenBucket(state::getCompletionCount, getSettings().getCancellationRatio(), getSettings().getCancellationBurst()) - ); - } - - @Override - public void onCancellationRateChanged() { - taskCancellationRateLimiter.set( - new TokenBucket(timeNanosSupplier, getSettings().getCancellationRateNanos(), getSettings().getCancellationBurst()) - ); - } - - @Override - public void onCancellationBurstChanged() { - onCancellationRatioChanged(); - onCancellationRateChanged(); + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); } @Override @@ -321,15 +394,24 @@ protected void doStop() { protected void doClose() throws 
IOException {} public SearchBackpressureStats nodeStats() { - List searchShardTasks = getSearchShardTasks(); + List searchTasks = getTaskByType(SearchTask.class); + List searchShardTasks = getTaskByType(SearchShardTask.class); + SearchTaskStats searchTaskStats = new SearchTaskStats( + searchBackpressureStates.get(SearchTask.class).getCancellationCount(), + searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(), + taskTrackers.get(SearchTask.class) + .stream() + .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks))) + ); SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats( - state.getCancellationCount(), - state.getLimitReachedCount(), - taskResourceUsageTrackers.stream() + searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(), + searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(), + taskTrackers.get(SearchShardTask.class) + .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks))) ); - return new SearchBackpressureStats(searchShardTaskStats, getSettings().getMode()); + return new SearchBackpressureStats(searchTaskStats, searchShardTaskStats, getSettings().getMode()); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java index a62231ec29ede..5f086bd498036 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java @@ -8,28 +8,48 @@ package org.opensearch.search.backpressure; +import org.opensearch.common.util.TokenBucket; + import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; /** * Tracks the current state 
of task completions and cancellations. * * @opensearch.internal */ -public class SearchBackpressureState { +public class SearchBackpressureState implements CancellationSettingsListener { + private final AtomicReference rateLimiter; + private final AtomicReference ratioLimiter; + private final LongSupplier timeNanosSupplier; /** * The number of successful task completions. */ private final AtomicLong completionCount = new AtomicLong(); - /** * The number of task cancellations due to limit breaches. */ private final AtomicLong cancellationCount = new AtomicLong(); - /** * The number of times task cancellation limit was reached. */ private final AtomicLong limitReachedCount = new AtomicLong(); + private volatile double cancellationBurst; + private volatile double cancellationRate; + private volatile double cancellationRatio; + + SearchBackpressureState( + LongSupplier timeNanosSupplier, + double cancellationRateNanos, + double cancellationBurst, + double cancellationRatio + ) { + rateLimiter = new AtomicReference<>(new TokenBucket(timeNanosSupplier, cancellationRateNanos, cancellationBurst)); + ratioLimiter = new AtomicReference<>(new TokenBucket(this::getCompletionCount, cancellationRatio, cancellationBurst)); + this.timeNanosSupplier = timeNanosSupplier; + this.cancellationBurst = cancellationBurst; + } public long getCompletionCount() { return completionCount.get(); @@ -54,4 +74,31 @@ public long getLimitReachedCount() { long incrementLimitReachedCount() { return limitReachedCount.incrementAndGet(); } + + public TokenBucket getRateLimiter() { + return rateLimiter.get(); + } + + public TokenBucket getRatioLimiter() { + return ratioLimiter.get(); + } + + @Override + public void onRatioChanged(double ratio) { + this.cancellationRatio = ratio; + ratioLimiter.set(new TokenBucket(this::getCompletionCount, cancellationRatio, cancellationBurst)); + } + + @Override + public void onRateChanged(double rate) { + this.cancellationRate = rate; + rateLimiter.set(new 
TokenBucket(timeNanosSupplier, cancellationRate, cancellationBurst)); + } + + @Override + public void onBurstChanged(double burst) { + this.cancellationBurst = burst; + onRateChanged(cancellationRate); + onRatioChanged(cancellationRatio); + } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java index df2c04a730fbc..f06acb4c952a5 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -8,19 +8,13 @@ package org.opensearch.search.backpressure.settings; -import org.opensearch.ExceptionsHelper; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - /** - * Settings related to search backpressure and cancellation of in-flight requests. + * Settings related to search backpressure mode and internal * * @opensearch.internal */ @@ -59,13 +53,16 @@ private static class Defaults { /** * Defines the percentage of tasks to cancel relative to the number of successful task completions. * In other words, it is the number of tokens added to the bucket on each successful task completion. + * + * The setting below is deprecated. + * To keep backwards compatibility, the old usage is remained, and it's also used as the fallback for the new usage. 
*/ - private volatile double cancellationRatio; public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( "search_backpressure.cancellation_ratio", Defaults.CANCELLATION_RATIO, 0.0, 1.0, + Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -73,68 +70,54 @@ private static class Defaults { /** * Defines the number of tasks to cancel per unit time (in millis). * In other words, it is the number of tokens added to the bucket each millisecond. + * + * The setting below is deprecated. + * To keep backwards compatibility, the old usage is remained, and it's also used as the fallback for the new usage. */ - private volatile double cancellationRate; public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( "search_backpressure.cancellation_rate", Defaults.CANCELLATION_RATE, 0.0, + Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope ); /** * Defines the maximum number of tasks that can be cancelled before being rate-limited. + * + * The setting below is deprecated. + * To keep backwards compatibility, the old usage is remained, and it's also used as the fallback for the new usage. */ - private volatile double cancellationBurst; public static final Setting SETTING_CANCELLATION_BURST = Setting.doubleSetting( "search_backpressure.cancellation_burst", Defaults.CANCELLATION_BURST, 1.0, + Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope ); - /** - * Callback listeners. 
- */ - public interface Listener { - void onCancellationRatioChanged(); - - void onCancellationRateChanged(); - - void onCancellationBurstChanged(); - } - - private final List listeners = new ArrayList<>(); private final Settings settings; private final ClusterSettings clusterSettings; private final NodeDuressSettings nodeDuressSettings; + private final SearchTaskSettings searchTaskSettings; private final SearchShardTaskSettings searchShardTaskSettings; public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) { this.settings = settings; this.clusterSettings = clusterSettings; this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); + this.searchTaskSettings = new SearchTaskSettings(settings, clusterSettings); this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings); interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings)); mode = SearchBackpressureMode.fromName(SETTING_MODE.get(settings)); clusterSettings.addSettingsUpdateConsumer(SETTING_MODE, s -> this.setMode(SearchBackpressureMode.fromName(s))); - - cancellationRatio = SETTING_CANCELLATION_RATIO.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, this::setCancellationRatio); - - cancellationRate = SETTING_CANCELLATION_RATE.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, this::setCancellationRate); - - cancellationBurst = SETTING_CANCELLATION_BURST.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); - } - - public void addListener(Listener listener) { - listeners.add(listener); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, searchShardTaskSettings::setCancellationRatio); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, searchShardTaskSettings::setCancellationRate); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, 
searchShardTaskSettings::setCancellationBurst); } public Settings getSettings() { @@ -149,6 +132,10 @@ public NodeDuressSettings getNodeDuressSettings() { return nodeDuressSettings; } + public SearchTaskSettings getSearchTaskSettings() { + return searchTaskSettings; + } + public SearchShardTaskSettings getSearchShardTaskSettings() { return searchShardTaskSettings; } @@ -164,49 +151,4 @@ public SearchBackpressureMode getMode() { public void setMode(SearchBackpressureMode mode) { this.mode = mode; } - - public double getCancellationRatio() { - return cancellationRatio; - } - - private void setCancellationRatio(double cancellationRatio) { - this.cancellationRatio = cancellationRatio; - notifyListeners(Listener::onCancellationRatioChanged); - } - - public double getCancellationRate() { - return cancellationRate; - } - - public double getCancellationRateNanos() { - return getCancellationRate() / TimeUnit.MILLISECONDS.toNanos(1); // rate per nanoseconds - } - - private void setCancellationRate(double cancellationRate) { - this.cancellationRate = cancellationRate; - notifyListeners(Listener::onCancellationRateChanged); - } - - public double getCancellationBurst() { - return cancellationBurst; - } - - private void setCancellationBurst(double cancellationBurst) { - this.cancellationBurst = cancellationBurst; - notifyListeners(Listener::onCancellationBurstChanged); - } - - private void notifyListeners(Consumer consumer) { - List exceptions = new ArrayList<>(); - - for (Listener listener : listeners) { - try { - consumer.accept(listener); - } catch (Exception e) { - exceptions.add(e); - } - } - - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); - } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java index 7e40f1c0eab53..105023a20173f 100644 --- 
a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java @@ -8,10 +8,16 @@ package org.opensearch.search.backpressure.settings; +import org.opensearch.ExceptionsHelper; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.search.backpressure.CancellationSettingsListener; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Defines the settings related to the cancellation of SearchShardTasks. @@ -19,12 +25,60 @@ * @opensearch.internal */ public class SearchShardTaskSettings { - private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); + private final List listeners = new ArrayList<>(); + private final ClusterSettings clusterSettings; private static class Defaults { + private static final double CANCELLATION_RATIO = 0.1; + private static final double CANCELLATION_RATE = 0.003; + private static final double CANCELLATION_BURST = 10.0; private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05; + private static final long CPU_TIME_MILLIS_THRESHOLD = 15000; + private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; + private static final double HEAP_PERCENT_THRESHOLD = 0.005; + private static final double HEAP_VARIANCE_THRESHOLD = 2.0; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; } + /** + * Defines the percentage of SearchShardTasks to cancel relative to the number of successful SearchShardTasks completions. + * In other words, it is the number of tokens added to the bucket on each successful SearchShardTask completion. 
+ */ + private volatile double cancellationRatio; + public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( + "search_backpressure.search_shard_task.cancellation_ratio", + SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the number of SearchShardTasks to cancel per unit time (in millis). + * In other words, it is the number of tokens added to the bucket each millisecond. + */ + private volatile double cancellationRate; + public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( + "search_backpressure.search_shard_task.cancellation_rate", + SearchBackpressureSettings.SETTING_CANCELLATION_RATE, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the maximum number of SearchShardTasks that can be cancelled before being rate-limited. + */ + private volatile double cancellationBurst; + public static final Setting SETTING_CANCELLATION_BURST = Setting.doubleSetting( + "search_backpressure.search_shard_task.cancellation_burst", + SearchBackpressureSettings.SETTING_CANCELLATION_BURST, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search shard tasks * before in-flight cancellation is applied. @@ -39,20 +93,185 @@ private static class Defaults { Setting.Property.NodeScope ); + /** + * Defines the CPU usage threshold (in millis) for an individual search shard task before it is considered for cancellation. 
+ */ + private volatile long cpuTimeMillisThreshold; + public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.cpu_time_millis_threshold", + Defaults.CPU_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the elapsed time threshold (in millis) for an individual search shard task before it is considered for cancellation. + */ + private volatile long elapsedTimeMillisThreshold; + public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.elapsed_time_millis_threshold", + Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage threshold (in percentage) for an individual search shard task before it is considered for cancellation. + */ + private volatile double heapPercentThreshold; + public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_percent_threshold", + Defaults.HEAP_PERCENT_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage variance for an individual search shard task before it is considered for cancellation. + * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. + */ + private volatile double heapVarianceThreshold; + public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_variance", + Defaults.HEAP_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the window size to calculate the moving average of heap usage of completed search shard tasks. 
+ */ + private volatile int heapMovingAverageWindowSize; + public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "search_backpressure.search_shard_task.heap_moving_average_window_size", + Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public SearchShardTaskSettings(Settings settings, ClusterSettings clusterSettings) { totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.get(settings); + this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings); + this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings); + this.heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings); + this.heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings); + this.heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + this.cancellationRatio = SETTING_CANCELLATION_RATIO.get(settings); + this.cancellationRate = SETTING_CANCELLATION_RATE.get(settings); + this.cancellationBurst = SETTING_CANCELLATION_BURST.get(settings); + this.clusterSettings = clusterSettings; + clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, this::setTotalHeapPercentThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, this::setCancellationRatio); + 
clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, this::setCancellationRate); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); } public double getTotalHeapPercentThreshold() { return totalHeapPercentThreshold; } - public long getTotalHeapBytesThreshold() { - return (long) (HEAP_SIZE_BYTES * getTotalHeapPercentThreshold()); + public long getCpuTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold); + } + + public long getElapsedTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold); + } + + public double getHeapPercentThreshold() { + return heapPercentThreshold; + } + + public double getHeapVarianceThreshold() { + return heapVarianceThreshold; } - private void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { + public int getHeapMovingAverageWindowSize() { + return heapMovingAverageWindowSize; + } + + public void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { this.totalHeapPercentThreshold = totalHeapPercentThreshold; } + + public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { + this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; + } + + public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { + this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; + } + + public void setHeapPercentThreshold(double heapPercentThreshold) { + this.heapPercentThreshold = heapPercentThreshold; + } + + public void setHeapVarianceThreshold(double heapVarianceThreshold) { + this.heapVarianceThreshold = heapVarianceThreshold; + } + + public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { + this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; + } + + public double getCancellationRatio() { + return cancellationRatio; + } + + void setCancellationRatio(double cancellationRatio) { + this.cancellationRatio = cancellationRatio; + 
notifyListeners(listener -> listener.onRatioChanged(cancellationRatio)); + } + + public double getCancellationRate() { + return cancellationRate; + } + + public double getCancellationRateNanos() { + return getCancellationRate() / TimeUnit.MILLISECONDS.toNanos(1); // rate per nanoseconds + } + + void setCancellationRate(double cancellationRate) { + this.cancellationRate = cancellationRate; + notifyListeners(listener -> listener.onRateChanged(cancellationRate)); + } + + public double getCancellationBurst() { + return cancellationBurst; + } + + void setCancellationBurst(double cancellationBurst) { + this.cancellationBurst = cancellationBurst; + notifyListeners(listener -> listener.onBurstChanged(cancellationBurst)); + } + + public void addListener(CancellationSettingsListener listener) { + listeners.add(listener); + } + + private void notifyListeners(Consumer consumer) { + List exceptions = new ArrayList<>(); + + for (CancellationSettingsListener listener : listeners) { + try { + consumer.accept(listener); + } catch (Exception e) { + exceptions.add(e); + } + } + + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); + } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java new file mode 100644 index 0000000000000..74f41f286de6c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchTaskSettings.java @@ -0,0 +1,282 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.backpressure.settings; + +import org.opensearch.ExceptionsHelper; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.backpressure.CancellationSettingsListener; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Defines the settings related to the cancellation of SearchTasks. + * + * @opensearch.internal + */ + +public class SearchTaskSettings { + private final List listeners = new ArrayList<>(); + private final ClusterSettings clusterSettings; + + private static class Defaults { + private static final double CANCELLATION_RATIO = 0.1; + private static final double CANCELLATION_RATE = 0.003; + private static final double CANCELLATION_BURST = 5.0; + private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05; + private static final long CPU_TIME_MILLIS_THRESHOLD = 30000; + private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 45000; + private static final double HEAP_PERCENT_THRESHOLD = 0.02; + private static final double HEAP_VARIANCE_THRESHOLD = 2.0; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; + } + + /** + * Defines the percentage of SearchTasks to cancel relative to the number of successful SearchTask completions. + * In other words, it is the number of tokens added to the bucket on each successful SearchTask completion. + */ + private volatile double cancellationRatio; + public static final Setting SETTING_CANCELLATION_RATIO = Setting.doubleSetting( + "search_backpressure.search_task.cancellation_ratio", + Defaults.CANCELLATION_RATIO, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the number of SearchTasks to cancel per unit time (in millis). + * In other words, it is the number of tokens added to the bucket each millisecond. 
+ */ + private volatile double cancellationRate; + public static final Setting SETTING_CANCELLATION_RATE = Setting.doubleSetting( + "search_backpressure.search_task.cancellation_rate", + Defaults.CANCELLATION_RATE, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the maximum number of SearchTasks that can be cancelled before being rate-limited. + */ + private volatile double cancellationBurst; + public static final Setting SETTING_CANCELLATION_BURST = Setting.doubleSetting( + "search_backpressure.search_task.cancellation_burst", + Defaults.CANCELLATION_BURST, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search tasks + * before in-flight cancellation is applied. + */ + private volatile double totalHeapPercentThreshold; + public static final Setting SETTING_TOTAL_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_task.total_heap_percent_threshold", + Defaults.TOTAL_HEAP_PERCENT_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the CPU usage threshold (in millis) for an individual search task before it is considered for cancellation. + */ + private volatile long cpuTimeMillisThreshold; + public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_task.cpu_time_millis_threshold", + Defaults.CPU_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the elapsed time threshold (in millis) for an individual search task before it is considered for cancellation. 
+ */ + private volatile long elapsedTimeMillisThreshold; + public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_task.elapsed_time_millis_threshold", + Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage threshold (in percentage) for an individual search task before it is considered for cancellation. + */ + private volatile double heapPercentThreshold; + public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_task.heap_percent_threshold", + Defaults.HEAP_PERCENT_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage variance for an individual search task before it is considered for cancellation. + * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. + */ + private volatile double heapVarianceThreshold; + public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_task.heap_variance", + Defaults.HEAP_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the window size to calculate the moving average of heap usage of completed search tasks. 
+ */ + private volatile int heapMovingAverageWindowSize; + public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "search_backpressure.search_task.heap_moving_average_window_size", + Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public SearchTaskSettings(Settings settings, ClusterSettings clusterSettings) { + this.totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.get(settings); + this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings); + this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings); + this.heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings); + this.heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings); + this.heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + this.cancellationRatio = SETTING_CANCELLATION_RATIO.get(settings); + this.cancellationRate = SETTING_CANCELLATION_RATE.get(settings); + this.cancellationBurst = SETTING_CANCELLATION_BURST.get(settings); + this.clusterSettings = clusterSettings; + + clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, this::setTotalHeapPercentThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATIO, this::setCancellationRatio); + 
clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_RATE, this::setCancellationRate); + clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); + } + + /** + * Callback listeners. + */ + + public double getTotalHeapPercentThreshold() { + return totalHeapPercentThreshold; + } + + public long getCpuTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold); + } + + public long getElapsedTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold); + } + + public double getHeapPercentThreshold() { + return heapPercentThreshold; + } + + public double getHeapVarianceThreshold() { + return heapVarianceThreshold; + } + + public int getHeapMovingAverageWindowSize() { + return heapMovingAverageWindowSize; + } + + public void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { + this.totalHeapPercentThreshold = totalHeapPercentThreshold; + } + + public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { + this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; + } + + public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { + this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; + } + + public void setHeapPercentThreshold(double heapPercentThreshold) { + this.heapPercentThreshold = heapPercentThreshold; + } + + public void setHeapVarianceThreshold(double heapVarianceThreshold) { + this.heapVarianceThreshold = heapVarianceThreshold; + } + + public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { + this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; + } + + public double getCancellationRatio() { + return cancellationRatio; + } + + private void setCancellationRatio(double cancellationRatio) { + this.cancellationRatio = cancellationRatio; + notifyListeners(listener -> listener.onRatioChanged(cancellationRatio)); + } + + public double getCancellationRate() { + return cancellationRate; + } + 
+ public double getCancellationRateNanos() { + return getCancellationRate() / TimeUnit.MILLISECONDS.toNanos(1); // per-nanosecond rate: divides by the number of nanos in one millisecond (NOTE(review): implies the configured rate is per millisecond - confirm) + } + + private void setCancellationRate(double cancellationRate) { + this.cancellationRate = cancellationRate; + notifyListeners(listener -> listener.onRateChanged(cancellationRate)); + } + + public double getCancellationBurst() { + return cancellationBurst; + } + + private void setCancellationBurst(double cancellationBurst) { + this.cancellationBurst = cancellationBurst; + notifyListeners(listener -> listener.onBurstChanged(cancellationBurst)); + } + + public void addListener(CancellationSettingsListener listener) { + listeners.add(listener); + } + + private void notifyListeners(Consumer consumer) { + List exceptions = new ArrayList<>(); + + for (CancellationSettingsListener listener : listeners) { + try { + consumer.accept(listener); + } catch (Exception e) { /* collected so the remaining listeners are still notified; handed to ExceptionsHelper after the loop */ + exceptions.add(e); + } + } + + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java index 3aec0dfc579c5..8f8f41b583c42 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchBackpressureStats.java @@ -8,6 +8,8 @@ package org.opensearch.search.backpressure.stats; +import org.opensearch.Version; +import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; @@ -24,28 +26,47 @@ public class SearchBackpressureStats implements ToXContentFragment, Writeable { private final SearchShardTaskStats searchShardTaskStats; private final SearchBackpressureMode mode; + @Nullable + private final SearchTaskStats searchTaskStats; - 
public SearchBackpressureStats(SearchShardTaskStats searchShardTaskStats, SearchBackpressureMode mode) { + public SearchBackpressureStats( + SearchTaskStats searchTaskStats, + SearchShardTaskStats searchShardTaskStats, + SearchBackpressureMode mode + ) { this.searchShardTaskStats = searchShardTaskStats; this.mode = mode; + this.searchTaskStats = searchTaskStats; } public SearchBackpressureStats(StreamInput in) throws IOException { - this(new SearchShardTaskStats(in), SearchBackpressureMode.fromName(in.readString())); + searchShardTaskStats = new SearchShardTaskStats(in); + mode = SearchBackpressureMode.fromName(in.readString()); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + searchTaskStats = in.readOptionalWriteable(SearchTaskStats::new); + } else { + searchTaskStats = null; + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject("search_backpressure") - .field("search_shard_task", searchShardTaskStats) - .field("mode", mode.getName()) - .endObject(); + builder.startObject("search_backpressure"); + if (searchTaskStats != null) { + builder.field("search_task", searchTaskStats); + } + builder.field("search_shard_task", searchShardTaskStats); + builder.field("mode", mode.getName()); + return builder.endObject(); } @Override public void writeTo(StreamOutput out) throws IOException { searchShardTaskStats.writeTo(out); out.writeString(mode.getName()); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(searchTaskStats); + } } @Override @@ -53,11 +74,13 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SearchBackpressureStats that = (SearchBackpressureStats) o; - return searchShardTaskStats.equals(that.searchShardTaskStats) && mode == that.mode; + return mode == that.mode + && Objects.equals(searchTaskStats, that.searchTaskStats) + && Objects.equals(searchShardTaskStats, 
that.searchShardTaskStats); } @Override public int hashCode() { - return Objects.hash(searchShardTaskStats, mode); + return Objects.hash(searchTaskStats, searchShardTaskStats, mode); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java index 4d532cfb12f80..00eb21b7f3d57 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.search.backpressure.trackers.CpuUsageTracker; @@ -25,8 +26,9 @@ import java.util.Objects; /** - * Stats related to cancelled search shard tasks. + * Stats related to cancelled SearchShardTasks. 
*/ + public class SearchShardTaskStats implements ToXContentObject, Writeable { private final long cancellationCount; private final long limitReachedCount; @@ -54,7 +56,7 @@ public SearchShardTaskStats(StreamInput in) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); builder.startObject("resource_tracker_stats"); diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java new file mode 100644 index 0000000000000..08b7f39f8aeff --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.stats; + +import org.opensearch.common.collect.MapBuilder; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Stats related to cancelled SearchTasks. 
+ */ + +public class SearchTaskStats implements ToXContentObject, Writeable { + private final long cancellationCount; + private final long limitReachedCount; + private final Map resourceUsageTrackerStats; + + public SearchTaskStats( + long cancellationCount, + long limitReachedCount, + Map resourceUsageTrackerStats + ) { + this.cancellationCount = cancellationCount; + this.limitReachedCount = limitReachedCount; + this.resourceUsageTrackerStats = resourceUsageTrackerStats; + } + + public SearchTaskStats(StreamInput in) throws IOException { + this.cancellationCount = in.readVLong(); + this.limitReachedCount = in.readVLong(); + + MapBuilder builder = new MapBuilder<>(); + builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new)); + builder.put(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, in.readOptionalWriteable(HeapUsageTracker.Stats::new)); + builder.put(TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, in.readOptionalWriteable(ElapsedTimeTracker.Stats::new)); + this.resourceUsageTrackerStats = builder.immutableMap(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + + builder.startObject("resource_tracker_stats"); + for (Map.Entry entry : resourceUsageTrackerStats.entrySet()) { + builder.field(entry.getKey().getName(), entry.getValue()); + } + builder.endObject(); + + builder.startObject("cancellation_stats") + .field("cancellation_count", cancellationCount) + .field("cancellation_limit_reached_count", limitReachedCount) + .endObject(); + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(cancellationCount); + out.writeVLong(limitReachedCount); + + out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER)); + 
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER)); + out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SearchTaskStats that = (SearchTaskStats) o; + return cancellationCount == that.cancellationCount + && limitReachedCount == that.limitReachedCount + && resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats); + } + + @Override + public int hashCode() { + return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats); + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 21bb3af32ae08..fb4cd342de25b 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -8,11 +8,9 @@ package org.opensearch.search.backpressure.trackers; -import org.opensearch.common.settings.Setting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; -import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -22,6 +20,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER; @@ -31,25 +30,11 @@ * @opensearch.internal */ public class CpuUsageTracker extends TaskResourceUsageTracker { - private 
static class Defaults { - private static final long CPU_TIME_MILLIS_THRESHOLD = 15000; - } - /** - * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. - */ - private volatile long cpuTimeMillisThreshold; - public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.cpu_time_millis_threshold", - Defaults.CPU_TIME_MILLIS_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public CpuUsageTracker(SearchBackpressureSettings settings) { - this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); - settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); + private final LongSupplier thresholdSupplier; + + public CpuUsageTracker(LongSupplier thresholdSupplier) { + this.thresholdSupplier = thresholdSupplier; } @Override @@ -60,7 +45,7 @@ public String name() { @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = task.getTotalResourceStats().getCpuTimeInNanos(); - long threshold = getCpuTimeNanosThreshold(); + long threshold = thresholdSupplier.getAsLong(); if (usage < threshold) { return Optional.empty(); @@ -78,14 +63,6 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } - public long getCpuTimeNanosThreshold() { - return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold); - } - - public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { - this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; - } - @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java 
b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index 10e53e2bce5ae..1175d68fb8550 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -8,11 +8,9 @@ package org.opensearch.search.backpressure.trackers; -import org.opensearch.common.settings.Setting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; -import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -32,28 +30,12 @@ * @opensearch.internal */ public class ElapsedTimeTracker extends TaskResourceUsageTracker { - private static class Defaults { - private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; - } - - /** - * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. 
- */ - private volatile long elapsedTimeMillisThreshold; - public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.elapsed_time_millis_threshold", - Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - + private final LongSupplier thresholdSupplier; private final LongSupplier timeNanosSupplier; - public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier timeNanosSupplier) { + public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) { + this.thresholdSupplier = thresholdSupplier; this.timeNanosSupplier = timeNanosSupplier; - this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); - settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); } @Override @@ -64,7 +46,7 @@ public String name() { @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); - long threshold = getElapsedTimeNanosThreshold(); + long threshold = thresholdSupplier.getAsLong(); if (usage < threshold) { return Optional.empty(); @@ -82,14 +64,6 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } - public long getElapsedTimeNanosThreshold() { - return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold); - } - - public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { - this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; - } - @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long now = timeNanosSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index d1a264609e522..d6a38c8797174 100644 --- 
a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -8,14 +8,17 @@ package org.opensearch.search.backpressure.trackers; -import org.opensearch.common.settings.Setting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.MovingAverage; -import org.opensearch.monitor.jvm.JvmStats; -import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -24,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.DoubleSupplier; import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER; @@ -34,66 +38,23 @@ * @opensearch.internal */ public class HeapUsageTracker extends TaskResourceUsageTracker { + private static final Logger logger = LogManager.getLogger(HeapUsageTracker.class); private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); - - private static class Defaults { - private static final double HEAP_PERCENT_THRESHOLD = 0.005; - private static final double HEAP_VARIANCE_THRESHOLD = 2.0; - private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; - } - - /** - * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. 
- */ - private volatile double heapPercentThreshold; - public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_percent_threshold", - Defaults.HEAP_PERCENT_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage variance for an individual task before it is considered for cancellation. - * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. - */ - private volatile double heapVarianceThreshold; - public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_variance", - Defaults.HEAP_VARIANCE_THRESHOLD, - 0.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the window size to calculate the moving average of heap usage of completed tasks. - */ - private volatile int heapMovingAverageWindowSize; - public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( - "search_backpressure.search_shard_task.heap_moving_average_window_size", - Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - + private final DoubleSupplier heapVarianceSupplier; + private final DoubleSupplier heapPercentThresholdSupplier; private final AtomicReference movingAverageReference; - public HeapUsageTracker(SearchBackpressureSettings settings) { - heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings.getSettings()); - settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); - - heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings.getSettings()); - settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); - - heapMovingAverageWindowSize = 
SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings.getSettings()); - settings.getClusterSettings() - .addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); - + public HeapUsageTracker( + DoubleSupplier heapVarianceSupplier, + DoubleSupplier heapPercentThresholdSupplier, + int heapMovingAverageWindowSize, + ClusterSettings clusterSettings, + Setting windowSizeSetting + ) { + this.heapVarianceSupplier = heapVarianceSupplier; + this.heapPercentThresholdSupplier = heapPercentThresholdSupplier; this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize)); + clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize); } @Override @@ -117,9 +78,11 @@ public Optional checkAndMaybeGetCancellationReason(Task double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); double averageUsage = movingAverage.getAverage(); - double allowedUsage = averageUsage * getHeapVarianceThreshold(); + double variance = heapVarianceSupplier.getAsDouble(); + double allowedUsage = averageUsage * variance; + double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES; - if (currentUsage < getHeapBytesThreshold() || currentUsage < allowedUsage) { + if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) { return Optional.empty(); } @@ -131,25 +94,26 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } - public long getHeapBytesThreshold() { - return (long) (HEAP_SIZE_BYTES * heapPercentThreshold); - } - - public void setHeapPercentThreshold(double heapPercentThreshold) { - this.heapPercentThreshold = heapPercentThreshold; + private void updateWindowSize(int heapMovingAverageWindowSize) { + this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); } - public double getHeapVarianceThreshold() { - return heapVarianceThreshold; + public static boolean isHeapTrackingSupported() { 
+ return HEAP_SIZE_BYTES > 0; } - public void setHeapVarianceThreshold(double heapVarianceThreshold) { - this.heapVarianceThreshold = heapVarianceThreshold; - } + /** + * Returns true if the increase in heap usage is due to search requests: false only when heap tracking is supported and the tasks' summed heap usage is below heapPercentThreshold * max heap size. + */ + public static boolean isHeapUsageDominatedBySearch(List cancellableTasks, double heapPercentThreshold) { + long usage = cancellableTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); + long threshold = (long) (heapPercentThreshold * HEAP_SIZE_BYTES); /* multiply before casting: casting the fraction alone truncates it to 0 and disables this check */ + if (isHeapTrackingSupported() && usage < threshold) { + logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold); + return false; + } - public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { - this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; - this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); + return true; } @Override diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java index cbbb751b996be..e54cfcd5d3970 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java @@ -52,7 +52,7 @@ public void update(Task task) {} public abstract Optional checkAndMaybeGetCancellationReason(Task task); /** - * Returns the tracker's state as seen in the stats API. + * Returns the tracker's state for tasks as seen in the stats API. 
*/ public abstract Stats stats(List activeTasks); diff --git a/server/src/main/java/org/opensearch/tasks/SearchBackpressureTask.java b/server/src/main/java/org/opensearch/tasks/SearchBackpressureTask.java new file mode 100644 index 0000000000000..0cab67e35ab02 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/SearchBackpressureTask.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +/** + * Marker interface (declares no methods) for tasks related to search backpressure. + * + * @opensearch.internal + */ +public interface SearchBackpressureTask {} diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java index d09312f38e3eb..b718bd2395cc5 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java @@ -9,6 +9,7 @@ package org.opensearch.tasks; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; import java.util.ArrayList; import java.util.List; @@ -54,7 +55,25 @@ public void cancel() { } task.cancel(getReasonString()); + runOnCancelCallbacks(); + } + /** + * Cancels the task and its descendants through the given TaskManager (with a no-op completion listener), then invokes all onCancelCallbacks; does nothing when the task is not eligible for cancellation. + */ + public void cancelTaskAndDescendants(TaskManager taskManager) { + if (isEligibleForCancellation() == false) { + return; + } + + taskManager.cancelTaskAndDescendants(task, getReasonString(), false, ActionListener.wrap(() -> {})); + runOnCancelCallbacks(); + } + + /** + * Invokes all onCancelCallbacks. 
+ */ + private void runOnCancelCallbacks() { List exceptions = new ArrayList<>(); for (Runnable callback : onCancelCallbacks) { try { diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index 07a962c6824ca..3515f02ff13d7 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -8,7 +8,11 @@ package org.opensearch.search.backpressure; +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.settings.ClusterSettings; @@ -16,17 +20,23 @@ import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; +import org.opensearch.search.backpressure.stats.SearchShardTaskStats; +import org.opensearch.search.backpressure.stats.SearchTaskStats; import org.opensearch.search.backpressure.trackers.NodeDuressTracker; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.search.backpressure.stats.SearchBackpressureStats; -import org.opensearch.search.backpressure.stats.SearchShardTaskStats; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; +import 
org.opensearch.tasks.TaskCancellationService; +import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -41,6 +51,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -51,10 +63,28 @@ import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; public class SearchBackpressureServiceTests extends OpenSearchTestCase { + MockTransportService transportService; + TaskManager taskManager; + ThreadPool threadPool; + + @Before + public void setup() { + threadPool = new TestThreadPool(getClass().getName()); + transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool); + transportService.start(); + transportService.acceptIncomingRequests(); + taskManager = transportService.getTaskManager(); + taskManager.setTaskCancellationService(new TaskCancellationService(transportService)); + } + + @After + public void cleanup() { + transportService.close(); + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + } public void testIsNodeInDuress() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); - ThreadPool mockThreadPool = mock(ThreadPool.class); AtomicReference cpuUsage = new AtomicReference<>(); AtomicReference heapUsage = new AtomicReference<>(); @@ -69,10 +99,12 @@ public void testIsNodeInDuress() { SearchBackpressureService service = new SearchBackpressureService( settings, 
mockTaskResourceTrackingService, - mockThreadPool, + threadPool, System::nanoTime, List.of(cpuUsageTracker, heapUsageTracker), - Collections.emptyList() + Collections.emptyList(), + Collections.emptyList(), + taskManager ); // Node not in duress. @@ -95,9 +127,37 @@ public void testIsNodeInDuress() { assertFalse(service.isNodeInDuress()); } - public void testTrackerStateUpdateOnTaskCompletion() { + public void testTrackerStateUpdateOnSearchTaskCompletion() { + TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234); + TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class); + + SearchBackpressureSettings settings = new SearchBackpressureSettings( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + SearchBackpressureService service = new SearchBackpressureService( + settings, + mockTaskResourceTrackingService, + threadPool, + mockTimeNanosSupplier, + Collections.emptyList(), + List.of(mockTaskResourceUsageTracker), + Collections.emptyList(), + taskManager + ); + + for (int i = 0; i < 100; i++) { + // Report 100 completed SearchTasks; the assertions below expect one tracker update and one completion count per task. + service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200)); + } + assertEquals(100, service.getSearchBackpressureStats(SearchTask.class).getCompletionCount()); + verify(mockTaskResourceUsageTracker, times(100)).update(any()); + } + + public void testTrackerStateUpdateOnSearchShardTaskCompletion() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); - ThreadPool mockThreadPool = mock(ThreadPool.class); LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234); TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class); 
@@ -109,72 +169,116 @@ public void testTrackerStateUpdateOnTaskCompletion() { SearchBackpressureService service = new SearchBackpressureService( settings, mockTaskResourceTrackingService, - mockThreadPool, + threadPool, mockTimeNanosSupplier, Collections.emptyList(), - List.of(mockTaskResourceUsageTracker) + Collections.emptyList(), + List.of(mockTaskResourceUsageTracker), + taskManager ); - // Record task completions to update the tracker state. Tasks other than SearchShardTask are ignored. + // Record task completions to update the tracker state. Tasks other than SearchTask & SearchShardTask are ignored. service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200)); for (int i = 0; i < 100; i++) { service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200)); } - assertEquals(100, service.getState().getCompletionCount()); + assertEquals(100, service.getSearchBackpressureStats(SearchShardTask.class).getCompletionCount()); verify(mockTaskResourceUsageTracker, times(100)).update(any()); } - public void testInFlightCancellation() { + public void testSearchTaskInFlightCancellation() { + TaskManager mockTaskManager = spy(taskManager); TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); - ThreadPool mockThreadPool = mock(ThreadPool.class); AtomicLong mockTime = new AtomicLong(0); LongSupplier mockTimeNanosSupplier = mockTime::get; NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); - TaskResourceUsageTracker mockTaskResourceUsageTracker = new TaskResourceUsageTracker() { - @Override - public String name() { - return TaskResourceUsageTrackerType.CPU_USAGE_TRACKER.getName(); - } + TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(); - @Override - public void update(Task task) {} + // Mocking 'settings' with predictable rate limiting thresholds. 
+ SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 5.0); - @Override - public Optional checkAndMaybeGetCancellationReason(Task task) { - if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { - return Optional.empty(); - } + SearchBackpressureService service = new SearchBackpressureService( + settings, + mockTaskResourceTrackingService, + threadPool, + mockTimeNanosSupplier, + List.of(mockNodeDuressTracker), + List.of(mockTaskResourceUsageTracker), + Collections.emptyList(), + mockTaskManager + ); - return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); - } + // Run two iterations so that node is marked 'in duress' from the third iteration onwards. + service.doRun(); + service.doRun(); - @Override - public Stats stats(List activeTasks) { - return new MockStats(getCancellations()); + // Mocking 'settings' with predictable totalHeapBytesThreshold so that cancellation logic doesn't get skipped. + long taskHeapUsageBytes = 500; + SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class); + // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService + when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0); + when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings); + + // Create a mix of low and high resource usage SearchTasks (50 low + 25 high resource usage tasks). + Map activeSearchTasks = new HashMap<>(); + for (long i = 0; i < 75; i++) { + if (i % 3 == 0) { + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes)); + } else { + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes)); } - }; + } + doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); - // Mocking 'settings' with predictable rate limiting thresholds. 
- SearchBackpressureSettings settings = spy( - new SearchBackpressureSettings( - Settings.builder() - .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.getKey(), 0.1) - .put(SearchBackpressureSettings.SETTING_CANCELLATION_RATE.getKey(), 0.003) - .put(SearchBackpressureSettings.SETTING_CANCELLATION_BURST.getKey(), 10.0) - .build(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ) + // There are 25 SearchTasks eligible for cancellation but only 5 will be cancelled (burst limit). + service.doRun(); + verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(1, service.getSearchBackpressureStats(SearchTask.class).getLimitReachedCount()); + + // If the clock or completed task count haven't made sufficient progress, we'll continue to be rate-limited. + service.doRun(); + verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(2, service.getSearchBackpressureStats(SearchTask.class).getLimitReachedCount()); + + // Fast-forward the clock by ten seconds to replenish some tokens. + // This will add 50 tokens (time delta * rate) to 'rateLimitPerTime' but it will cancel only 5 tasks (burst limit). + mockTime.addAndGet(TimeUnit.SECONDS.toNanos(10)); + service.doRun(); + verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(3, service.getSearchBackpressureStats(SearchTask.class).getLimitReachedCount()); + + // Verify search backpressure stats. 
+ SearchBackpressureStats expectedStats = new SearchBackpressureStats( + new SearchTaskStats(10, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))), + new SearchShardTaskStats(0, 0, Collections.emptyMap()), + SearchBackpressureMode.ENFORCED ); + SearchBackpressureStats actualStats = service.nodeStats(); + assertEquals(expectedStats, actualStats); + } + + public void testSearchShardTaskInFlightCancellation() { + TaskManager mockTaskManager = spy(taskManager); + TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + AtomicLong mockTime = new AtomicLong(0); + LongSupplier mockTimeNanosSupplier = mockTime::get; + NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); + + TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(); + + // Mocking 'settings' with predictable rate limiting thresholds. + SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); SearchBackpressureService service = new SearchBackpressureService( settings, mockTaskResourceTrackingService, - mockThreadPool, + threadPool, mockTimeNanosSupplier, List.of(mockNodeDuressTracker), - List.of(mockTaskResourceUsageTracker) + Collections.emptyList(), + List.of(mockTaskResourceUsageTracker), + mockTaskManager ); // Run two iterations so that node is marked 'in duress' from the third iteration onwards. @@ -183,30 +287,31 @@ public Stats stats(List activeTasks) { // Mocking 'settings' with predictable totalHeapBytesThreshold so that cancellation logic doesn't get skipped. 
long taskHeapUsageBytes = 500; - SearchShardTaskSettings shardTaskSettings = mock(SearchShardTaskSettings.class); - when(shardTaskSettings.getTotalHeapBytesThreshold()).thenReturn(taskHeapUsageBytes); - when(settings.getSearchShardTaskSettings()).thenReturn(shardTaskSettings); + SearchShardTaskSettings searchShardTaskSettings = mock(SearchShardTaskSettings.class); + // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService + when(searchShardTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0); + when(settings.getSearchShardTaskSettings()).thenReturn(searchShardTaskSettings); // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). - Map activeTasks = new HashMap<>(); + Map activeSearchShardTasks = new HashMap<>(); for (long i = 0; i < 75; i++) { if (i % 5 == 0) { - activeTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes)); + activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes)); } else { - activeTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); + activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); } } - doReturn(activeTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); + doReturn(activeSearchShardTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); - // There are 15 tasks eligible for cancellation but only 10 will be cancelled (burst limit). + // There are 15 SearchShardTasks eligible for cancellation but only 10 will be cancelled (burst limit). 
service.doRun(); - assertEquals(10, service.getState().getCancellationCount()); - assertEquals(1, service.getState().getLimitReachedCount()); + verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(1, service.getSearchBackpressureStats(SearchShardTask.class).getLimitReachedCount()); // If the clock or completed task count haven't made sufficient progress, we'll continue to be rate-limited. service.doRun(); - assertEquals(10, service.getState().getCancellationCount()); - assertEquals(2, service.getState().getLimitReachedCount()); + verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(2, service.getSearchBackpressureStats(SearchShardTask.class).getLimitReachedCount()); // Simulate task completion to replenish some tokens. // This will add 2 tokens (task count delta * cancellationRatio) to 'rateLimitPerTaskCompletion'. @@ -214,25 +319,54 @@ public Stats stats(List activeTasks) { service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); } service.doRun(); - assertEquals(12, service.getState().getCancellationCount()); - assertEquals(3, service.getState().getLimitReachedCount()); - - // Fast-forward the clock by one second to replenish some tokens. - // This will add 3 tokens (time delta * rate) to 'rateLimitPerTime'. - mockTime.addAndGet(TimeUnit.SECONDS.toNanos(1)); - service.doRun(); - assertEquals(15, service.getState().getCancellationCount()); - assertEquals(3, service.getState().getLimitReachedCount()); // no more tasks to cancel; limit not reached + verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(3, service.getSearchBackpressureStats(SearchShardTask.class).getLimitReachedCount()); // Verify search backpressure stats. 
SearchBackpressureStats expectedStats = new SearchBackpressureStats( - new SearchShardTaskStats(15, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(15))), + new SearchTaskStats(0, 0, Collections.emptyMap()), + new SearchShardTaskStats(12, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))), SearchBackpressureMode.ENFORCED ); SearchBackpressureStats actualStats = service.nodeStats(); assertEquals(expectedStats, actualStats); } + private SearchBackpressureSettings getBackpressureSettings(String mode, double ratio, double rate, double burst) { + return spy( + new SearchBackpressureSettings( + Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), mode).build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ) + ); + } + + private TaskResourceUsageTracker getMockedTaskResourceUsageTracker() { + return new TaskResourceUsageTracker() { + @Override + public String name() { + return TaskResourceUsageTrackerType.CPU_USAGE_TRACKER.getName(); + } + + @Override + public void update(Task task) {} + + @Override + public Optional checkAndMaybeGetCancellationReason(Task task) { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { + return Optional.empty(); + } + + return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + } + + @Override + public Stats stats(List tasks) { + return new MockStats(getCancellations()); + } + }; + } + private static class MockStats implements TaskResourceUsageTracker.Stats { private final long cancellationCount; diff --git a/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureRenamedSettingsTests.java b/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureRenamedSettingsTests.java new file mode 100644 index 0000000000000..a0eb05ecaa91d --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/settings/SearchBackpressureRenamedSettingsTests.java @@ 
-0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.Set; + +public class SearchBackpressureRenamedSettingsTests extends OpenSearchTestCase { + + /** + * Validate that both settings are known and supported. + */ + public void testOldSettingsExist() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "Both 'search_backpressure.search_shard_task.cancellation_ratio' and its predecessor should be supported built-in settings", + settings.containsAll( + Arrays.asList(SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, SearchShardTaskSettings.SETTING_CANCELLATION_RATIO) + ) + ); + + assertTrue( + "Both 'search_backpressure.search_shard_task.cancellation_rate' and its predecessor should be supported built-in settings", + settings.containsAll( + Arrays.asList(SearchBackpressureSettings.SETTING_CANCELLATION_RATE, SearchShardTaskSettings.SETTING_CANCELLATION_RATE) + ) + ); + + assertTrue( + "Both 'search_backpressure.search_shard_task.cancellation_burst' and its predecessor should be supported built-in settings", + settings.containsAll( + Arrays.asList(SearchBackpressureSettings.SETTING_CANCELLATION_BURST, SearchShardTaskSettings.SETTING_CANCELLATION_BURST) + ) + ); + } + + /** + * Validate that the default value of both settings is the same. 
+ */ + public void testSettingFallback() { + assertEquals( + SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.get(Settings.EMPTY), + SearchShardTaskSettings.SETTING_CANCELLATION_RATIO.get(Settings.EMPTY) + ); + + assertEquals( + SearchBackpressureSettings.SETTING_CANCELLATION_RATE.get(Settings.EMPTY), + SearchShardTaskSettings.SETTING_CANCELLATION_RATE.get(Settings.EMPTY) + ); + + assertEquals( + SearchBackpressureSettings.SETTING_CANCELLATION_BURST.get(Settings.EMPTY), + SearchShardTaskSettings.SETTING_CANCELLATION_BURST.get(Settings.EMPTY) + ); + } + + /** + * Validate the new setting can be configured correctly, and it doesn't impact the old setting. + */ + public void testSettingGetValue() { + Settings settings = Settings.builder().put("search_backpressure.search_shard_task.cancellation_ratio", "0.5").build(); + assertEquals(Double.valueOf(0.5), SearchShardTaskSettings.SETTING_CANCELLATION_RATIO.get(settings)); + assertEquals( + SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.getDefault(Settings.EMPTY), + SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.get(settings) + ); + } + + /** + * Validate the value of the old setting will be applied to the new setting, if the new setting is not configured. + */ + public void testSettingGetValueWithFallback() { + Settings settings = Settings.builder().put("search_backpressure.cancellation_ratio", "0.3").build(); + assertEquals(Double.valueOf(0.3), SearchShardTaskSettings.SETTING_CANCELLATION_RATIO.get(settings)); + assertSettingDeprecationsAndWarnings(new Setting[] { SearchBackpressureSettings.SETTING_CANCELLATION_RATIO }); + } + + /** + * Validate the value of the old setting will be ignored, if the new setting is configured. 
+ */ + public void testSettingGetValueWhenBothAreConfigured() { + Settings settings = Settings.builder() + .put("search_backpressure.search_shard_task.cancellation_ratio", "0.2") + .put("search_backpressure.cancellation_ratio", "0.4") + .build(); + assertEquals(Double.valueOf(0.2), SearchShardTaskSettings.SETTING_CANCELLATION_RATIO.get(settings)); + assertEquals(Double.valueOf(0.4), SearchBackpressureSettings.SETTING_CANCELLATION_RATIO.get(settings)); + assertSettingDeprecationsAndWarnings(new Setting[] { SearchBackpressureSettings.SETTING_CANCELLATION_RATIO }); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java index 2665a6d5e05aa..0c86cf4b11239 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchBackpressureStatsTests.java @@ -25,6 +25,7 @@ protected SearchBackpressureStats createTestInstance() { public static SearchBackpressureStats randomInstance() { return new SearchBackpressureStats( + SearchTaskStatsTests.randomInstance(), SearchShardTaskStatsTests.randomInstance(), randomFrom(SearchBackpressureMode.DISABLED, SearchBackpressureMode.MONITOR_ONLY, SearchBackpressureMode.ENFORCED) ); diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java new file mode 100644 index 0000000000000..07cec723efb17 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.backpressure.stats; + +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.test.AbstractWireSerializingTestCase; + +import java.util.Map; + +public class SearchTaskStatsTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return SearchTaskStats::new; + } + + @Override + protected SearchTaskStats createTestInstance() { + return randomInstance(); + } + + public static SearchTaskStats randomInstance() { + Map resourceUsageTrackerStats = Map.of( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + new CpuUsageTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + new HeapUsageTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), + TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, + new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) + ); + + return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java index c790fb2e60eea..8cdcbc7511bd2 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -9,9 +9,12 @@ 
package org.opensearch.search.backpressure.trackers; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; @@ -23,14 +26,25 @@ public class CpuUsageTrackerTests extends OpenSearchTestCase { private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( Settings.builder() - .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15) // 15 ms + .put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15) // 15 ms + .put(SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 25) // 25 ms .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - public void testEligibleForCancellation() { + public void testSearchTaskEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200); + CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchTaskSettings()::getCpuTimeNanosThreshold); + + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(1, reason.get().getCancellationScore()); + assertEquals("cpu usage exceeded [100ms >= 25ms]", reason.get().getMessage()); + } + + public void testSearchShardTaskEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); - CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); + CpuUsageTracker tracker = new 
CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); @@ -40,7 +54,7 @@ public void testEligibleForCancellation() { public void testNotEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); - CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); + CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java index 67ed6059a1914..921d01e7355a7 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -9,9 +9,12 @@ package org.opensearch.search.backpressure.trackers; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; @@ -24,14 +27,31 @@ public class ElapsedTimeTrackerTests extends OpenSearchTestCase { private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( Settings.builder() - 
.put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 100) // 100 ms + .put(SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 100) // 100 ms + .put(SearchTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 150) // 150 ms .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - public void testEligibleForCancellation() { + public void testSearchTaskEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 1, 0); + ElapsedTimeTracker tracker = new ElapsedTimeTracker( + mockSettings.getSearchTaskSettings()::getElapsedTimeNanosThreshold, + () -> 150000000 + ); + + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(1, reason.get().getCancellationScore()); + assertEquals("elapsed time exceeded [150ms >= 150ms]", reason.get().getMessage()); + } + + public void testSearchShardTaskEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); - ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); + ElapsedTimeTracker tracker = new ElapsedTimeTracker( + mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, + () -> 200000000 + ); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); @@ -41,7 +61,10 @@ public void testEligibleForCancellation() { public void testNotEligibleForCancellation() { Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); - ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); + ElapsedTimeTracker tracker = new ElapsedTimeTracker( + mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, + () -> 200000000 + ); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); diff --git 
a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java index b9967da22fbf1..2acb23641667a 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -9,9 +9,12 @@ package org.opensearch.search.backpressure.trackers; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchTask; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; @@ -23,20 +26,65 @@ import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; public class HeapUsageTrackerTests extends OpenSearchTestCase { - private static final long HEAP_BYTES_THRESHOLD = 100; + private static final long HEAP_BYTES_THRESHOLD_SEARCH_SHARD_TASK = 100; + private static final long HEAP_BYTES_THRESHOLD_SEARCH_TASK = 50; private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( Settings.builder() - .put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 2.0) - .put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), HEAP_MOVING_AVERAGE_WINDOW_SIZE) + .put(SearchTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 3.0) + .put(SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 2.0) + 
.put(SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), HEAP_MOVING_AVERAGE_WINDOW_SIZE) + .put(SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), HEAP_MOVING_AVERAGE_WINDOW_SIZE) .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - public void testEligibleForCancellation() { - HeapUsageTracker tracker = spy(new HeapUsageTracker(mockSettings)); - when(tracker.getHeapBytesThreshold()).thenReturn(HEAP_BYTES_THRESHOLD); + public void testSearchTaskEligibleForCancellation() { + SearchTaskSettings mockSearchTaskSettings = spy( + new SearchTaskSettings(mockSettings.getSettings(), mockSettings.getClusterSettings()) + ); + // setting the heap percent threshold to minimum + when(mockSearchTaskSettings.getHeapPercentThreshold()).thenReturn(0.0); + HeapUsageTracker tracker = spy( + new HeapUsageTracker( + mockSearchTaskSettings::getHeapVarianceThreshold, + mockSearchTaskSettings::getHeapPercentThreshold, + mockSearchTaskSettings.getHeapMovingAverageWindowSize(), + mockSettings.getClusterSettings(), + SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE + ) + ); + Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 50); + + // Record enough observations to make the moving average 'ready'. + for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { + tracker.update(task); + } + + // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). 
+ task = createMockTaskWithResourceStats(SearchTask.class, 1, 300); + Optional reason = tracker.checkAndMaybeGetCancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(6, reason.get().getCancellationScore()); + assertEquals("heap usage exceeded [300b >= 150b]", reason.get().getMessage()); + } + + public void testSearchShardTaskEligibleForCancellation() { + SearchShardTaskSettings mockSearchShardTaskSettings = spy( + new SearchShardTaskSettings(mockSettings.getSettings(), mockSettings.getClusterSettings()) + ); + // setting the heap percent threshold to minimum + when(mockSearchShardTaskSettings.getHeapPercentThreshold()).thenReturn(0.0); + HeapUsageTracker tracker = spy( + new HeapUsageTracker( + mockSearchShardTaskSettings::getHeapVarianceThreshold, + mockSearchShardTaskSettings::getHeapPercentThreshold, + mockSearchShardTaskSettings.getHeapMovingAverageWindowSize(), + mockSettings.getClusterSettings(), + SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE + ) + ); Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50); // Record enough observations to make the moving average 'ready'. 
@@ -55,8 +103,20 @@ public void testEligibleForCancellation() { public void testNotEligibleForCancellation() { Task task; Optional reason; - HeapUsageTracker tracker = spy(new HeapUsageTracker(mockSettings)); - when(tracker.getHeapBytesThreshold()).thenReturn(HEAP_BYTES_THRESHOLD); + SearchShardTaskSettings mockSearchShardTaskSettings = spy( + new SearchShardTaskSettings(mockSettings.getSettings(), mockSettings.getClusterSettings()) + ); + // setting the heap percent threshold to minimum + when(mockSearchShardTaskSettings.getHeapPercentThreshold()).thenReturn(0.0); + HeapUsageTracker tracker = spy( + new HeapUsageTracker( + mockSearchShardTaskSettings::getHeapVarianceThreshold, + mockSearchShardTaskSettings::getHeapPercentThreshold, + mockSearchShardTaskSettings.getHeapMovingAverageWindowSize(), + mockSettings.getClusterSettings(), + SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE + ) + ); // Task with heap usage < heapBytesThreshold. task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index e74f89c905499..bb577edd6667d 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -69,7 +69,7 @@ public Optional checkAndMaybeGetCancellationReason(Task } @Override - public Stats stats(List activeTasks) { + public Stats stats(List searchShardTasks) { return null; } };