Skip to content

Commit

Permalink
add better cancellation reason
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 committed Aug 7, 2024
1 parent 3c77d47 commit 8dd0cfa
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.wlm.cancellation;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.search.ResourceType;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -45,9 +46,14 @@ public abstract class AbstractTaskSelectionStrategy implements TaskSelectionStra
* @throws IllegalArgumentException If the limit is less than zero
*/
@Override
public List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType) {
public List<TaskCancellation> selectTasksForCancellation(
QueryGroup querygroup,
List<Task> tasks,
long limit,
ResourceType resourceType
) {
if (limit < 0) {
throw new IllegalArgumentException("reduceBy has to be greater than zero");
throw new IllegalArgumentException("limit has to be greater than zero");
}
if (limit == 0) {
return Collections.emptyList();
Expand All @@ -60,7 +66,8 @@ public List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long

for (Task task : sortedTasks) {
if (task instanceof CancellableTask) {
selectedTasks.add(createTaskCancellation((CancellableTask) task));
String cancellationReason = createCancellationReason(querygroup, resourceType);
selectedTasks.add(createTaskCancellation((CancellableTask) task, cancellationReason));
accumulated += resourceType.getResourceUsage(task);
if (accumulated >= limit) {
break;
Expand All @@ -70,12 +77,25 @@ public List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long
return selectedTasks;
}

private TaskCancellation createTaskCancellation(CancellableTask task) {
// TODO add correct reason and callbacks
return new TaskCancellation(task, List.of(new TaskCancellation.Reason("limits exceeded", 5)), List.of(this::callbackOnCancel));
private String createCancellationReason(QueryGroup querygroup, ResourceType resourceType) {
Double thresholdInPercent = getThresholdInPercent(querygroup, resourceType);
return "[Workload Management] QueryGroup ID : "
+ querygroup.get_id()
+ " breached the resource limit of : "
+ thresholdInPercent
+ " for resource type : "
+ resourceType.getName();
}

private Double getThresholdInPercent(QueryGroup querygroup, ResourceType resourceType) {
return ((Double) (querygroup.getResourceLimits().get(resourceType))) * 100;
}

private TaskCancellation createTaskCancellation(CancellableTask task, String cancellationReason) {
return new TaskCancellation(task, List.of(new TaskCancellation.Reason(cancellationReason, 5)), List.of(this::callbackOnCancel));
}

private void callbackOnCancel() {
// todo Implement callback logic here mostly used for Stats
// TODO Implement callback logic here mostly used for Stats
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ private boolean shouldCancelTasks(QueryGroup queryGroup, ResourceType resourceTy

private List<TaskCancellation> getTaskCancellations(QueryGroup queryGroup, ResourceType resourceType) {
return taskSelectionStrategy.selectTasksForCancellation(
queryGroup,
// get the active tasks in the query group
queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getActiveTasks(),
getReduceBy(queryGroup, resourceType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.wlm.cancellation;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
Expand All @@ -28,5 +29,5 @@ public interface TaskSelectionStrategy {
*
* @return List of tasks that should be cancelled.
*/
List<TaskCancellation> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType);
List<TaskCancellation> selectTasksForCancellation(QueryGroup queryGroup, List<Task> tasks, long limit, ResourceType resourceType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchTask;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.core.tasks.resourcetracker.ResourceStatsType;
Expand All @@ -23,6 +24,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class TaskSelectionStrategyTests extends OpenSearchTestCase {

Expand All @@ -35,39 +37,79 @@ public Comparator<Task> sortingCondition() {

public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsGreaterThanZero() {
TaskSelectionStrategy testTaskSelectionStrategy = new TestTaskSelectionStrategy();
long threshold = 100L;
long thresholdInLong = 100L;
Double threshold = 0.1;
long reduceBy = 50L;
ResourceType resourceType = ResourceType.MEMORY;
List<Task> tasks = getListOfTasks(threshold);
List<Task> tasks = getListOfTasks(thresholdInLong);

QueryGroup queryGroup = new QueryGroup(
"testQueryGroup",
"queryGroupId1",
QueryGroup.ResiliencyMode.ENFORCED,
Map.of(resourceType, threshold),
1L
);

List<TaskCancellation> selectedTasks = testTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType);
List<TaskCancellation> selectedTasks = testTaskSelectionStrategy.selectTasksForCancellation(
queryGroup,
tasks,
reduceBy,
resourceType
);
assertFalse(selectedTasks.isEmpty());
assertEquals(
"[Workload Management] QueryGroup ID : queryGroupId1 breached the resource limit of : 10.0 for resource type : memory",
selectedTasks.get(0).getReasonString()
);
assertEquals(5, selectedTasks.get(0).getReasons().get(0).getCancellationScore());
assertTrue(tasksUsageMeetsThreshold(selectedTasks, reduceBy));
}

public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsLesserThanZero() {
TaskSelectionStrategy testTaskSelectionStrategy = new TestTaskSelectionStrategy();
long threshold = 100L;
long thresholdInLong = 100L;
Double threshold = 0.1;
long reduceBy = -50L;
ResourceType resourceType = ResourceType.MEMORY;
List<Task> tasks = getListOfTasks(threshold);
List<Task> tasks = getListOfTasks(thresholdInLong);
QueryGroup queryGroup = new QueryGroup(
"testQueryGroup",
"queryGroupId1",
QueryGroup.ResiliencyMode.ENFORCED,
Map.of(resourceType, threshold),
1L
);

try {
testTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType);
testTaskSelectionStrategy.selectTasksForCancellation(queryGroup, tasks, reduceBy, resourceType);
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
assertEquals("reduceBy has to be greater than zero", e.getMessage());
assertEquals("limit has to be greater than zero", e.getMessage());
}
}

public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsEqualToZero() {
TaskSelectionStrategy testTaskSelectionStrategy = new TestTaskSelectionStrategy();
long threshold = 100L;
long thresholdInLong = 100L;
Double threshold = 0.1;
long reduceBy = 0;
ResourceType resourceType = ResourceType.MEMORY;
List<Task> tasks = getListOfTasks(threshold);
List<Task> tasks = getListOfTasks(thresholdInLong);
QueryGroup queryGroup = new QueryGroup(
"testQueryGroup",
"queryGroupId1",
QueryGroup.ResiliencyMode.ENFORCED,
Map.of(resourceType, threshold),
1L
);

List<TaskCancellation> selectedTasks = testTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType);
List<TaskCancellation> selectedTasks = testTaskSelectionStrategy.selectTasksForCancellation(
queryGroup,
tasks,
reduceBy,
resourceType
);
assertTrue(selectedTasks.isEmpty());
}

Expand Down

0 comments on commit 8dd0cfa

Please sign in to comment.