Skip to content

Commit

Permalink
[ML] Convert job data remover to work with index configs (#34532)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Oct 19, 2018
1 parent 2794d12 commit 33cf46e
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService.nodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredResultsRemover(client, auditor),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool, clusterService),
new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.io.IOException;
import java.io.InputStream;

/**
 * Batched iterator over job configuration documents. The search is restricted
 * to documents whose job type field equals the anomaly detector job type, and
 * every hit is parsed into a {@link Job.Builder}.
 */
public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {

    /**
     * @param client client handed to the parent batched iterator
     * @param index  name of the index handed to the parent batched iterator
     */
    public BatchedJobsIterator(Client client, String index) {
        super(client, index);
    }

    /**
     * @return a term query on the job type field matching anomaly detector jobs
     */
    @Override
    protected QueryBuilder getQuery() {
        String jobTypeField = Job.JOB_TYPE.getPreferredName();
        return new TermQueryBuilder(jobTypeField, Job.ANOMALY_DETECTOR_JOB_TYPE);
    }

    /**
     * Parses the source of a search hit with the lenient job parser.
     *
     * @param hit the search hit holding the job document
     * @return the job builder parsed from the hit's source
     * @throws ElasticsearchParseException if reading the source fails with an {@link IOException}
     */
    @Override
    protected Job.Builder map(SearchHit hit) {
        try (InputStream source = hit.getSourceRef().streamInput();
             XContentParser jobParser = XContentFactory.xContent(XContentType.JSON)
                     .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source)) {
            Job.Builder parsedJob = Job.LENIENT_PARSER.apply(jobParser, null);
            return parsedJob;
        } catch (IOException ioe) {
            String message = "failed to parse job document [" + hit.getId() + "]";
            throw new ElasticsearchParseException(message, ioe);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Removes job data that expired with respect to their retention period.
Expand All @@ -33,23 +33,29 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final ClusterService clusterService;
private final Client client;

AbstractExpiredJobDataRemover(ClusterService clusterService) {
this.clusterService = Objects.requireNonNull(clusterService);
AbstractExpiredJobDataRemover(Client client) {
this.client = client;
}

/**
 * Creates a fresh job iterator and hands it, together with the listener,
 * to {@code removeData}.
 */
@Override
public void remove(ActionListener<Boolean> listener) {
    WrappedBatchedJobsIterator jobs = newJobIterator();
    removeData(jobs, listener);
}

private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> listener) {
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
if (jobIterator.hasNext() == false) {
listener.onResponse(true);
return;
}
Job job = jobIterator.next();
if (job == null) {
// may be null if the batched iterator's search returned no results
listener.onResponse(true);
return;
}

Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, listener);
Expand All @@ -59,14 +65,9 @@ private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> liste
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
}

private Iterator<Job> newJobIterator() {
ClusterState clusterState = clusterService.state();
List<Job> jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values());
return createVolatileCursorIterator(jobs);
}

protected static <T> Iterator<T> createVolatileCursorIterator(List<T> items) {
return new VolatileCursorIterator<T>(items);
/**
 * Builds a one-job-at-a-time iterator backed by a batched search of the
 * job config index.
 */
private WrappedBatchedJobsIterator newJobIterator() {
    return new WrappedBatchedJobsIterator(
            new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()));
}

private long calcCutoffEpochMs(long retentionDays) {
Expand All @@ -87,4 +88,49 @@ protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs)
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));
}

/**
* BatchedJobsIterator efficiently returns batches of jobs using a scroll
* search but AbstractExpiredJobDataRemover works with one job at a time.
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
private class WrappedBatchedJobsIterator implements Iterator<Job> {
private final BatchedJobsIterator batchedIterator;
private VolatileCursorIterator<Job> currentBatch;

WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) {
this.batchedIterator = batchedIterator;
}

@Override
public boolean hasNext() {
return (currentBatch != null && currentBatch.hasNext()) || batchedIterator.hasNext();
}

/**
* Before BatchedJobsIterator has run a search it reports hasNext == true
* but the first search may return no results. In that case null is return
* and clients have to handle null.
*/
@Override
public Job next() {
if (currentBatch != null && currentBatch.hasNext()) {
return currentBatch.next();
}

// currentBatch is either null or all its elements have been iterated.
// get the next currentBatch
currentBatch = createBatchIteratorFromBatch(batchedIterator.next());

// BatchedJobsIterator.hasNext maybe true if searching the first time
// but no results are returned.
return currentBatch.hasNext() ? currentBatch.next() : null;
}

private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Builder> builders) {
List<Job> jobs = builders.stream().map(Job.Builder::build).collect(Collectors.toList());
return new VolatileCursorIterator<>(jobs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -27,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -57,8 +57,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
private final Client client;
private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool, ClusterService clusterService) {
super(clusterService);
public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
}
Expand Down Expand Up @@ -103,7 +103,7 @@ public void onResponse(SearchResponse searchResponse) {
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
}
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener);
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
} catch (Exception e) {
onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -48,8 +47,8 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private final Client client;
private final Auditor auditor;

public ExpiredResultsRemover(Client client, ClusterService clusterService, Auditor auditor) {
super(clusterService);
public ExpiredResultsRemover(Client client, Auditor auditor) {
super(client);
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
}
Expand Down
Loading

0 comments on commit 33cf46e

Please sign in to comment.