Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Bug/sbp cancellation #14502

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix bug in SBP cancellation logic ([#13474](https://github.com/opensearch-project/OpenSearch/pull/13474))
- Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379))
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
Expand Down
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/search/ResourceType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

/**
 * Enum to hold the resource type.
 * <p>
 * Every constant carries a string name ({@code "cpu"}, {@code "jvm"}) that
 * {@link #fromName(String)} uses to look the constant up.
 */
public enum ResourceType {
    CPU("cpu"),
    JVM("jvm");

    /** String name associated with this constant; compared verbatim by {@link #fromName(String)}. */
    private final String name;

    ResourceType(String name) {
        this.name = name;
    }

    /**
     * Resolves a resource type from its string name.
     * The string match here is case-sensitive.
     *
     * @param s name matching the resource type name
     * @return a {@link ResourceType}
     * @throws IllegalArgumentException if no constant has the given name (this includes {@code null})
     */
    public static ResourceType fromName(String s) {
        for (ResourceType resourceType : values()) {
            if (resourceType.getName().equals(s)) {
                // NOTE(review): Codecov (codecov/patch) reported this return as not covered by tests.
                return resourceType;
            }
        }
        // NOTE(review): Codecov (codecov/patch) reported this throw as not covered by tests.
        throw new IllegalArgumentException("Unknown resource type: [" + s + "]");
    }

