Skip to content

Commit

Permalink
[ML] JIndex: Restore finalize job action (#35939)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Nov 30, 2018
1 parent d72ad3b commit f048c52
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public final class Messages {
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";

public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";

public static final String INCONSISTENT_ID =
"Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
Expand All @@ -47,6 +48,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

Expand Down Expand Up @@ -422,7 +426,10 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo
}, request.getCloseTimeout(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
listener.onResponse(response);
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

Expand Down Expand Up @@ -58,7 +59,7 @@ protected void doExecute(Task task, DeleteFilterAction.Request request, ActionLi
List<String> currentlyUsedBy = findJobsUsingFilter(jobs, filterId);
if (!currentlyUsedBy.isEmpty()) {
listener.onFailure(ExceptionsHelper.conflictStatusException(
"Cannot delete filter, currently used by jobs: " + currentlyUsedBy));
Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, currentlyUsedBy)));
} else {
deleteFilter(filterId, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -18,15 +22,31 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;

import java.util.Collections;
import java.util.Date;
import java.util.Map;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {

private final Client client;

@Inject
public TransportFinalizeJobExecutionAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
super(FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new);
this.client = client;
}

@Override
Expand All @@ -42,9 +62,36 @@ protected AcknowledgedResponse newResponse() {
/**
 * Writes a finished time to the config document of every job named in the
 * request and then acknowledges the request.
 *
 * One timestamp is created up front and reused for all jobs. Each job gets its
 * own update request against the ML config index; the requests are queued on a
 * {@code ChainTaskExecutor} backed by the ML utility thread pool. The caller's
 * listener receives {@code AcknowledgedResponse(true)} when the chain reports
 * success, or the failure the chain reports.
 *
 * NOTE(review): this text is a diff rendering without +/- markers. The first
 * three lines of the body (the "no longer required" comment and the immediate
 * {@code listener.onResponse}) look like the pre-change body that the commit
 * removes; if they were really kept, the listener would be completed twice.
 * Confirm against the repository.
 *
 * @param request  supplies the ids of the jobs to finalize
 * @param state    current cluster state; not read by this method
 * @param listener notified once all updates have completed or one has failed
 */
@Override
protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
// This action is no longer required but needs to be preserved
// in case it is called by an old node in a mixed cluster
listener.onResponse(new AcknowledgedResponse(true));
// Comma-joined ids, used only for the two debug log messages.
String jobIdString = String.join(",", request.getJobIds());
logger.debug("finalizing jobs [{}]", jobIdString);

// Second constructor argument presumably makes the chain stop at the first
// failure — TODO confirm against ChainTaskExecutor.
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor(
MachineLearning.UTILITY_THREAD_POOL_NAME), true);

// Partial document shared by every update: only the finished-time field is set,
// so all jobs in this request receive the same timestamp.
Map<String, Object> update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date());

for (String jobId: request.getJobIds()) {
// Partial update of the job's document in the config index.
UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
// Retry on version conflicts in case the document is modified concurrently.
updateRequest.retryOnConflict(3);
updateRequest.doc(update);
// Refresh straight away so the new finished time is visible to subsequent searches.
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

// Queue the update; the update response itself is discarded and only
// success or failure is passed on to the chain.
chainTaskExecutor.add(chainedListener -> {
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
updateResponse -> chainedListener.onResponse(null),
chainedListener::onFailure
));
});
}

// Run the queued updates; acknowledge on success, forward any failure unchanged.
chainTaskExecutor.execute(ActionListener.wrap(
aVoid -> {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
},
listener::onFailure
));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public TransportGetJobsStatsAction(TransportService transportService,

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {
logger.debug("Get stats for job [{}]", request.getJobId());

jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
expandedIds -> {
Expand Down Expand Up @@ -105,7 +106,6 @@ protected QueryPage<GetJobsStatsAction.Response.JobStats> readTaskResponse(Strea
protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<QueryPage<GetJobsStatsAction.Response.JobStats>> listener) {
String jobId = task.getJobId();
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
Expand All @@ -36,20 +32,15 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -88,7 +79,6 @@ public class AutoDetectResultProcessor {

final CountDownLatch completionLatch = new CountDownLatch(1);
final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
volatile CountDownLatch onCloseActionsLatch;
private final FlushListener flushListener;
private volatile boolean processKilled;
private volatile boolean failed;
Expand Down Expand Up @@ -149,18 +139,8 @@ public void process(AutodetectProcess process) {
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
}
if (processKilled == false) {
try {
onAutodetectClose();
} catch (Exception e) {
if (onCloseActionsLatch != null) {
onCloseActionsLatch.countDown();
}
throw e;
}
}

LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);

} catch (Exception e) {
failed = true;

Expand Down Expand Up @@ -313,6 +293,9 @@ private void notifyModelMemoryStatusChange(Context context, ModelSizeStats model
}

protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);

try {
// This blocks the main processing thread in the unlikely event
// there are 2 model snapshots queued up. But it also has the
Expand All @@ -324,52 +307,20 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
return;
}

Map<String, Object> update = new HashMap<>();
update.put(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId());
update.put(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshot.getMinVersion().toString());

updateJob(jobId, update, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
updateModelSnapshotSemaphore.release();
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
}

@Override
public void onFailure(Exception e) {
updateModelSnapshotSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" +
modelSnapshot.getSnapshotId() + "]", e);
}
});
}

private void onAutodetectClose() {
onCloseActionsLatch = new CountDownLatch(1);

ActionListener<UpdateResponse> updateListener = ActionListener.wrap(
updateResponse -> {
onCloseActionsLatch.countDown();
},
e -> {
LOGGER.error("[" + jobId + "] Failed to finalize job on autodetect close", e);
onCloseActionsLatch.countDown();
}
);

updateJob(jobId, Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()),
new ThreadedActionListener<>(LOGGER, client.threadPool(),
MachineLearning.UTILITY_THREAD_POOL_NAME, updateListener, false)
);
}
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
updateModelSnapshotSemaphore.release();
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
}

private void updateJob(String jobId, Map<String, Object> update, ActionListener<UpdateResponse> listener) {
UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
updateRequest.retryOnConflict(3);
updateRequest.doc(update);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, listener);
@Override
public void onFailure(Exception e) {
updateModelSnapshotSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" +
modelSnapshot.getSnapshotId() + "]", e);
}
});
}

public void awaitCompletion() throws TimeoutException {
Expand All @@ -381,13 +332,6 @@ public void awaitCompletion() throws TimeoutException {
throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId);
}

// Once completionLatch has passed then onCloseActionsLatch must either
// be set or null, it will not be set later.
if (onCloseActionsLatch != null && onCloseActionsLatch.await(
MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES) == false) {
throw new TimeoutException("Timed out waiting for results processor run post close actions " + jobId);
}

// Input stream has been completely processed at this point.
// Wait for any updateModelSnapshotOnJob calls to complete.
updateModelSnapshotSemaphore.acquire();
Expand Down
Loading

0 comments on commit f048c52

Please sign in to comment.