Skip to content

Commit

Permalink
[7.x][ML] Refresh state index before completing data frame analytics …
Browse files Browse the repository at this point in the history
…job (#50322) (#50324)

To ensure that any persisted model state is searchable by the time
the job reports itself as `stopped`, we need to refresh the state index
before completing.

This should fix the occasional failures we see in issues #50168 and #50313,
where the model state appears to be missing.

Closes #50168
Closes #50313

Backport of #50322
  • Loading branch information
dimitris-athanasiou authored Dec 18, 2019
1 parent 73ca2e5 commit d3c83cd
Showing 1 changed file with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -159,6 +160,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont
processContext.setFailureReason(resultProcessor.getFailure());

refreshDest(config);
refreshStateIndex(config.getId());
LOGGER.info("[{}] Result processor has completed", config.getId());
} catch (Exception e) {
if (task.isStopping()) {
Expand Down Expand Up @@ -288,6 +290,17 @@ private void refreshDest(DataFrameAnalyticsConfig config) {
() -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet());
}

/**
 * Issues a refresh against the index pattern returned by
 * {@code AnomalyDetectorsIndex.jobStateIndexPattern()} and waits for the response.
 *
 * The request uses {@code IndicesOptions.lenientExpandOpen()} and is executed
 * with the thread context stashed under {@code ML_ORIGIN}; the previous context
 * is restored when the try-with-resources block exits.
 *
 * @param jobId id of the job, used only for the debug log message
 */
private void refreshStateIndex(String jobId) {
    final String stateIndexPattern = AnomalyDetectorsIndex.jobStateIndexPattern();
    LOGGER.debug("[{}] Refresh index {}", jobId, stateIndexPattern);

    final RefreshRequest request = new RefreshRequest(stateIndexPattern);
    request.indicesOptions(IndicesOptions.lenientExpandOpen());

    final ThreadContext threadContext = client.threadPool().getThreadContext();
    // The stored context is never read; it is held only so that closing it restores the prior context.
    try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ML_ORIGIN)) {
        client.admin().indices().refresh(request).actionGet();
    }
}

private void closeProcess(DataFrameAnalyticsTask task) {
String configId = task.getParams().getId();
LOGGER.info("[{}] Closing process", configId);
Expand Down

0 comments on commit d3c83cd

Please sign in to comment.