[ML] Delete job document (elastic#34595)
davidkyle committed Oct 29, 2018
1 parent 4e3d565 commit cef9f30
Showing 6 changed files with 153 additions and 135 deletions.
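For orientation before the diff: this commit moves ML job deletion off cluster-state metadata and onto config documents. The job config is now deleted through JobConfigProvider, the datafeed-reference check runs against DatafeedConfigProvider, and the open-job check happens up front. As in normalDeleteJob below, the deletion steps are declared as chained listeners in reverse order (the last step first). A minimal, self-contained sketch of that chaining pattern, using plain java.util.function.Consumer stand-ins rather than the Elasticsearch ActionListener API (all names here are hypothetical):

import java.util.function.Consumer;

// Each step is declared before the step that precedes it at runtime,
// so the chain reads bottom-up, like the Step 8..Step 1 listeners in
// normalDeleteJob/deleteJobDocuments below.
public class ChainedStepsSketch {
    public static void main(String[] args) {
        String jobId = "my-job";

        // Declared first, runs last: delete the job config document.
        Consumer<Boolean> deleteConfigDoc = ok ->
                System.out.println("deleted config doc for [" + jobId + "]: " + ok);

        // Runs second: remove the job from any calendars, then continue.
        Consumer<Boolean> removeFromCalendars = ok -> {
            System.out.println("removed [" + jobId + "] from calendars");
            deleteConfigDoc.accept(ok);
        };

        // Declared last, runs first: delete the job's result documents.
        Consumer<Boolean> deleteDocuments = ok -> {
            System.out.println("deleted result docs for [" + jobId + "]");
            removeFromCalendars.accept(ok);
        };

        deleteDocuments.accept(true);
    }
}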
AnomalyDetectorsIndex.java
@@ -5,9 +5,6 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.ml.MlMetadata;

/**
* Methods for handling index naming related functions
*/
@@ -40,15 +37,6 @@ public static String resultsWriteAlias(String jobId) {
return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + ".write-" + jobId;
}

/**
* Retrieves the currently defined physical index from the job state
* @param jobId Job Id
* @return The index name
*/
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName();
}

/**
* The name of the default index where a job's state is stored
* @return The index name
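The getPhysicalIndexFromState method removed above was a synchronous cluster-state lookup. With the job config stored as a document, TransportDeleteJobAction below resolves the physical results index asynchronously via JobConfigProvider.getJob (Steps 4 and 5 in deleteJobDocuments). A hedged, self-contained sketch of that shape, with a plain Map standing in for the config index and callbacks standing in for ActionListener (names are stand-ins, not the real API):

import java.util.Map;
import java.util.function.Consumer;

public class PhysicalIndexLookupSketch {

    // Stand-in for the job config documents; maps job id to results index.
    static final Map<String, String> RESULTS_INDEX_BY_JOB =
            Map.of("my-job", ".ml-anomalies-custom-foo");

    // Stand-in for JobConfigProvider.getJob(jobId, listener): the result
    // arrives via a callback instead of a synchronous cluster-state read.
    static void getJob(String jobId, Consumer<String> onResponse, Consumer<Exception> onFailure) {
        String index = RESULTS_INDEX_BY_JOB.get(jobId);
        if (index == null) {
            onFailure.accept(new IllegalArgumentException("No job with id [" + jobId + "]"));
        } else {
            onResponse.accept(index);
        }
    }

    public static void main(String[] args) {
        getJob("my-job",
                indexName -> System.out.println("physical index: " + indexName),
                e -> System.err.println("lookup failed: " + e.getMessage()));
    }
}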
TransportDeleteJobAction.java
@@ -24,14 +24,11 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Nullable;
@@ -52,30 +49,36 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -89,6 +92,8 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
private final PersistentTasksService persistentTasksService;
private final Auditor auditor;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;

/**
* A map of task listeners by job_id.
@@ -102,13 +107,16 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
Client client, Auditor auditor, JobResultsProvider jobResultsProvider) {
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider) {
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, DeleteJobAction.Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
this.auditor = auditor;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.listenersByJobId = new HashMap<>();
}

@@ -137,6 +145,10 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
ActionListener<AcknowledgedResponse> listener) {
logger.debug("Deleting job '{}'", request.getJobId());

if (request.isForce() == false) {
checkJobIsNotOpen(request.getJobId(), state);
}

TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);

@@ -175,7 +187,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
finalListener.onFailure(e);
});

markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener);
}

private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) {
@@ -211,33 +223,15 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ
}
};

// Step 3. When the physical storage has been deleted, remove from Cluster State
// Step 3. When the physical storage has been deleted, delete the job config document
// -------
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask(
"delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, listener::onFailure)) {

@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged && response;
}

@Override
public ClusterState execute(ClusterState currentState) {
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
// We wouldn't have got here if the job never existed so
// the Job must have been deleted by another action.
// Don't error in this case
return currentState;
}

MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return buildNewClusterState(currentState, builder);
}
});

// Don't report an error if the document has already been deleted
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
ActionListener.wrap(
deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
listener::onFailure
)
);

// Step 2. Remove the job from any calendars
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
@@ -251,26 +245,26 @@ public ClusterState execute(ClusterState currentState) {
private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId,
CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {

final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId);
final String indexPattern = indexName + "-*";
AtomicReference<String> indexName = new AtomicReference<>();

final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
response -> finishedHandler.accept(response.isAcknowledged()),
failureHandler);

// Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
// Step 8. If we did not drop the index and after DBQ state done, we delete the aliases
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
bulkByScrollResponse -> {
if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
completionHandler.onResponse(new AcknowledgedResponse(true));
} else {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName.get(),
indexName.get() + "-*");
}
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
indexName, indexPattern);
indexName.get(), indexName.get() + "-*");
for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
logger.warn("DBQ failure: " + failure);
}
@@ -280,12 +274,13 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
},
failureHandler);

// Step 6. If we did not delete the index, we run a delete by query
// Step 7. If we did not delete the index, we run a delete by query
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
response -> {
if (response) {
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
String indexPattern = indexName.get() + "-*";
logger.info("Running DBQ on [" + indexName.get() + "," + indexPattern + "] for job [" + jobId + "]");
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName.get(), indexPattern);
ConstantScoreQueryBuilder query =
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
request.setQuery(query);
@@ -301,15 +296,15 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
},
failureHandler);

// Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
// Step 6. If we have any hits, that means we are NOT the only job on this index, and should not delete it
// if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
searchResponse -> {
if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
} else {
logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
logger.info("Running DELETE Index on [" + indexName.get() + "] for job [" + jobId + "]");
DeleteIndexRequest request = new DeleteIndexRequest(indexName.get());
request.indicesOptions(IndicesOptions.lenientExpandOpen());
// If we have deleted the index, then we don't need to delete the aliases or run the DBQ
executeAsyncWithOrigin(
@@ -331,9 +326,11 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
}
);

// Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> {
// Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
builder -> {
Job job = builder.build();
indexName.set(job.getResultsIndexName());
if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
//don't bother searching the index any further, we are on the default shared
@@ -344,14 +341,22 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
.query(QueryBuilders.boolQuery().filter(
QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));

SearchRequest searchRequest = new SearchRequest(indexName);
SearchRequest searchRequest = new SearchRequest(indexName.get());
searchRequest.source(source);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
}
},
failureHandler
);

// Step 4. Get the job as the result index name is required
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> {
jobConfigProvider.getJob(jobId, getJobHandler);
},
failureHandler
);

// Step 3. Delete quantiles done, delete the categorizer state
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler),
@@ -554,36 +559,28 @@ public void onFailure(Exception e) {
}
}

private void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.markJobAsDeleting(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("Job [" + jobId + "] is successfully marked as deleted");
listener.onResponse(true);
}
});
private void checkJobIsNotOpen(String jobId, ClusterState state) {
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask != null) {
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
+ ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState()));
}
}
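checkJobIsNotOpen replaces the validation that previously ran inside the cluster-state update task: a non-force delete is rejected while a persistent task exists for the job, and a task that has not yet published its state is reported as OPENING. A self-contained sketch of that convention, with stand-ins for the Elasticsearch task and state types:

public class OpenJobGuardSketch {
    // Stand-in for org.elasticsearch.xpack.core.ml.job.config.JobState.
    enum JobState { OPENING, OPENED, CLOSING, CLOSED }

    // taskExists stands in for MlTasks.getJobTask(jobId, tasks) != null;
    // taskState stands in for the JobTaskState attached to that task.
    static void checkJobIsNotOpen(String jobId, boolean taskExists, JobState taskState) {
        if (taskExists) {
            JobState reported = (taskState == null) ? JobState.OPENING : taskState;
            throw new IllegalStateException(
                    "Cannot delete job [" + jobId + "] because the job is " + reported);
        }
    }

    public static void main(String[] args) {
        try {
            checkJobIsNotOpen("my-job", true, null); // task exists, no state yet
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // ... because the job is OPENING
        }
    }
}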

static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
}
private void markJobAsDeletingIfNotUsed(String jobId, ActionListener<Boolean> listener) {

private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
return newState.build();
datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
datafeedIds -> {
if (datafeedIds.isEmpty() == false) {
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
+ datafeedIds.iterator().next() + "] refers to it"));
return;
}
jobConfigProvider.markJobAsDeleting(jobId, listener);
},
listener::onFailure
));
}
}
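markJobAsDeletingIfNotUsed replaces the old cluster-state markJobAsDeleting: the datafeed-reference check is now an explicit lookup through DatafeedConfigProvider, and only when no datafeed refers to the job is the config document marked as deleting. A self-contained sketch of the guard's logic, with plain stand-ins for DatafeedConfigProvider and ActionListener:

import java.util.List;
import java.util.function.Consumer;

public class MarkAsDeletingGuardSketch {

    // Stand-in for DatafeedConfigProvider.findDatafeedsForJobIds; the real
    // code searches the datafeed config index asynchronously.
    static List<String> findDatafeedsForJob(String jobId) {
        return "job-with-feed".equals(jobId) ? List.of("datafeed-1") : List.of();
    }

    static void markJobAsDeletingIfNotUsed(String jobId, Consumer<Boolean> onResponse,
                                           Consumer<Exception> onFailure) {
        List<String> datafeedIds = findDatafeedsForJob(jobId);
        if (datafeedIds.isEmpty() == false) {
            onFailure.accept(new IllegalStateException("Cannot delete job [" + jobId
                    + "] because datafeed [" + datafeedIds.get(0) + "] refers to it"));
            return;
        }
        onResponse.accept(true); // real code marks the job config doc as deleting here
    }

    public static void main(String[] args) {
        markJobAsDeletingIfNotUsed("job-with-feed",
                ok -> System.out.println("marked as deleting"),
                e -> System.err.println(e.getMessage()));
    }
}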