    /**
     * @return the string name given to this constant at construction
     */
    private String getName() {
        // NOTE(review): Codecov (codecov/patch) reported this return as not covered by tests.
        return name;
    }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;

import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;

import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

Expand All @@ -34,35 +35,37 @@ public class CpuUsageTracker extends TaskResourceUsageTracker {
private final LongSupplier thresholdSupplier;

/**
 * Creates a tracker wired with the default CPU breach check.
 * <p>
 * The default check reads the task's total CPU time in nanoseconds and compares it with the value
 * returned by {@code thresholdSupplier}. A cancellation reason with score 1 is produced once the
 * CPU time is no longer below that value; otherwise the result is empty.
 *
 * @param thresholdSupplier supplies the CPU-time threshold in nanoseconds; queried on every evaluation
 */
public CpuUsageTracker(LongSupplier thresholdSupplier) {
    this(thresholdSupplier, task -> {
        final long cpuNanos = task.getTotalResourceStats().getCpuTimeInNanos();
        final long limitNanos = thresholdSupplier.getAsLong();

        if (cpuNanos >= limitNanos) {
            final String message = "cpu usage exceeded ["
                + new TimeValue(cpuNanos, TimeUnit.NANOSECONDS)
                + " >= "
                + new TimeValue(limitNanos, TimeUnit.NANOSECONDS)
                + "]";
            // TODO: fine-tune the cancellation score/weight
            return Optional.of(new TaskCancellation.Reason(message, 1));
        }

        // Still under the threshold: nothing to cancel.
        return Optional.empty();
    });
}

/**
 * Creates a tracker with a caller-supplied breach evaluator.
 *
 * @param thresholdSupplier            stored as this tracker's threshold supplier
 * @param resourceUsageBreachEvaluator stored as this tracker's breach evaluator
 */
public CpuUsageTracker(
    LongSupplier thresholdSupplier,
    ResourceUsageBreachEvaluator resourceUsageBreachEvaluator
) {
    this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
    this.thresholdSupplier = thresholdSupplier;
}

/**
 * @return the name obtained from {@code CPU_USAGE_TRACKER.getName()}
 */
@Override
public String name() {
    final String trackerName = CPU_USAGE_TRACKER.getName();
    return trackerName;
}

/**
 * Compares the task's total CPU time (nanoseconds) with the supplied threshold.
 *
 * @param task the task whose total resource stats are inspected
 * @return a cancellation reason with score 1 when the CPU time is not below the threshold,
 *         otherwise an empty {@link Optional}
 */
@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
    final long cpuNanos = task.getTotalResourceStats().getCpuTimeInNanos();
    final long limitNanos = thresholdSupplier.getAsLong();

    if (cpuNanos >= limitNanos) {
        final String message = "cpu usage exceeded ["
            + new TimeValue(cpuNanos, TimeUnit.NANOSECONDS)
            + " >= "
            + new TimeValue(limitNanos, TimeUnit.NANOSECONDS)
            + "]";
        // TODO: fine-tune the cancellation score/weight
        return Optional.of(new TaskCancellation.Reason(message, 1));
    }

    // Still under the threshold: nothing to cancel.
    return Optional.empty();
}

@Override
public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

Expand All @@ -34,36 +35,42 @@ public class ElapsedTimeTracker extends TaskResourceUsageTracker {
private final LongSupplier timeNanosSupplier;

/**
 * Creates a tracker wired with the default elapsed-time breach check.
 * <p>
 * The default check subtracts the task's start time (nanoseconds) from the value returned by
 * {@code timeNanosSupplier} and compares the difference with the value returned by
 * {@code thresholdSupplier}. A cancellation reason with score 1 is produced once the difference
 * is no longer below that value; otherwise the result is empty.
 *
 * @param thresholdSupplier supplies the elapsed-time threshold in nanoseconds; queried on every evaluation
 * @param timeNanosSupplier supplies the current time in nanoseconds; queried on every evaluation
 */
public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) {
    this(thresholdSupplier, timeNanosSupplier, (Task task) -> {
        final long nowNanos = timeNanosSupplier.getAsLong();
        final long elapsedNanos = nowNanos - task.getStartTimeNanos();
        final long limitNanos = thresholdSupplier.getAsLong();

        if (elapsedNanos >= limitNanos) {
            final String message = "elapsed time exceeded ["
                + new TimeValue(elapsedNanos, TimeUnit.NANOSECONDS)
                + " >= "
                + new TimeValue(limitNanos, TimeUnit.NANOSECONDS)
                + "]";
            // TODO: fine-tune the cancellation score/weight
            return Optional.of(new TaskCancellation.Reason(message, 1));
        }

        // Still under the threshold: nothing to cancel.
        return Optional.empty();
    });
}

/**
 * Creates a tracker with a caller-supplied breach evaluator.
 *
 * @param thresholdSupplier            stored as this tracker's threshold supplier
 * @param timeNanosSupplier            stored as this tracker's time supplier
 * @param resourceUsageBreachEvaluator stored as this tracker's breach evaluator
 */
public ElapsedTimeTracker(
    LongSupplier thresholdSupplier,
    LongSupplier timeNanosSupplier,
    ResourceUsageBreachEvaluator resourceUsageBreachEvaluator
) {
    this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
    this.timeNanosSupplier = timeNanosSupplier;
    this.thresholdSupplier = thresholdSupplier;
}

/**
 * @return the name obtained from {@code ELAPSED_TIME_TRACKER.getName()}
 */
@Override
public String name() {
    final String trackerName = ELAPSED_TIME_TRACKER.getName();
    return trackerName;
}

/**
 * Compares the time elapsed since the task started (nanoseconds) with the supplied threshold.
 *
 * @param task the task whose start time is inspected
 * @return a cancellation reason with score 1 when the elapsed time is not below the threshold,
 *         otherwise an empty {@link Optional}
 */
@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
    final long nowNanos = timeNanosSupplier.getAsLong();
    final long elapsedNanos = nowNanos - task.getStartTimeNanos();
    final long limitNanos = thresholdSupplier.getAsLong();

    if (elapsedNanos >= limitNanos) {
        final String message = "elapsed time exceeded ["
            + new TimeValue(elapsedNanos, TimeUnit.NANOSECONDS)
            + " >= "
            + new TimeValue(limitNanos, TimeUnit.NANOSECONDS)
            + "]";
        // TODO: fine-tune the cancellation score/weight
        return Optional.of(new TaskCancellation.Reason(message, 1));
    }

    // Still under the threshold: nothing to cancel.
    return Optional.empty();
}

@Override
public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
long now = timeNanosSupplier.getAsLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
Expand Down Expand Up @@ -55,6 +56,43 @@ public HeapUsageTracker(
this.heapPercentThresholdSupplier = heapPercentThresholdSupplier;
this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize));
clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize);
setDefaultResourceUsageBreachEvaluator();
}

