Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Allow asynchronous job deletion #34058

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) {
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params(request);
params.putParam("force", Boolean.toString(deleteJobRequest.isForce()));
if (deleteJobRequest.getForce() != null) {
params.putParam("force", Boolean.toString(deleteJobRequest.getForce()));
}
if (deleteJobRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion()));
}

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
Expand Down Expand Up @@ -207,14 +208,15 @@ public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options,
*
* @param request The request to delete the job
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return action acknowledgement
* @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for
* completion
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::deleteJob,
options,
AcknowledgedResponse::fromXContent,
DeleteJobResponse::fromXContent,
Collections.emptySet());
}

Expand All @@ -228,11 +230,11 @@ public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions o
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<DeleteJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::deleteJob,
options,
AcknowledgedResponse::fromXContent,
DeleteJobResponse::fromXContent,
listener,
Collections.emptySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
public class DeleteJobRequest extends ActionRequest {

private String jobId;
private boolean force;
private Boolean force;
private Boolean waitForCompletion;

public DeleteJobRequest(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
Expand All @@ -47,7 +48,7 @@ public void setJobId(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
}

public boolean isForce() {
public Boolean getForce() {
return force;
}

Expand All @@ -57,10 +58,24 @@ public boolean isForce() {
*
* @param force When {@code true} forcefully delete an opened job. Defaults to {@code false}
*/
public void setForce(boolean force) {
public void setForce(Boolean force) {
this.force = force;
}

public Boolean getWaitForCompletion() {
return waitForCompletion;
}

/**
* Set whether this request should wait until the operation has completed before returning
* @param waitForCompletion When {@code true} the call will wait for the job deletion to complete.
* Otherwise, the deletion will be executed asynchronously and the response
* will contain the task id.
*/
public void setWaitForCompletion(Boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Objects;

/**
* Response object that contains the acknowledgement or the task id
* depending on whether the delete job action was requested to wait for completion.
*/
public class DeleteJobResponse extends ActionResponse implements ToXContentObject {

private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
private static final ParseField TASK = new ParseField("task");

public static final ConstructingObjectParser<DeleteJobResponse, Void> PARSER = new ConstructingObjectParser<>("delete_job_response",
true, a-> new DeleteJobResponse((Boolean) a[0], (TaskId) a[1]));

static {
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
}

public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private final Boolean acknowledged;
private final TaskId task;

DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) {
assert acknowledged != null || task != null;
this.acknowledged = acknowledged;
this.task = task;
}

/**
* Get the action acknowledgement
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or
* otherwise a {@code boolean} that indicates whether the job was deleted successfully.
*/
public Boolean getAcknowledged() {
return acknowledged;
}

/**
* Get the task id
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or
* otherwise the id of the job deletion task.
*/
public TaskId getTask() {
return task;
}

@Override
public int hashCode() {
return Objects.hash(acknowledged, task);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

DeleteJobResponse that = (DeleteJobResponse) other;
return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (acknowledged != null) {
builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged);
}
if (task != null) {
builder.field(TASK.getPreferredName(), task.toString());
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class Job implements ToXContentObject {
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days");
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETING = new ParseField("deleting");

public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);

Expand Down Expand Up @@ -99,6 +100,7 @@ public class Job implements ToXContentObject {
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
PARSER.declareBoolean(Builder::setDeleting, DELETING);
}

private final String jobId;
Expand All @@ -121,13 +123,14 @@ public class Job implements ToXContentObject {
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final String resultsIndexName;
private final Boolean deleting;

private Job(String jobId, String jobType, List<String> groups, String description, Date createTime,
Date finishedTime, Date lastDataTime, Long establishedModelMemory,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName) {
String modelSnapshotId, String resultsIndexName, Boolean deleting) {

this.jobId = jobId;
this.jobType = jobType;
Expand All @@ -148,6 +151,7 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
this.modelSnapshotId = modelSnapshotId;
this.resultsIndexName = resultsIndexName;
this.deleting = deleting;
}

/**
Expand Down Expand Up @@ -292,6 +296,10 @@ public String getModelSnapshotId() {
return modelSnapshotId;
}

public Boolean getDeleting() {
return deleting;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -351,6 +359,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (resultsIndexName != null) {
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
}
if (deleting != null) {
builder.field(DELETING.getPreferredName(), deleting);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -384,15 +395,16 @@ public boolean equals(Object other) {
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName);
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
}

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName);
modelSnapshotId, resultsIndexName, deleting);
}

@Override
Expand Down Expand Up @@ -425,6 +437,7 @@ public static class Builder {
private Map<String, Object> customSettings;
private String modelSnapshotId;
private String resultsIndexName;
private Boolean deleting;

private Builder() {
}
Expand Down Expand Up @@ -453,6 +466,7 @@ public Builder(Job job) {
this.customSettings = job.getCustomSettings();
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleting = job.getDeleting();
}

public Builder setId(String id) {
Expand Down Expand Up @@ -559,6 +573,11 @@ public Builder setResultsIndexName(String resultsIndexName) {
return this;
}

Builder setDeleting(Boolean deleting) {
this.deleting = deleting;
return this;
}

/**
* Builds a job.
*
Expand All @@ -571,7 +590,7 @@ public Job build() {
id, jobType, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName);
modelSnapshotId, resultsIndexName, deleting);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,18 @@ public void testDeleteJob() {
Request request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint());
assertEquals(Boolean.toString(false), request.getParameters().get("force"));
assertNull(request.getParameters().get("force"));
assertNull(request.getParameters().get("wait_for_completion"));

deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setForce(true);
request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(Boolean.toString(true), request.getParameters().get("force"));

deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setWaitForCompletion(false);
request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion"));
}

public void testFlushJob() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
Expand Down Expand Up @@ -142,17 +143,33 @@ public void testGetJob() throws Exception {
assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2));
}

public void testDeleteJob() throws Exception {
public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

AcknowledgedResponse response = execute(new DeleteJobRequest(jobId),
DeleteJobResponse response = execute(new DeleteJobRequest(jobId),
machineLearningClient::deleteJob,
machineLearningClient::deleteJobAsync);

assertTrue(response.isAcknowledged());
assertTrue(response.getAcknowledged());
assertNull(response.getTask());
}

public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setWaitForCompletion(false);

DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);

assertNull(response.getAcknowledged());
assertNotNull(response.getTask());
}

public void testOpenJob() throws Exception {
Expand Down
Loading