Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job in Index: Convert job data remover to work with index configs #34532

Merged
merged 2 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
Auditor auditor = new Auditor(client, clusterService.nodeName());
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredResultsRemover(client, auditor),
new ExpiredForecastsRemover(client, threadPool),
new ExpiredModelSnapshotsRemover(client, threadPool, clusterService),
new ExpiredModelSnapshotsRemover(client, threadPool),
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.io.IOException;
import java.io.InputStream;

public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't need the builders, we could make this BatchedDocumentsIterator<Job> and build in map().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the beginning I made the decision that builders are returned when the config document is read. This is proving to be the wrong choice, as I now have to build everywhere; the job should be returned and it should be re-validated before opening a job, etc. I will open an issue for this. Whether build is called by the client or in map is moot.


/**
 * Creates the iterator by passing both arguments straight through to the
 * {@link BatchedDocumentsIterator} constructor.
 *
 * @param client the client forwarded to the parent iterator
 * @param index  the index name forwarded to the parent iterator
 */
public BatchedJobsIterator(Client client, String index) {
    super(client, index);
}

/**
 * Builds the query used by this iterator: a term query that matches
 * documents whose job type field equals the anomaly detector job type.
 *
 * @return the term query on the job type field
 */
@Override
protected QueryBuilder getQuery() {
    final String jobTypeField = Job.JOB_TYPE.getPreferredName();
    return new TermQueryBuilder(jobTypeField, Job.ANOMALY_DETECTOR_JOB_TYPE);
}

/**
 * Converts a search hit into a job builder by parsing the hit's source
 * as JSON with the lenient job parser.
 *
 * @param hit the search hit whose source holds the job document
 * @return the builder produced by the lenient parser
 * @throws ElasticsearchParseException if reading the source fails with an
 *         {@link IOException}; the message names the hit id and the
 *         original exception is kept as the cause
 */
@Override
protected Job.Builder map(SearchHit hit) {
    // Both the source stream and the parser are closed by try-with-resources.
    try (InputStream source = hit.getSourceRef().streamInput();
         XContentParser jobParser = XContentFactory.xContent(XContentType.JSON)
                 .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source)) {
        Job.Builder parsedJob = Job.LENIENT_PARSER.apply(jobParser, null);
        return parsedJob;
    } catch (IOException ioe) {
        throw new ElasticsearchParseException("failed to parse job document [" + hit.getId() + "]", ioe);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Removes job data that expired with respect to their retention period.
Expand All @@ -33,23 +33,29 @@
*/
abstract class AbstractExpiredJobDataRemover implements MlDataRemover {

private final ClusterService clusterService;
private final Client client;

AbstractExpiredJobDataRemover(ClusterService clusterService) {
this.clusterService = Objects.requireNonNull(clusterService);
AbstractExpiredJobDataRemover(Client client) {
this.client = client;
}

@Override
public void remove(ActionListener<Boolean> listener) {
removeData(newJobIterator(), listener);
}

private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> listener) {
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) {
if (jobIterator.hasNext() == false) {
listener.onResponse(true);
return;
}
Job job = jobIterator.next();
if (job == null) {
// may be null if the batched iterator search returns no results
listener.onResponse(true);
return;
}

Long retentionDays = getRetentionDays(job);
if (retentionDays == null) {
removeData(jobIterator, listener);
Expand All @@ -59,14 +65,9 @@ private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> liste
removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure));
}

private Iterator<Job> newJobIterator() {
ClusterState clusterState = clusterService.state();
List<Job> jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values());
return createVolatileCursorIterator(jobs);
}

protected static <T> Iterator<T> createVolatileCursorIterator(List<T> items) {
return new VolatileCursorIterator<T>(items);
private WrappedBatchedJobsIterator newJobIterator() {
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
return new WrappedBatchedJobsIterator(jobsIterator);
}

private long calcCutoffEpochMs(long retentionDays) {
Expand All @@ -87,4 +88,49 @@ protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs)
.filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));
}

/**
* BatchedJobsIterator efficiently returns batches of jobs using a scroll
* search but AbstractExpiredJobDataRemover works with one job at a time.
* This class abstracts away the logic of pulling one job at a time from
* multiple batches.
*/
private class WrappedBatchedJobsIterator implements Iterator<Job> {
private final BatchedJobsIterator batchedIterator;
private VolatileCursorIterator<Job> currentBatch;

WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) {
this.batchedIterator = batchedIterator;
}

@Override
public boolean hasNext() {
return (currentBatch != null && currentBatch.hasNext()) || batchedIterator.hasNext();
}

/**
* Before BatchedJobsIterator has run a search it reports hasNext == true
* but the first search may return no results. In that case null is return
* and clients have to handle null.
*/
@Override
public Job next() {
if (currentBatch != null && currentBatch.hasNext()) {
return currentBatch.next();
}

// currentBatch is either null or all its elements have been iterated.
// get the next currentBatch
currentBatch = createBatchIteratorFromBatch(batchedIterator.next());

// BatchedJobsIterator.hasNext maybe true if searching the first time
// but no results are returned.
return currentBatch.hasNext() ? currentBatch.next() : null;
}

private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Builder> builders) {
List<Job> jobs = builders.stream().map(Job.Builder::build).collect(Collectors.toList());
return new VolatileCursorIterator<>(jobs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -27,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -57,8 +57,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
private final Client client;
private final ThreadPool threadPool;

public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool, ClusterService clusterService) {
super(clusterService);
public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool) {
super(client);
this.client = Objects.requireNonNull(client);
this.threadPool = Objects.requireNonNull(threadPool);
}
Expand Down Expand Up @@ -103,7 +103,7 @@ public void onResponse(SearchResponse searchResponse) {
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
}
deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener);
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);
} catch (Exception e) {
onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -48,8 +47,8 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
private final Client client;
private final Auditor auditor;

public ExpiredResultsRemover(Client client, ClusterService clusterService, Auditor auditor) {
super(clusterService);
public ExpiredResultsRemover(Client client, Auditor auditor) {
super(client);
this.client = Objects.requireNonNull(client);
this.auditor = Objects.requireNonNull(auditor);
}
Expand Down
Loading