Skip to content

Commit

Permalink
Move get lifecycle API to Management thread pool and make cancellable (
Browse files Browse the repository at this point in the history
…elastic#97248)

* Move get lifecycle API to Management thread pool and make cancellable

This commit moves the `TransportGetLifecycleAction` to execute on the `Management` thread pool, and to be cancellable so that wildcard requests looping through all or a subset of policies checks for cancellation while running.

Resolves elastic#96568
  • Loading branch information
dakrone committed Jun 30, 2023
1 parent 75cb6fe commit 869ad83
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/97248.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 97248
summary: Move get lifecycle API to Management thread pool and make cancellable
area: ILM+SLM
type: enhancement
issues:
- 96568
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetLifecycleAction extends ActionType<GetLifecycleAction.Response> {
Expand Down Expand Up @@ -113,6 +117,11 @@ public Request() {
policyNames = Strings.EMPTY_ARRAY;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "get-lifecycle-task", parentTaskId, headers);
}

public String[] getPolicyNames() {
return policyNames;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ilm.action.GetLifecycleAction;

Expand All @@ -37,6 +38,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
getLifecycleRequest.timeout(restRequest.paramAsTime("timeout", getLifecycleRequest.timeout()));
getLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", getLifecycleRequest.masterNodeTimeout()));

return channel -> client.execute(GetLifecycleAction.INSTANCE, getLifecycleRequest, new RestToXContentListener<>(channel));
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
GetLifecycleAction.INSTANCE,
getLifecycleRequest,
new RestToXContentListener<>(channel)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -55,12 +56,18 @@ public TransportGetLifecycleAction(
Request::new,
indexNameExpressionResolver,
Response::new,
ThreadPool.Names.SAME
ThreadPool.Names.MANAGEMENT
);
}

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
assert task instanceof CancellableTask : "get lifecycle requests should be cancellable";
final CancellableTask cancellableTask = (CancellableTask) task;
if (cancellableTask.notifyIfCancelled(listener)) {
return;
}

IndexLifecycleMetadata metadata = clusterService.state().metadata().custom(IndexLifecycleMetadata.TYPE);
if (metadata == null) {
if (request.getPolicyNames().length == 0) {
Expand Down Expand Up @@ -88,6 +95,9 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
for (String name : names) {
if (Regex.isSimpleMatchPattern(name)) {
for (Map.Entry<String, LifecyclePolicyMetadata> entry : metadata.getPolicyMetadatas().entrySet()) {
if (cancellableTask.notifyIfCancelled(listener)) {
return;
}
LifecyclePolicyMetadata policyMetadata = entry.getValue();
if (Regex.simpleMatch(name, entry.getKey())) {
policyResponseItemMap.put(
Expand Down

0 comments on commit 869ad83

Please sign in to comment.