[ML] Job in Index: Stop and preview datafeed (elastic#34605)
davidkyle committed Oct 29, 2018
1 parent f8614a1 commit 4e3d565
Showing 3 changed files with 84 additions and 146 deletions.
TransportPreviewDatafeedAction.java
@@ -9,22 +9,20 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.io.BufferedReader;
import java.io.InputStream;
@@ -39,51 +37,56 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ

private final ThreadPool threadPool;
private final Client client;
private final ClusterService clusterService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, ClusterService clusterService) {
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider) {
super(settings, PreviewDatafeedAction.NAME, transportService, actionFilters,
(Supplier<PreviewDatafeedAction.Request>) PreviewDatafeedAction.Request::new);
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
}

@Override
protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}

DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeed);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
previewDatafeed.setHeaders(headers);
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, previewDatafeed.build(), job, new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
datafeedConfigBuilder -> {
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
jobBuilder -> {
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
previewDatafeed.setHeaders(headers);
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(),
new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
},
listener::onFailure
));
},
listener::onFailure
));
}

/** Visible for testing */
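
Editor's note: the core of the change above is replacing a synchronous MlMetadata lookup from cluster state with two chained asynchronous reads from the config index: fetch the datafeed, then the job it references, forwarding a failure at either step to the caller. A minimal, self-contained sketch of that listener-chaining pattern follows; the Listener interface, the lookup methods, and the hard-coded values are illustrative stand-ins, not the real ActionListener, DatafeedConfigProvider, or JobConfigProvider APIs.

import java.util.Map;
import java.util.function.Consumer;

public class ChainedLookupSketch {

    // Stand-in for org.elasticsearch.action.ActionListener and its wrap() helper.
    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);

        static <T> Listener<T> wrap(Consumer<T> onResp, Consumer<Exception> onFail) {
            return new Listener<T>() {
                @Override public void onResponse(T result) { onResp.accept(result); }
                @Override public void onFailure(Exception e) { onFail.accept(e); }
            };
        }
    }

    // Hypothetical async lookups standing in for DatafeedConfigProvider.getDatafeedConfig
    // and JobConfigProvider.getJob, which read the stored configs in the real code.
    static void getDatafeed(String datafeedId, Listener<Map<String, String>> listener) {
        listener.onResponse(Map.of("jobId", "job-1"));
    }

    static void getJob(String jobId, Listener<String> listener) {
        listener.onResponse("job-config-for-" + jobId);
    }

    // Same shape as the new doExecute: resolve the datafeed, then its job;
    // a failure at either step reaches the caller's listener unchanged.
    static void preview(String datafeedId, Listener<String> caller) {
        getDatafeed(datafeedId, Listener.wrap(
                datafeed -> getJob(datafeed.get("jobId"), Listener.wrap(
                        job -> caller.onResponse("preview of " + datafeedId + " using " + job),
                        caller::onFailure)),
                caller::onFailure));
    }

    public static void main(String[] args) {
        preview("feed-1", Listener.wrap(System.out::println, Throwable::printStackTrace));
    }
}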
TransportStopDatafeedAction.java
@@ -22,19 +22,17 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import java.io.IOException;
import java.util.ArrayList;
@@ -50,35 +48,35 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS

private final ThreadPool threadPool;
private final PersistentTasksService persistentTasksService;
private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportStopDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, ClusterService clusterService,
PersistentTasksService persistentTasksService) {
PersistentTasksService persistentTasksService, DatafeedConfigProvider datafeedConfigProvider) {
super(settings, StopDatafeedAction.NAME, clusterService, transportService, actionFilters,
StopDatafeedAction.Request::new, StopDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.threadPool = threadPool;
this.persistentTasksService = persistentTasksService;
this.datafeedConfigProvider = datafeedConfigProvider;

}

/**
* Resolve the requested datafeeds and add their IDs to one of the list
* arguments depending on datafeed state.
* Sort the datafeed IDs by their task state and add to one
* of the list arguments depending on the state.
*
* @param request The stop datafeed request
* @param mlMetadata ML Metadata
* @param expandedDatafeedIds The expanded set of IDs
* @param tasks Persistent task meta data
* @param startedDatafeedIds Started datafeed ids are added to this list
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
*/
static void resolveDataFeedIds(StopDatafeedAction.Request request, MlMetadata mlMetadata,
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {
static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {

Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
for (String expandedDatafeedId : expandedDatafeedIds) {
validateDatafeedTask(expandedDatafeedId, mlMetadata);
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
startedDatafeedIds, stoppingDatafeedIds);
}
@@ -102,20 +100,6 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId,
}
}

/**
* Validate the stop request.
* Throws an {@code ResourceNotFoundException} if there is no datafeed
* with id {@code datafeedId}
* @param datafeedId The datafeed Id
* @param mlMetadata ML meta data
*/
static void validateDatafeedTask(String datafeedId, MlMetadata mlMetadata) {
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
if (datafeed == null) {
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
}
}
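
Editor's note: with the rename from resolveDataFeedIds to sortDatafeedIdsByTaskState, the method no longer expands or validates IDs itself; it only partitions an already-expanded set by the state of each datafeed's persistent task, and the removed validateDatafeedTask above becomes redundant because expandDatafeedIds now performs the existence check. A rough sketch of that partitioning, assuming (as the real DatafeedState handling suggests) that a datafeed with no persistent task counts as STOPPED and lands in neither list:

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SortByTaskStateSketch {

    // Stand-in for the real DatafeedState enum.
    enum DatafeedState { STARTED, STOPPING, STOPPED }

    static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
                                           Map<String, DatafeedState> taskStates,
                                           List<String> startedDatafeedIds,
                                           List<String> stoppingDatafeedIds) {
        for (String id : expandedDatafeedIds) {
            // No persistent task means the datafeed is already stopped.
            DatafeedState state = taskStates.getOrDefault(id, DatafeedState.STOPPED);
            if (state == DatafeedState.STARTED) {
                startedDatafeedIds.add(id);
            } else if (state == DatafeedState.STOPPING) {
                stoppingDatafeedIds.add(id);
            }
        }
    }

    public static void main(String[] args) {
        List<String> started = new ArrayList<>();
        List<String> stopping = new ArrayList<>();
        sortDatafeedIdsByTaskState(
                new LinkedHashSet<>(List.of("feed-1", "feed-2", "feed-3")),
                Map.of("feed-1", DatafeedState.STARTED, "feed-2", DatafeedState.STOPPING),
                started, stopping);
        System.out.println("started=" + started + " stopping=" + stopping);
        // started=[feed-1] stopping=[feed-2]; feed-3 is already stopped.
    }
}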

@Override
protected void doExecute(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener) {
final ClusterState state = clusterService.state();
@@ -130,23 +114,27 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new));
}
} else {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
expandedIds -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
resolveDataFeedIds(request, mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
}
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));

if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
}
if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
}
},
listener::onFailure
));
}
}
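
Editor's note: in the new doExecute, ID expansion is itself an asynchronous lookup, so everything that used to follow the synchronous mlMetadata.expandDatafeedIds call now runs inside the listener. A sketch of the expansion contract this code appears to rely on, under assumptions: the real DatafeedConfigProvider.expandDatafeedIds resolves the expression against stored datafeed configs and reports through an ActionListener, failing with a ResourceNotFoundException when nothing matches and allowNoDatafeeds is false; the hard-coded IDs and simple wildcard handling here are illustrative only.

import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

public class ExpandDatafeedIdsSketch {

    // Local stand-in for org.elasticsearch.ResourceNotFoundException.
    static class ResourceNotFoundException extends RuntimeException {
        ResourceNotFoundException(String msg) { super(msg); }
    }

    static void expandDatafeedIds(String expression, boolean allowNoDatafeeds,
                                  Consumer<Set<String>> onResponse,
                                  Consumer<Exception> onFailure) {
        Set<String> allIds = new TreeSet<>(List.of("feed-1", "feed-2"));  // pretend index contents
        Set<String> matched = new TreeSet<>();
        if ("_all".equals(expression) || "*".equals(expression)) {
            matched.addAll(allIds);
        } else if (allIds.contains(expression)) {
            matched.add(expression);
        }
        if (matched.isEmpty() && allowNoDatafeeds == false) {
            onFailure.accept(new ResourceNotFoundException(
                    "No datafeed with id [" + expression + "] exists"));
            return;
        }
        onResponse.accept(matched);
    }

    public static void main(String[] args) {
        // Everything that depends on the expanded IDs lives in the callback,
        // mirroring the restructured doExecute above.
        expandDatafeedIds("*", true,
                ids -> System.out.println("would stop: " + ids),
                Throwable::printStackTrace);
    }
}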
