Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Offline Nodes] Adds new library for offline tasks #15247

Merged
merged 3 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- Add path prefix support to hashed prefix snapshots ([#15664](https://github.com/opensearch-project/OpenSearch/pull/15664))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))


### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
- Bump `protobuf` from 3.22.3 to 3.25.4 ([#15684](https://github.com/opensearch-project/OpenSearch/pull/15684))
Expand Down
25 changes: 25 additions & 0 deletions libs/task-commons/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

dependencies {
api project(':libs:opensearch-common')

testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-task-commons'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.TaskStatus;
import org.opensearch.task.commons.task.TaskType;
import org.opensearch.task.commons.worker.WorkerNode;

/**
* Request object for listing tasks
*/
public class TaskListRequest {

/**
* Filters listTasks response by specific task status'
*/
private TaskStatus[] taskStatus;

/**
* Filter listTasks response by specific task types
*/
private TaskType[] taskTypes;

/**
* Filter listTasks response by specific worker node
*/
private WorkerNode workerNodes;

/**
* Depicts the start page number for the list call.
*
* @see TaskManagerClient#listTasks(TaskListRequest)
*/
private int startPageNumber;

/**
* Depicts the page size for the list call.
*
* @see TaskManagerClient#listTasks(TaskListRequest)
*/
private int pageSize;

/**
* Default constructor
*/
public TaskListRequest() {}

Check warning on line 52 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L52

Added line #L52 was not covered by tests

/**
* Update task types to filter with in the request
* @param taskTypes TaskType[]
* @return ListTaskRequest
*/
public TaskListRequest taskType(TaskType... taskTypes) {
this.taskTypes = taskTypes;
return this;

Check warning on line 61 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L60-L61

Added lines #L60 - L61 were not covered by tests
}

/**
* Update task status to filter with in the request
* @param taskStatus TaskStatus[]
* @return ListTaskRequest
*/
public TaskListRequest taskType(TaskStatus... taskStatus) {
this.taskStatus = taskStatus;
return this;

Check warning on line 71 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L70-L71

Added lines #L70 - L71 were not covered by tests
}

/**
* Update worker node to filter with in the request
* @param workerNode WorkerNode
* @return ListTaskRequest
*/
private TaskListRequest workerNode(WorkerNode workerNode) {
this.workerNodes = workerNode;
return this;

Check warning on line 81 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L80-L81

Added lines #L80 - L81 were not covered by tests
}

/**
* Update page number to start with when fetching the list of tasks
* @param startPageNumber startPageNumber
* @return ListTaskRequest
*/
public TaskListRequest startPageNumber(int startPageNumber) {
this.startPageNumber = startPageNumber;
return this;

Check warning on line 91 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L90-L91

Added lines #L90 - L91 were not covered by tests
}

/**
* Update page size for the list tasks response
* @param pageSize int
* @return ListTaskRequest
*/
public TaskListRequest pageSize(int pageSize) {
this.pageSize = pageSize;
return this;

Check warning on line 101 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.task.commons.task.Task;
import org.opensearch.task.commons.task.TaskId;
import org.opensearch.task.commons.worker.WorkerNode;

import java.util.List;

/**
* Client used to interact with Task Store/Queue.
*
* TODO: TaskManager can be something not running an opensearch process.
* We need to come up with a way to allow this interface to be used with in and out opensearch as well
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface TaskManagerClient {

/**
* Get task from TaskStore/Queue
*
* @param taskId TaskId of the task to be retrieved
* @return Task corresponding to TaskId
*/
Task getTask(TaskId taskId);

/**
* Update task in TaskStore/Queue
*
* @param task Task to be updated
*/
void updateTask(Task task);

/**
* Mark task as cancelled.
* Ongoing Tasks can be cancelled as well if the corresponding worker supports cancellation
*
* @param taskId TaskId of the task to be cancelled
*/
void cancelTask(TaskId taskId);

/**
* List all tasks applying all the filters present in listTaskRequest
*
* @param taskListRequest TaskListRequest
* @return list of all the task matching the filters in listTaskRequest
*/
List<Task> listTasks(TaskListRequest taskListRequest);

/**
* Assign Task to a particular WorkerNode. This ensures no 2 worker Nodes work on the same task.
* This API can be used in both pull and push models of task assignment.
*
* @param taskId TaskId of the task to be assigned
* @param node WorkerNode task is being assigned to
* @return true if task is assigned successfully, false otherwise
*/
boolean assignTask(TaskId taskId, WorkerNode node);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.Task;

/**
* Producer interface used to submit new tasks for execution on worker nodes.
*/
public interface TaskProducerClient {

/**
* Submit a new task to TaskStore/Queue
*
* @param task Task to be submitted for execution on offline nodes
*/
void submitTask(Task task);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.Task;
import org.opensearch.task.commons.task.TaskId;

import java.util.List;

/**
* Consumer interface used to find new tasks assigned to a {@code WorkerNode} for execution.
*/
public interface TaskWorkerClient {

/**
* List all tasks assigned to a WorkerNode.
* Useful when the implementation uses a separate store for Task assignments to Worker nodes
*
* @param taskListRequest TaskListRequest
* @return list of all tasks assigned to a WorkerNode
*/
List<Task> getAssignedTasks(TaskListRequest taskListRequest);

/**
* Sends task heart beat to Task Store/Queue
*
* @param taskId TaskId of Task to send heartbeat for
* @param timestamp timestamp of heartbeat to be recorded in TaskStore/Queue
*/
void sendTaskHeartbeat(TaskId taskId, long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains task client related classes
*/
package org.opensearch.task.commons.clients;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains offline tasks related classes
*/
package org.opensearch.task.commons;
Loading
Loading