diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index fc4e9133ec2a3..dc44c7b1479e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -56,9 +56,9 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener private void deleteExpiredData(ActionListener listener) { Auditor auditor = new Auditor(client, clusterService.nodeName()); List dataRemovers = Arrays.asList( - new ExpiredResultsRemover(client, clusterService, auditor), + new ExpiredResultsRemover(client, auditor), new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, threadPool, clusterService), + new ExpiredModelSnapshotsRemover(client, threadPool), new UnusedStateRemover(client, clusterService) ); Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java new file mode 100644 index 0000000000000..e274b720e701f --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +import java.io.IOException; +import java.io.InputStream; + +public class BatchedJobsIterator extends BatchedDocumentsIterator { + + public BatchedJobsIterator(Client client, String index) { + super(client, index); + } + + @Override + protected QueryBuilder getQuery() { + return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE); + } + + @Override + protected Job.Builder map(SearchHit hit) { + try (InputStream stream = hit.getSourceRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + return Job.LENIENT_PARSER.apply(parser, null); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse job document [" + hit.getId() + "]", e); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 8364e015a3456..b595c564ab9aa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -6,23 +6,23 @@ package org.elasticsearch.xpack.ml.job.retention; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; -import java.util.ArrayList; +import java.util.Deque; import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Removes job data that expired with respect to their retention period. @@ -33,10 +33,10 @@ */ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { - private final ClusterService clusterService; + private final Client client; - AbstractExpiredJobDataRemover(ClusterService clusterService) { - this.clusterService = Objects.requireNonNull(clusterService); + AbstractExpiredJobDataRemover(Client client) { + this.client = client; } @Override @@ -44,12 +44,18 @@ public void remove(ActionListener listener) { removeData(newJobIterator(), listener); } - private void removeData(Iterator jobIterator, ActionListener listener) { + private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener) { if (jobIterator.hasNext() == false) { listener.onResponse(true); return; } Job job = jobIterator.next(); + if (job == null) { + // maybe null if the batched iterator search return no results + listener.onResponse(true); + return; + } + Long retentionDays = getRetentionDays(job); if (retentionDays == null) { removeData(jobIterator, listener); @@ -59,14 +65,9 @@ private void removeData(Iterator jobIterator, ActionListener liste removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure)); } - private Iterator newJobIterator() { - ClusterState clusterState = clusterService.state(); - List jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values()); - return createVolatileCursorIterator(jobs); - } - - protected static Iterator createVolatileCursorIterator(List items) { - return new VolatileCursorIterator(items); + private WrappedBatchedJobsIterator newJobIterator() { + BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); + return new WrappedBatchedJobsIterator(jobsIterator); } private long calcCutoffEpochMs(long retentionDays) { @@ -87,4 +88,49 @@ protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); } + + /** + * BatchedJobsIterator efficiently returns batches of jobs using a scroll + * search but AbstractExpiredJobDataRemover works with one job at a time. + * This class abstracts away the logic of pulling one job at a time from + * multiple batches. + */ + private class WrappedBatchedJobsIterator implements Iterator { + private final BatchedJobsIterator batchedIterator; + private VolatileCursorIterator currentBatch; + + WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) { + this.batchedIterator = batchedIterator; + } + + @Override + public boolean hasNext() { + return (currentBatch != null && currentBatch.hasNext()) || batchedIterator.hasNext(); + } + + /** + * Before BatchedJobsIterator has run a search it reports hasNext == true + * but the first search may return no results. In that case null is return + * and clients have to handle null. + */ + @Override + public Job next() { + if (currentBatch != null && currentBatch.hasNext()) { + return currentBatch.next(); + } + + // currentBatch is either null or all its elements have been iterated. + // get the next currentBatch + currentBatch = createBatchIteratorFromBatch(batchedIterator.next()); + + // BatchedJobsIterator.hasNext maybe true if searching the first time + // but no results are returned. + return currentBatch.hasNext() ? currentBatch.next() : null; + } + + private VolatileCursorIterator createBatchIteratorFromBatch(Deque builders) { + List jobs = builders.stream().map(Job.Builder::build).collect(Collectors.toList()); + return new VolatileCursorIterator<>(jobs); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 47a10a8aea381..2272121a68ff3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -27,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import java.util.ArrayList; import java.util.Iterator; @@ -57,8 +57,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover private final Client client; private final ThreadPool threadPool; - public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool, ClusterService clusterService) { - super(clusterService); + public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool) { + super(client); this.client = Objects.requireNonNull(client); this.threadPool = Objects.requireNonNull(threadPool); } @@ -103,7 +103,7 @@ public void onResponse(SearchResponse searchResponse) { for (SearchHit hit : searchResponse.getHits()) { modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); } - deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener); + deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); } catch (Exception e) { onFailure(e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index c882c90116880..dfa9d66814ebb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -48,8 +47,8 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private final Client client; private final Auditor auditor; - public ExpiredResultsRemover(Client client, ClusterService clusterService, Auditor auditor) { - super(clusterService); + public ExpiredResultsRemover(Client client, Auditor auditor) { + super(client); this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java new file mode 100644 index 0000000000000..c2318d1cb4664 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AbstractExpiredJobDataRemoverTests extends ESTestCase { + + // We can't test an abstract class so make a concrete class + // as simple as possible + private class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemover { + + private int getRetentionDaysCallCount = 0; + + ConcreteExpiredJobDataRemover(Client client) { + super(client); + } + + @Override + protected Long getRetentionDays(Job job) { + getRetentionDaysCallCount++; + // cover both code paths + return randomBoolean() ? null : 0L; + } + + @Override + protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { + listener.onResponse(Boolean.TRUE); + } + } + + private Client client; + + @Before + public void setUpTests() { + client = mock(Client.class); + } + + static SearchResponse createSearchResponse(List toXContents) throws IOException { + return createSearchResponse(toXContents, toXContents.size()); + } + + private static SearchResponse createSearchResponse(List toXContents, int totalHits) throws IOException { + SearchHit[] hitsArray = new SearchHit[toXContents.size()]; + for (int i = 0; i < toXContents.size(); i++) { + hitsArray[i] = new SearchHit(randomInt()); + XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); + toXContents.get(i).toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + hitsArray[i].sourceRef(BytesReference.bytes(jsonBuilder)); + } + SearchHits hits = new SearchHits(hitsArray, totalHits, 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(hits); + return searchResponse; + } + + public void testRemoveGivenNoJobs() throws IOException { + SearchResponse response = createSearchResponse(Collections.emptyList()); + + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); + + TestListener listener = new TestListener(); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); + remover.remove(listener); + + listener.waitToCompletion(); + assertThat(listener.success, is(true)); + assertEquals(remover.getRetentionDaysCallCount, 0); + } + + + public void testRemoveGivenMulipleBatches() throws IOException { + // This is testing AbstractExpiredJobDataRemover.WrappedBatchedJobsIterator + int totalHits = 7; + List responses = new ArrayList<>(); + responses.add(createSearchResponse(Arrays.asList( + JobTests.buildJobBuilder("job1").build(), + JobTests.buildJobBuilder("job2").build(), + JobTests.buildJobBuilder("job3").build() + ), totalHits)); + + responses.add(createSearchResponse(Arrays.asList( + JobTests.buildJobBuilder("job4").build(), + JobTests.buildJobBuilder("job5").build(), + JobTests.buildJobBuilder("job6").build() + ), totalHits)); + + responses.add(createSearchResponse(Collections.singletonList( + JobTests.buildJobBuilder("job7").build() + ), totalHits)); + + + AtomicInteger searchCount = new AtomicInteger(0); + + ActionFuture future = mock(ActionFuture.class); + doAnswer(invocationOnMock -> responses.get(searchCount.getAndIncrement())).when(future).actionGet(); + when(client.search(any())).thenReturn(future); + + TestListener listener = new TestListener(); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); + remover.remove(listener); + + listener.waitToCompletion(); + assertThat(listener.success, is(true)); + assertEquals(searchCount.get(), 3); + assertEquals(remover.getRetentionDaysCallCount, 7); + } + + static class TestListener implements ActionListener { + + boolean success; + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onResponse(Boolean aBoolean) { + success = aBoolean; + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + + public void waitToCompletion() { + try { + latch.await(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + fail("listener timed out before completing"); + } + } + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 9f056e91854c3..02d747fb80a50 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -5,27 +5,18 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.mock.orig.Mockito; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; @@ -40,26 +31,22 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; private ThreadPool threadPool; - private ClusterService clusterService; - private ClusterState clusterState; private List capturedSearchRequests; private List capturedDeleteModelSnapshotRequests; private List searchResponsesPerCall; @@ -70,9 +57,6 @@ public void setUpTests() { capturedSearchRequests = new ArrayList<>(); capturedDeleteModelSnapshotRequests = new ArrayList<>(); searchResponsesPerCall = new ArrayList<>(); - clusterService = mock(ClusterService.class); - clusterState = mock(ClusterState.class); - when(clusterService.state()).thenReturn(clusterState); client = mock(Client.class); listener = new TestListener(); @@ -89,7 +73,7 @@ public void shutdownThreadPool() throws InterruptedException { terminate(threadPool); } - public void testRemove_GivenJobsWithoutRetentionPolicy() { + public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("foo").build(), @@ -100,10 +84,11 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() { listener.waitToCompletion(); assertThat(listener.success, is(true)); + verify(client).search(any()); Mockito.verifyNoMoreInteractions(client); } - public void testRemove_GivenJobWithoutActiveSnapshot() { + public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { givenClientRequestsSucceed(); givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); @@ -111,6 +96,7 @@ public void testRemove_GivenJobWithoutActiveSnapshot() { listener.waitToCompletion(); assertThat(listener.success, is(true)); + verify(client).search(any()); Mockito.verifyNoMoreInteractions(client); } @@ -125,8 +111,8 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); createExpiredModelSnapshotsRemover().remove(listener); @@ -162,8 +148,8 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException { List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); createExpiredModelSnapshotsRemover().remove(listener); @@ -188,8 +174,8 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); createExpiredModelSnapshotsRemover().remove(listener); @@ -206,38 +192,22 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } - private void givenJobs(List jobs) { - Map jobsMap = new HashMap<>(); - jobs.stream().forEach(job -> jobsMap.put(job.getId(), job)); - MlMetadata mlMetadata = mock(MlMetadata.class); - when(mlMetadata.getJobs()).thenReturn(jobsMap); - MetaData metadata = mock(MetaData.class); - when(metadata.custom(MlMetadata.TYPE)).thenReturn(mlMetadata); - when(clusterState.getMetaData()).thenReturn(metadata); + private void givenJobs(List jobs) throws IOException { + SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); + + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); } private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { - return new ExpiredModelSnapshotsRemover(client, threadPool, clusterService); + return new ExpiredModelSnapshotsRemover(client, threadPool); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).build(); } - private static SearchResponse createSearchResponse(List modelSnapshots) throws IOException { - SearchHit[] hitsArray = new SearchHit[modelSnapshots.size()]; - for (int i = 0; i < modelSnapshots.size(); i++) { - hitsArray[i] = new SearchHit(randomInt()); - XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); - modelSnapshots.get(i).toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); - hitsArray[i].sourceRef(BytesReference.bytes(jsonBuilder)); - } - SearchHits hits = new SearchHits(hitsArray, hitsArray.length, 1.0f); - SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(hits); - return searchResponse; - } - private void givenClientRequestsSucceed() { givenClientRequests(true, true); } @@ -283,29 +253,4 @@ public Void answer(InvocationOnMock invocationOnMock) { }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); } - private class TestListener implements ActionListener { - - private boolean success; - private final CountDownLatch latch = new CountDownLatch(1); - - @Override - public void onResponse(Boolean aBoolean) { - success = aBoolean; - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - latch.countDown(); - } - - public void waitToCompletion() { - try { - latch.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - fail("listener timed out before completing"); - } - } - } - } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index af9ec8b84a6bd..7dc258a322ac3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -5,11 +5,10 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -18,7 +17,6 @@ import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -31,9 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; @@ -46,17 +42,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase { private Client client; - private ClusterService clusterService; - private ClusterState clusterState; private List capturedDeleteByQueryRequests; private ActionListener listener; @Before public void setUpTests() { capturedDeleteByQueryRequests = new ArrayList<>(); - clusterService = mock(ClusterService.class); - clusterState = mock(ClusterState.class); - when(clusterService.state()).thenReturn(clusterState); client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -74,17 +65,18 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { listener = mock(ActionListener.class); } - public void testRemove_GivenNoJobs() { + public void testRemove_GivenNoJobs() throws IOException { givenClientRequestsSucceed(); givenJobs(Collections.emptyList()); createExpiredResultsRemover().remove(listener); verify(listener).onResponse(true); + verify(client).search(any()); Mockito.verifyNoMoreInteractions(client); } - public void testRemove_GivenJobsWithoutRetentionPolicy() { + public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("foo").build(), @@ -94,6 +86,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() { createExpiredResultsRemover().remove(listener); verify(listener).onResponse(true); + verify(client).search(any()); Mockito.verifyNoMoreInteractions(client); } @@ -158,17 +151,15 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); } - private void givenJobs(List jobs) { - Map jobsMap = new HashMap<>(); - jobs.stream().forEach(job -> jobsMap.put(job.getId(), job)); - MlMetadata mlMetadata = mock(MlMetadata.class); - when(mlMetadata.getJobs()).thenReturn(jobsMap); - MetaData metadata = mock(MetaData.class); - when(metadata.custom(MlMetadata.TYPE)).thenReturn(mlMetadata); - when(clusterState.getMetaData()).thenReturn(metadata); + private void givenJobs(List jobs) throws IOException { + SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); + + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); } private ExpiredResultsRemover createExpiredResultsRemover() { - return new ExpiredResultsRemover(client, clusterService, mock(Auditor.class)); + return new ExpiredResultsRemover(client, mock(Auditor.class)); } } \ No newline at end of file