Skip to content

Commit

Permalink
Add Settings related to Workload Management feature (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#15028)

Signed-off-by: Ruirui Zhang <[email protected]>
* add QueryGroup Service tests
Signed-off-by: Ruirui Zhang <[email protected]>

* add PR to changelog
Signed-off-by: Ruirui Zhang <[email protected]>

* change the test directory
Signed-off-by: Ruirui Zhang <[email protected]>

* modify comments to be more specific
Signed-off-by: Ruirui Zhang <[email protected]>

* add test coverage
Signed-off-by: Ruirui Zhang <[email protected]>

* remove QUERY_GROUP_RUN_INTERVAL_SETTING as we'll define it in QueryGroupService
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>
(cherry picked from commit 01184de)
  • Loading branch information
ruai0511 committed Aug 21, 2024
1 parent 7040df2 commit 5c0ada5
Show file tree
Hide file tree
Showing 4 changed files with 506 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028))
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import org.opensearch.transport.SniffConnectionStrategy;
import org.opensearch.transport.TransportSettings;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.WorkloadManagementSettings;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -756,7 +757,13 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

// WorkloadManagement settings
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
 * Main class to declare Workload Management related settings.
 * <p>
 * Four node level thresholds are declared here, one rejection and one cancellation threshold for each of
 * memory and cpu. An instance holds the current value of each threshold and registers its setters as
 * update consumers on the supplied {@link ClusterSettings}, so dynamic updates are reflected in the getters.
 * <p>
 * Invariants enforced both at construction time and on every update:
 * <ul>
 *   <li>a rejection threshold may not exceed {@code 0.90};</li>
 *   <li>a cancellation threshold may not exceed {@code 0.95};</li>
 *   <li>for the same resource, the cancellation threshold may not be less than the rejection threshold.</li>
 * </ul>
 * NOTE(review): each setter validates the new value against the currently stored value of its counterpart.
 * If a single settings update changes both thresholds of a resource and the consumers are invoked one by one,
 * a valid pair could presumably be rejected depending on invocation order - confirm against ClusterSettings.
 */
public class WorkloadManagementSettings {
    private static final double DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = 0.8;
    private static final double DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = 0.9;
    private static final double DEFAULT_NODE_LEVEL_CPU_REJECTION_THRESHOLD = 0.8;
    private static final double DEFAULT_NODE_LEVEL_CPU_CANCELLATION_THRESHOLD = 0.9;
    /**
     * Largest value accepted for the node level memory based cancellation threshold
     */
    public static final double NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95;
    /**
     * Largest value accepted for the node level memory based rejection threshold
     */
    public static final double NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE = 0.9;
    /**
     * Largest value accepted for the node level cpu based cancellation threshold
     */
    public static final double NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95;
    /**
     * Largest value accepted for the node level cpu based rejection threshold
     */
    public static final double NODE_LEVEL_CPU_REJECTION_THRESHOLD_MAX_VALUE = 0.9;

    // Textual form of the max values as they appear in the error messages; kept as literals so the
    // messages stay exactly "0.95" / "0.90" regardless of how a double would be formatted.
    private static final String CANCELLATION_THRESHOLD_MAX_VALUE_TEXT = "0.95";
    private static final String REJECTION_THRESHOLD_MAX_VALUE_TEXT = "0.90";

    private Double nodeLevelMemoryCancellationThreshold;
    private Double nodeLevelMemoryRejectionThreshold;
    private Double nodeLevelCpuCancellationThreshold;
    private Double nodeLevelCpuRejectionThreshold;

