Skip to content

Commit

Permalink
[FEATURE][ML] Split in batches and migrate all jobs and datafeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitris-athanasiou committed Dec 17, 2018
1 parent ec4601e commit 5105df2
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,14 +98,14 @@ public class MlConfigMigrator {
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

private final AtomicBoolean migrationInProgress;
private final AtomicBoolean firstTime;
private final AtomicBoolean tookConfigSnapshot;

public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.migrationInProgress = new AtomicBoolean(false);
this.firstTime = new AtomicBoolean(true);
this.tookConfigSnapshot = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -135,12 +137,7 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
return;
}

Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));

JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs);
logger.debug("migrating ml configurations");

ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
Expand All @@ -153,37 +150,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

if (firstTime.get()) {
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
firstTime.set(false);
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
return;
}
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);

migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
}

private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener<Boolean> listener) {
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
private void migrateBatches(List<JobsAndDatafeeds> batches, ActionListener<Boolean> listener) {
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
for (JobsAndDatafeeds batch : batches) {
chainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
failedDocumentIds -> {
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
List<String> successfulDatafeedWrites =
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener);
filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener);
},
listener::onFailure
));
chainedListener::onFailure
)));
}
chainTaskExecutor.execute(ActionListener.wrap(aVoid -> listener.onResponse(true), listener::onFailure));
}

// Exposed for testing
Expand All @@ -208,9 +204,9 @@ public void writeConfigToIndex(Collection<DatafeedConfig> datafeedsToMigrate,
}

private void removeFromClusterState(List<String> jobsToRemoveIds, List<String> datafeedsToRemoveIds,
ActionListener<Boolean> listener) {
ActionListener<Void> listener) {
if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) {
listener.onResponse(Boolean.FALSE);
listener.onResponse(null);
return;
}

Expand Down Expand Up @@ -244,7 +240,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.info("ml datafeed configurations migrated: {}", removedConfigs.get().removedDatafeedIds);
}
}
listener.onResponse(Boolean.TRUE);
listener.onResponse(null);
}
});
}
Expand Down Expand Up @@ -326,12 +322,17 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
// public for testing
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

if (tookConfigSnapshot.get()) {
listener.onResponse(true);
return;
}

if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) {
listener.onResponse(Boolean.TRUE);
listener.onResponse(true);
return;
}

logger.debug("taking a snapshot of mlmetadata");
logger.debug("taking a snapshot of ml_metadata");
String documentId = "ml-config";
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
ElasticsearchMappings.DOC_TYPE, documentId)
Expand All @@ -345,7 +346,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen

indexRequest.setSource(builder);
} catch (IOException e) {
logger.error("failed to serialise mlmetadata", e);
logger.error("failed to serialise ml_metadata", e);
listener.onFailure(e);
return;
}
Expand Down Expand Up @@ -437,6 +438,22 @@ public int totalCount() {
}
}

public static List<JobsAndDatafeeds> splitInBatches(ClusterState clusterState) {
Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));

List<JobsAndDatafeeds> batches = new ArrayList<>();
while (stoppedDatafeeds.isEmpty() == false || eligibleJobs.isEmpty() == false) {
JobsAndDatafeeds batch = limitWrites(stoppedDatafeeds, eligibleJobs);
batches.add(batch);
stoppedDatafeeds.removeAll(batch.datafeedConfigs);
batch.jobs.forEach(job -> eligibleJobs.remove(job.getId()));
}
return batches;
}

/**
* Return at most {@link #MAX_BULK_WRITE_SIZE} configs favouring
* datafeed and job pairs so if a datafeed is chosen so is its job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public void testWriteConfigToIndex() throws InterruptedException {
}

public void testMigrateConfigs() throws InterruptedException, IOException {

// and jobs and datafeeds clusterstate
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
Expand Down Expand Up @@ -166,6 +165,82 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
}

public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
int jobCount = randomIntBetween(150, 201);
int datafeedCount = randomIntBetween(150, jobCount);

// and jobs and datafeeds clusterstate
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
for (int i = 0; i < jobCount; i++) {
mlMetadata.putJob(buildJobBuilder("job-" + i).build(), false);
}
for (int i = 0; i < datafeedCount; i++) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-" + i, "job-" + i);
builder.setIndices(Collections.singletonList("beats*"));
mlMetadata.putDatafeed(builder.build(), Collections.emptyMap());
}

ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.build();

doAnswer(invocation -> {
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
return null;
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());

AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();

// do the migration
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
responseHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertTrue(responseHolder.get());

// check the jobs have been migrated
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
jobsHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertThat(jobsHolder.get(), hasSize(jobCount));

// check datafeeds are migrated
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry());
AtomicReference<List<DatafeedConfig.Builder>> datafeedsHolder = new AtomicReference<>();
blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener),
datafeedsHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertThat(datafeedsHolder.get(), hasSize(datafeedCount));
}

public void testMigrateConfigs_GivenNoJobsOrDatafeeds() throws InterruptedException {
// Add empty ML metadata
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.build();

AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();

// do the migration
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
responseHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertFalse(responseHolder.get());
}

public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws InterruptedException {
Settings settings = Settings.builder().put(nodeSettings())
.put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), false)
Expand Down

0 comments on commit 5105df2

Please sign in to comment.