Skip to content

Commit

Permalink
[ML] Snapshot ml configs before migrating (elastic#36645)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Dec 17, 2018
1 parent 88b14bd commit ec4601e
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ public final class XPackRestTestHelper {
private XPackRestTestHelper() {
}

/**
* Waits for the Machine Learning templates to be created
* and checks that the version is up to date
*/


/**
* For each template name wait for the template to be created and
* for the template version to be equal to the master node version.
Expand Down Expand Up @@ -96,5 +90,4 @@ public static void waitForTemplates(RestClient client, List<String> templateName
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -26,57 +25,39 @@
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMasterListener {
public class MlAssignmentNotifier implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);

private final Auditor auditor;
private final ClusterService clusterService;
private final MlConfigMigrator mlConfigMigrator;
private final ThreadPool threadPool;
private final AtomicBoolean enabled = new AtomicBoolean(false);

MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addListener(this);
}

MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = mlConfigMigrator;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addListener(this);
}

@Override
public void onMaster() {
if (enabled.compareAndSet(false, true)) {
clusterService.addListener(this);
}
}

@Override
public void offMaster() {
if (enabled.compareAndSet(true, false)) {
clusterService.removeListener(this);
}
}

@Override
public String executorName() {
private String executorName() {
return ThreadPool.Names.GENERIC;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (enabled.get() == false) {

if (event.localNodeMaster() == false) {
return;
}

if (event.metaDataChanged() == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -31,12 +35,14 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -90,12 +96,14 @@ public class MlConfigMigrator {
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

private final AtomicBoolean migrationInProgress;
private final AtomicBoolean firstTime;

/**
 * Builds a migrator that works against the given client and cluster service.
 *
 * @param settings       passed through to the {@link MlConfigMigrationEligibilityCheck}
 * @param client         client used by the migrator; must not be {@code null}
 * @param clusterService cluster service used by the migrator; must not be {@code null}
 * @throws NullPointerException if {@code client} or {@code clusterService} is {@code null}
 */
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
    // Reject missing collaborators up front; the client is checked first.
    this.client = Objects.requireNonNull(client);
    this.clusterService = Objects.requireNonNull(clusterService);

    // Initial flag state: the first-run snapshot is still pending and no migration is running.
    this.firstTime = new AtomicBoolean(true);
    this.migrationInProgress = new AtomicBoolean(false);

    this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, this.clusterService);
}

/**
Expand Down Expand Up @@ -127,9 +135,6 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
return;
}


logger.debug("migrating ml configurations");

Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
Expand All @@ -148,19 +153,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

if (firstTime.get()) {
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
firstTime.set(false);
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
return;
}

migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
}

private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener<Boolean> listener) {
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
failedDocumentIds -> {
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
List<String> successfulDatafeedWrites =
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener);
},
unMarkMigrationInProgress::onFailure
listener::onFailure
));
}

Expand Down Expand Up @@ -300,6 +322,45 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
return indexRequest;
}


/**
 * Stores a copy of the given {@link MlMetadata} as a single document
 * (id {@code "ml-config"}) in the job state index.
 *
 * The document is written with {@code OpType.CREATE}, so an existing
 * snapshot is never overwritten. If the document already exists the
 * resulting version conflict is treated as success, because a snapshot
 * is already in place; otherwise a repeated attempt would fail forever
 * and block the caller.
 *
 * Public for testing.
 *
 * @param mlMetadata the metadata to snapshot
 * @param listener responded to with {@code TRUE} when there is nothing to
 *                 snapshot (no jobs and no datafeeds) or when the snapshot
 *                 document already exists; with whether the index result was
 *                 {@code CREATED} when the write succeeds; failed on a
 *                 serialisation error or any other indexing error
 */
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

    if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) {
        // Nothing worth snapshotting.
        listener.onResponse(Boolean.TRUE);
        return;
    }

    logger.debug("taking a snapshot of mlmetadata");
    String documentId = "ml-config";
    IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
            ElasticsearchMappings.DOC_TYPE, documentId)
            .setOpType(DocWriteRequest.OpType.CREATE);

    ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
    try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
        builder.startObject();
        mlMetadata.toXContent(builder, params);
        builder.endObject();

        indexRequest.setSource(builder);
    } catch (IOException e) {
        logger.error("failed to serialise mlmetadata", e);
        listener.onFailure(e);
        return;
    }

    executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
            ActionListener.<IndexResponse>wrap(
                indexResponse -> listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED),
                e -> {
                    // Fully qualified names are used so this block needs no new file-level imports.
                    if (org.elasticsearch.ExceptionsHelper.unwrapCause(e)
                            instanceof org.elasticsearch.index.engine.VersionConflictEngineException) {
                        // CREATE failed because the snapshot document already exists.
                        listener.onResponse(Boolean.TRUE);
                    } else {
                        listener.onFailure(e);
                    }
                }),
            client::index
    );
}


public static Job updateJobForMigration(Job job) {
Job.Builder builder = new Job.Builder(job);
Map<String, Object> custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,39 @@ private void setupMocks() {

public void testClusterChanged_info() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

DiscoveryNode node =
new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", null, tasksBuilder);
addJobTask("job_id", "_node_id", null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder().add(node))
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).info(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());

notifier.offMaster();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
}

public void testClusterChanged_warning() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
Expand All @@ -106,21 +111,31 @@ public void testClusterChanged_warning() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).warning(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());

notifier.offMaster();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
}

public void testClusterChanged_noPersistentTaskChanges() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
Expand All @@ -129,14 +144,25 @@ public void testClusterChanged_noPersistentTaskChanges() {
.metaData(metaData)
.build();

ClusterState current = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());

notifier.offMaster();
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
}
}
Loading

0 comments on commit ec4601e

Please sign in to comment.