Skip to content

Commit

Permalink
Restore job finalisation action.
Browse files Browse the repository at this point in the history
Simplify AutoDetectResultProcessor by using the restored action
  • Loading branch information
davidkyle committed Nov 29, 2018
1 parent 6b3fc84 commit 948ee32
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
Expand All @@ -47,6 +48,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

Expand Down Expand Up @@ -422,7 +426,10 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo
}, request.getCloseTimeout(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
listener.onResponse(response);
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -18,15 +22,31 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;

import java.util.Collections;
import java.util.Date;
import java.util.Map;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {

private final Client client;

@Inject
public TransportFinalizeJobExecutionAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
super(FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new);
this.client = client;
}

@Override
Expand All @@ -42,12 +62,36 @@ protected AcknowledgedResponse newResponse() {
/**
 * Stamps every job named in the request with a finished time by updating its
 * document in the ML config index, then acknowledges the request.
 *
 * NOTE(review): this text comes from a rendered diff without +/- markers. The
 * two comment lines and the early {@code listener.onResponse} call at the top
 * of the body look like the pre-change implementation that this commit
 * removes; confirm against the repository before treating them as live code.
 *
 * @param request  supplies the ids of the jobs to finalize
 * @param state    the cluster state; not read by the code shown here
 * @param listener notified with {@code AcknowledgedResponse(true)} when the
 *                 chained updates complete, or with the failure reported by
 *                 the chain
 */
@Override
protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
// This action is no longer required but needs to be preserved
// in case it is called by an old node in a mixed cluster
listener.onResponse(new AcknowledgedResponse(true));
// Comma-joined job ids; only used in the debug log messages below.
String jobIdString = String.join(",", request.getJobIds());
logger.debug("finalizing jobs [{}]", jobIdString);

// The chain is built on the ML utility thread pool's executor.
// NOTE(review): the boolean flag presumably makes the chain stop at the
// first failure - confirm against ChainTaskExecutor.
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor(
MachineLearning.UTILITY_THREAD_POOL_NAME), true);

// One timestamp is created here and the same map is reused for every job's update.
Map<String, Object> update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date());

for (String jobId: request.getJobIds()) {
// Partial-document update addressed to this job's document in the config index.
UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
updateRequest.retryOnConflict(3);
updateRequest.doc(update);
// Ask for an immediate refresh as part of the write.
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

// Queue the update; the chain is advanced from the update's response
// callback and receives the exception if the update fails.
chainTaskExecutor.add(chainedListener -> {
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
updateResponse -> chainedListener.onResponse(null),
chainedListener::onFailure
));
});
}

// Start the queued updates; acknowledge on completion, otherwise forward the failure.
chainTaskExecutor.execute(ActionListener.wrap(
aVoid -> {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
},
listener::onFailure
));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
Expand All @@ -36,20 +32,15 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -88,7 +79,6 @@ public class AutoDetectResultProcessor {

final CountDownLatch completionLatch = new CountDownLatch(1);
final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
volatile CountDownLatch onCloseActionsLatch;
private final FlushListener flushListener;
private volatile boolean processKilled;
private volatile boolean failed;
Expand Down Expand Up @@ -149,18 +139,8 @@ public void process(AutodetectProcess process) {
} catch (Exception e) {
LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
}
if (processKilled == false) {
try {
onAutodetectClose();
} catch (Exception e) {
if (onCloseActionsLatch != null) {
onCloseActionsLatch.countDown();
}
throw e;
}
}

LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount);

} catch (Exception e) {
failed = true;

Expand Down Expand Up @@ -313,6 +293,9 @@ private void notifyModelMemoryStatusChange(Context context, ModelSizeStats model
}

protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);

try {
// This blocks the main processing thread in the unlikely event
// there are 2 model snapshots queued up. But it also has the
Expand All @@ -324,52 +307,20 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
return;
}

Map<String, Object> update = new HashMap<>();
update.put(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId());
update.put(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshot.getMinVersion().toString());

updateJob(jobId, update, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
updateModelSnapshotSemaphore.release();
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
}

@Override
public void onFailure(Exception e) {
updateModelSnapshotSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" +
modelSnapshot.getSnapshotId() + "]", e);
}
});
}