/**
 * Installs the default heap breach evaluator on this tracker.
 * <p>
 * This is a separate method rather than a lambda handed over from the constructor, because a lambda
 * that reads a member variable cannot be passed there; the compiler rejects it with
 * "cannot reference movingAverageReference before supertype constructor has been called".
 * <p>
 * The evaluator yields a cancellation reason only when the moving average is ready, heap tracking is
 * supported, and the task's memory is below neither the heap-percent threshold nor the
 * average-times-variance allowance.
 */
private void setDefaultResourceUsageBreachEvaluator() {
    this.resourceUsageBreachEvaluator = (task) -> {
        final MovingAverage window = movingAverageReference.get();

        // There haven't been enough measurements.
        if (window.isReady() == false) {
            return Optional.empty();
        }

        final double taskHeapBytes = task.getTotalResourceStats().getMemoryInBytes();
        final double rollingAverage = window.getAverage();
        final double varianceFactor = heapVarianceSupplier.getAsDouble();
        final double allowedBytes = rollingAverage * varianceFactor;
        final double thresholdBytes = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;

        // Guards are evaluated in the same order as a short-circuit "||" chain would evaluate them.
        if (isHeapTrackingSupported() == false) {
            return Optional.empty();
        }
        if (taskHeapBytes < thresholdBytes) {
            return Optional.empty();
        }
        if (taskHeapBytes < allowedBytes) {
            return Optional.empty();
        }

        final String message = "heap usage exceeded ["
            + new ByteSizeValue((long) taskHeapBytes)
            + " >= "
            + new ByteSizeValue((long) allowedBytes)
            + "]";
        // TODO: fine-tune the cancellation score/weight
        final int score = (int) (taskHeapBytes / rollingAverage);
        return Optional.of(new TaskCancellation.Reason(message, score));
    };
}

@Override
Expand All @@ -67,33 +105,6 @@ public void update(Task task) {
movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes());
}

/**
 * Decides whether the task's heap usage warrants cancellation.
 * <p>
 * A reason is returned only when the moving average is ready, heap tracking is supported, and the
 * task's memory is below neither the heap-percent threshold nor the average-times-variance allowance.
 * The score is the task's memory divided by the moving average, truncated to an int.
 *
 * @param task the task whose total resource stats are inspected
 * @return a cancellation reason, or an empty {@link Optional} when any of the conditions above fails
 */
@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
    final MovingAverage window = movingAverageReference.get();

    // There haven't been enough measurements.
    if (window.isReady() == false) {
        return Optional.empty();
    }

    final double taskHeapBytes = task.getTotalResourceStats().getMemoryInBytes();
    final double rollingAverage = window.getAverage();
    final double varianceFactor = heapVarianceSupplier.getAsDouble();
    final double allowedBytes = rollingAverage * varianceFactor;
    final double thresholdBytes = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;

    // Guards are evaluated in the same order as a short-circuit "||" chain would evaluate them.
    if (isHeapTrackingSupported() == false) {
        return Optional.empty();
    }
    if (taskHeapBytes < thresholdBytes) {
        return Optional.empty();
    }
    if (taskHeapBytes < allowedBytes) {
        return Optional.empty();
    }

    final String message = "heap usage exceeded ["
        + new ByteSizeValue((long) taskHeapBytes)
        + " >= "
        + new ByteSizeValue((long) allowedBytes)
        + "]";
    // TODO: fine-tune the cancellation score/weight
    final int score = (int) (taskHeapBytes / rollingAverage);
    return Optional.of(new TaskCancellation.Reason(message, score));
}

/**
 * Swaps in a new, empty moving average built with the given window size.
 *
 * @param heapMovingAverageWindowSize window size passed to the new {@code MovingAverage}
 */
private void updateWindowSize(int heapMovingAverageWindowSize) {
    final MovingAverage replacement = new MovingAverage(heapMovingAverageWindowSize);
    this.movingAverageReference.set(replacement);
}
Expand Down

This file was deleted.

Loading
Loading