Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job in Index: Stop and preview datafeed #34605

Merged
merged 4 commits into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ public TransportStopDatafeedAction(Settings settings, TransportService transport
}

/**
* Resolve the requested datafeeds and add their IDs to one of the list
* arguments depending on datafeed state.
* Sort the datafeed IDs the their task state and add to one
* of the list arguments depending on the state.
*
* @param expandedDatafeedIds The expanded set of IDs
* @param tasks Persistent task meta data
* @param startedDatafeedIds Started datafeed ids are added to this list
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
*/
static void resolveDataFeedIds(Set<String> expandedDatafeedIds,
PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {
static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note we also have to rename the unit tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

PersistentTasksCustomMetaData tasks,
List<String> startedDatafeedIds,
List<String> stoppingDatafeedIds) {

for (String expandedDatafeedId : expandedDatafeedIds) {
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
Expand Down Expand Up @@ -121,7 +121,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
resolveDataFeedIds(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
listener.onResponse(new StopDatafeedAction.Response(true));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public void testResolveDataFeedIds_GivenDatafeedId() {

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
TransportStopDatafeedAction.resolveDataFeedIds(Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds);
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
Collections.singleton("datafeed_1"), tasks, startedDatafeeds, stoppingDatafeeds);
assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);

startedDatafeeds.clear();
stoppingDatafeeds.clear();
TransportStopDatafeedAction.resolveDataFeedIds(Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds);
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
Collections.singleton("datafeed_2"), tasks, startedDatafeeds, stoppingDatafeeds);
assertEquals(Collections.emptyList(), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
}
Expand All @@ -48,14 +50,14 @@ public void testResolveDataFeedIds_GivenAll() {

List<String> startedDatafeeds = new ArrayList<>();
List<String> stoppingDatafeeds = new ArrayList<>();
TransportStopDatafeedAction.resolveDataFeedIds(
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(
new HashSet<>(Arrays.asList("datafeed_1", "datafeed_2", "datafeed_3")), tasks, startedDatafeeds, stoppingDatafeeds);
assertEquals(Collections.singletonList("datafeed_1"), startedDatafeeds);
assertEquals(Collections.singletonList("datafeed_3"), stoppingDatafeeds);

startedDatafeeds.clear();
stoppingDatafeeds.clear();
TransportStopDatafeedAction.resolveDataFeedIds(Collections.singleton("datafeed_2"), tasks, startedDatafeeds,
TransportStopDatafeedAction.sortDatafeedIdsByTaskState(Collections.singleton("datafeed_2"), tasks, startedDatafeeds,
stoppingDatafeeds);
assertEquals(Collections.emptyList(), startedDatafeeds);
assertEquals(Collections.emptyList(), stoppingDatafeeds);
Expand Down