private void onAutodetectClose() {
onCloseActionsLatch = new CountDownLatch(1);

ActionListener<UpdateResponse> updateListener = ActionListener.wrap(
updateResponse -> {
onCloseActionsLatch.countDown();
},
e -> {
LOGGER.error("[" + jobId + "] Failed to finalize job on autodetect close", e);
onCloseActionsLatch.countDown();
}
);

updateJob(jobId, Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()),
new ThreadedActionListener<>(LOGGER, client.threadPool(),
MachineLearning.UTILITY_THREAD_POOL_NAME, updateListener, false)
);
}
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
updateModelSnapshotSemaphore.release();
LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
}

private void updateJob(String jobId, Map<String, Object> update, ActionListener<UpdateResponse> listener) {
UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
updateRequest.retryOnConflict(3);
updateRequest.doc(update);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, listener);
@Override
public void onFailure(Exception e) {
updateModelSnapshotSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" +
modelSnapshot.getSnapshotId() + "]", e);
}
});
}

public void awaitCompletion() throws TimeoutException {
Expand All @@ -381,13 +332,6 @@ public void awaitCompletion() throws TimeoutException {
throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId);
}

// Once completionLatch has passed then onCloseActionsLatch must either
// be set or null, it will not be set later.
if (onCloseActionsLatch != null && onCloseActionsLatch.await(
MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES) == false) {
throw new TimeoutException("Timed out waiting for results processor run post close actions " + jobId);
}

// Input stream has been completely processed at this point.
// Wait for any updateModelSnapshotOnJob calls to complete.
updateModelSnapshotSemaphore.acquire();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.junit.Before;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link TransportFinalizeJobExecutionAction}, run against a
 * mocked {@link Client} and {@link ThreadPool} so that no cluster is needed.
 */
public class TransportFinalizeJobExecutionActionTests extends ESTestCase {

    private ThreadPool threadPool;
    private Client client;

    /**
     * Builds the mocks shared by the tests: an executor that runs submitted
     * tasks inline on the calling thread, and a client whose update action
     * immediately reports success to the listener it is given.
     */
    @Before
    @SuppressWarnings("unchecked")
    private void setupMocks() {
        ExecutorService executorService = mock(ExecutorService.class);
        threadPool = mock(ThreadPool.class);
        // Run submitted tasks synchronously so the test can assert right after the call returns.
        doAnswer(invocation -> {
            ((Runnable) invocation.getArguments()[0]).run();
            return null;
        }).when(executorService).execute(any(Runnable.class));
        when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService);

        client = mock(Client.class);
        // Every update request succeeds at once; the response payload is not inspected by the action.
        doAnswer(invocationOnMock -> {
            ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
            listener.onResponse(null);
            return null;
        }).when(client).execute(eq(UpdateAction.INSTANCE), any(), any());

        when(client.threadPool()).thenReturn(threadPool);
        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
    }

    /**
     * Finalizing two jobs must issue exactly one update per job, acknowledge
     * the request, and never submit a cluster state update task.
     */
    public void testOperation() {
        ClusterService clusterService = mock(ClusterService.class);
        TransportFinalizeJobExecutionAction action = createAction(clusterService);

        ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")).build();

        FinalizeJobExecutionAction.Request request = new FinalizeJobExecutionAction.Request(new String[]{"job1", "job2"});
        AtomicReference<AcknowledgedResponse> ack = new AtomicReference<>();
        AtomicReference<Exception> failure = new AtomicReference<>();
        action.masterOperation(request, clusterState, ActionListener.wrap(ack::set, failure::set));

        // Any failure fails the test with its cause attached, including exceptions
        // that carry no message (which a null-message assertion would let through).
        if (failure.get() != null) {
            throw new AssertionError("finalize job action failed unexpectedly", failure.get());
        }
        assertNotNull("listener was never notified", ack.get());
        assertTrue(ack.get().isAcknowledged());
        verify(client, times(2)).execute(eq(UpdateAction.INSTANCE), any(), any());
        verify(clusterService, never()).submitStateUpdateTask(any(), any());
    }

    /**
     * Creates the action under test, wired to the shared mock thread pool and client.
     */
    private TransportFinalizeJobExecutionAction createAction(ClusterService clusterService) {
        return new TransportFinalizeJobExecutionAction(mock(TransportService.class), clusterService,
            threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), client);
    }
}
Loading

0 comments on commit 948ee32

Please sign in to comment.