    /**
     * Setting name for node level memory based rejection threshold for QueryGroup service
     */
    public static final String NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.memory_rejection_threshold";
    /**
     * Setting to control the memory based rejection threshold
     */
    public static final Setting<Double> NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = Setting.doubleSetting(
        NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
        DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );
    /**
     * Setting name for node level cpu based rejection threshold for QueryGroup service
     */
    public static final String NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.cpu_rejection_threshold";
    /**
     * Setting to control the cpu based rejection threshold
     */
    public static final Setting<Double> NODE_LEVEL_CPU_REJECTION_THRESHOLD = Setting.doubleSetting(
        NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
        DEFAULT_NODE_LEVEL_CPU_REJECTION_THRESHOLD,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );
    /**
     * Setting name for node level memory based cancellation threshold for QueryGroup service
     */
    public static final String NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.memory_cancellation_threshold";
    /**
     * Setting to control the memory based cancellation threshold
     */
    public static final Setting<Double> NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = Setting.doubleSetting(
        NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME,
        DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );
    /**
     * Setting name for node level cpu based cancellation threshold for QueryGroup service
     */
    public static final String NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME = "wlm.query_group.node.cpu_cancellation_threshold";
    /**
     * Setting to control the cpu based cancellation threshold
     */
    public static final Setting<Double> NODE_LEVEL_CPU_CANCELLATION_THRESHOLD = Setting.doubleSetting(
        NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME,
        DEFAULT_NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /**
     * QueryGroup service settings constructor.
     * <p>
     * Reads the initial value of every threshold from {@code settings}, validates them with the same rules
     * the setters apply, and registers the setters as update consumers on {@code clusterSettings}.
     *
     * @param settings - QueryGroup service settings
     * @param clusterSettings - QueryGroup cluster settings
     * @throws IllegalArgumentException if an initial rejection threshold is greater than 0.90, an initial
     *         cancellation threshold is greater than 0.95, or a cancellation threshold is less than the
     *         rejection threshold of the same resource
     */
    public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSettings) {
        final Double initialMemoryCancellation = NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD.get(settings);
        final Double initialMemoryRejection = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings);
        final Double initialCpuCancellation = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings);
        final Double initialCpuRejection = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings);

        // Initial values must honour the same upper bounds as dynamic updates; otherwise a node could
        // start with a threshold that the setters would refuse.
        ensureThresholdIsNotAboveMax(
            initialMemoryCancellation,
            NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE,
            NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME,
            CANCELLATION_THRESHOLD_MAX_VALUE_TEXT
        );
        ensureThresholdIsNotAboveMax(
            initialMemoryRejection,
            NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE,
            NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
            REJECTION_THRESHOLD_MAX_VALUE_TEXT
        );
        ensureThresholdIsNotAboveMax(
            initialCpuCancellation,
            NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE,
            NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME,
            CANCELLATION_THRESHOLD_MAX_VALUE_TEXT
        );
        ensureThresholdIsNotAboveMax(
            initialCpuRejection,
            NODE_LEVEL_CPU_REJECTION_THRESHOLD_MAX_VALUE,
            NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
            REJECTION_THRESHOLD_MAX_VALUE_TEXT
        );

        ensureRejectionThresholdIsLessThanCancellation(
            initialMemoryRejection,
            initialMemoryCancellation,
            NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
            NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME
        );
        ensureRejectionThresholdIsLessThanCancellation(
            initialCpuRejection,
            initialCpuCancellation,
            NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
            NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME
        );

        nodeLevelMemoryCancellationThreshold = initialMemoryCancellation;
        nodeLevelMemoryRejectionThreshold = initialMemoryRejection;
        nodeLevelCpuCancellationThreshold = initialCpuCancellation;
        nodeLevelCpuRejectionThreshold = initialCpuRejection;

        clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD, this::setNodeLevelMemoryCancellationThreshold);
        clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_MEMORY_REJECTION_THRESHOLD, this::setNodeLevelMemoryRejectionThreshold);
        clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_CANCELLATION_THRESHOLD, this::setNodeLevelCpuCancellationThreshold);
        clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold);
    }

    /**
     * Method to get the node level memory based cancellation threshold
     * @return current node level memory based cancellation threshold
     */
    public Double getNodeLevelMemoryCancellationThreshold() {
        return nodeLevelMemoryCancellationThreshold;
    }

    /**
     * Method to set the node level memory based cancellation threshold
     * @param nodeLevelMemoryCancellationThreshold sets the new node level memory based cancellation threshold
     * @throws IllegalArgumentException if the value is greater than 0.95, or if it is less than the current
     *         memory based rejection threshold
     */
    public void setNodeLevelMemoryCancellationThreshold(Double nodeLevelMemoryCancellationThreshold) {
        ensureThresholdIsNotAboveMax(
            nodeLevelMemoryCancellationThreshold,
            NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE,
            NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME,
            CANCELLATION_THRESHOLD_MAX_VALUE_TEXT
        );

        // Compare the candidate cancellation value with the rejection value currently in effect.
        ensureRejectionThresholdIsLessThanCancellation(
            nodeLevelMemoryRejectionThreshold,
            nodeLevelMemoryCancellationThreshold,
            NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
            NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME
        );

        this.nodeLevelMemoryCancellationThreshold = nodeLevelMemoryCancellationThreshold;
    }

    /**
     * Method to get the node level cpu based cancellation threshold
     * @return current node level cpu based cancellation threshold
     */
    public Double getNodeLevelCpuCancellationThreshold() {
        return nodeLevelCpuCancellationThreshold;
    }

    /**
     * Method to set the node level cpu based cancellation threshold
     * @param nodeLevelCpuCancellationThreshold sets the new node level cpu based cancellation threshold
     * @throws IllegalArgumentException if the value is greater than 0.95, or if it is less than the current
     *         cpu based rejection threshold
     */
    public void setNodeLevelCpuCancellationThreshold(Double nodeLevelCpuCancellationThreshold) {
        ensureThresholdIsNotAboveMax(
            nodeLevelCpuCancellationThreshold,
            NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE,
            NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME,
            CANCELLATION_THRESHOLD_MAX_VALUE_TEXT
        );

        // Compare the candidate cancellation value with the rejection value currently in effect.
        ensureRejectionThresholdIsLessThanCancellation(
            nodeLevelCpuRejectionThreshold,
            nodeLevelCpuCancellationThreshold,
            NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
            NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME
        );

        this.nodeLevelCpuCancellationThreshold = nodeLevelCpuCancellationThreshold;
    }

    /**
     * Method to get the memory based node level rejection threshold
     * @return the current memory based node level rejection threshold
     */
    public Double getNodeLevelMemoryRejectionThreshold() {
        return nodeLevelMemoryRejectionThreshold;
    }

    /**
     * Method to set the node level memory based rejection threshold
     * @param nodeLevelMemoryRejectionThreshold sets the new memory based rejection threshold
     * @throws IllegalArgumentException if the value is greater than 0.90, or if it is greater than the current
     *         memory based cancellation threshold
     */
    public void setNodeLevelMemoryRejectionThreshold(Double nodeLevelMemoryRejectionThreshold) {
        ensureThresholdIsNotAboveMax(
            nodeLevelMemoryRejectionThreshold,
            NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE,
            NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
            REJECTION_THRESHOLD_MAX_VALUE_TEXT
        );

        // Compare the candidate rejection value with the cancellation value currently in effect.
        ensureRejectionThresholdIsLessThanCancellation(
            nodeLevelMemoryRejectionThreshold,
            nodeLevelMemoryCancellationThreshold,
            NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME,
            NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME
        );

        this.nodeLevelMemoryRejectionThreshold = nodeLevelMemoryRejectionThreshold;
    }

    /**
     * Method to get the cpu based node level rejection threshold
     * @return the current cpu based node level rejection threshold
     */
    public Double getNodeLevelCpuRejectionThreshold() {
        return nodeLevelCpuRejectionThreshold;
    }

    /**
     * Method to set the node level cpu based rejection threshold
     * @param nodeLevelCpuRejectionThreshold sets the new cpu based rejection threshold
     * @throws IllegalArgumentException if the value is greater than 0.90, or if it is greater than the current
     *         cpu based cancellation threshold
     */
    public void setNodeLevelCpuRejectionThreshold(Double nodeLevelCpuRejectionThreshold) {
        ensureThresholdIsNotAboveMax(
            nodeLevelCpuRejectionThreshold,
            NODE_LEVEL_CPU_REJECTION_THRESHOLD_MAX_VALUE,
            NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
            REJECTION_THRESHOLD_MAX_VALUE_TEXT
        );

        // Compare the candidate rejection value with the cancellation value currently in effect.
        ensureRejectionThresholdIsLessThanCancellation(
            nodeLevelCpuRejectionThreshold,
            nodeLevelCpuCancellationThreshold,
            NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME,
            NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME
        );

        this.nodeLevelCpuRejectionThreshold = nodeLevelCpuRejectionThreshold;
    }

    /**
     * Method to validate that a threshold does not exceed its maximum allowed value
     * @param threshold threshold value to be checked
     * @param maxValue largest value the threshold may take
     * @param settingName name of the setting being checked, used as the prefix of the error message
     * @param maxValueText textual form of {@code maxValue} to embed in the error message
     * @throws IllegalArgumentException if {@code threshold} compares greater than {@code maxValue}
     */
    private static void ensureThresholdIsNotAboveMax(Double threshold, double maxValue, String settingName, String maxValueText) {
        if (Double.compare(threshold, maxValue) > 0) {
            throw new IllegalArgumentException(
                settingName + " value cannot be greater than " + maxValueText + " as it can result in a node drop"
            );
        }
    }

    /**
     * Method to validate that the cancellation threshold is greater than or equal to rejection threshold
     * @param nodeLevelRejectionThreshold rejection threshold to be compared
     * @param nodeLevelCancellationThreshold cancellation threshold to be compared
     * @param rejectionThresholdSettingName name of the rejection threshold setting
     * @param cancellationThresholdSettingName name of the cancellation threshold setting
     * @throws IllegalArgumentException if cancellation threshold is less than rejection threshold
     */
    private static void ensureRejectionThresholdIsLessThanCancellation(
        Double nodeLevelRejectionThreshold,
        Double nodeLevelCancellationThreshold,
        String rejectionThresholdSettingName,
        String cancellationThresholdSettingName
    ) {
        if (Double.compare(nodeLevelCancellationThreshold, nodeLevelRejectionThreshold) < 0) {
            throw new IllegalArgumentException(
                cancellationThresholdSettingName + " value should not be less than " + rejectionThresholdSettingName
            );
        }
    }
}
Loading

0 comments on commit 5c0ada5

Please sign in to comment.