Skip to content

Commit

Permalink
[ML] Ensure immutability of MlMetadata
Browse files Browse the repository at this point in the history
The test failure in elastic#31916 revealed that updating
rules on a job was modifying the detectors list
in-place. That meant the old cluster state and the
updated cluster state had no difference and thus the
change was not propagated to non-master nodes.

This commit fixes that and also reviews all of ML
metadata in order to ensure immutability.

Closes elastic#31916
  • Loading branch information
dimitris-athanasiou committed Jul 11, 2018
1 parent 25cd835 commit 319f5af
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,14 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.jobId = jobId;
this.queryDelay = queryDelay;
this.frequency = frequency;
this.indices = indices;
this.types = types;
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
this.types = types == null ? null : Collections.unmodifiableList(types);
this.query = query;
this.aggregations = aggregations;
this.scriptFields = scriptFields;
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.headers = Objects.requireNonNull(headers);
this.headers = headers == null ? null : Collections.unmodifiableMap(headers);
}

public DatafeedConfig(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, Lis
this.latency = latency;
this.categorizationFieldName = categorizationFieldName;
this.categorizationAnalyzerConfig = categorizationAnalyzerConfig;
this.categorizationFilters = categorizationFilters;
this.categorizationFilters = categorizationFilters == null ? null : Collections.unmodifiableList(categorizationFilters);
this.summaryCountFieldName = summaryCountFieldName;
this.influencers = influencers;
this.influencers = Collections.unmodifiableList(influencers);
this.overlappingBuckets = overlappingBuckets;
this.resultFinalizationWindow = resultFinalizationWindow;
this.multivariateByFields = multivariateByFields;
this.multipleBucketSpans = multipleBucketSpans;
this.multipleBucketSpans = multipleBucketSpans == null ? null : Collections.unmodifiableList(multipleBucketSpans);
this.usePerPartitionNormalization = usePerPartitionNormalization;
}

Expand Down Expand Up @@ -487,18 +487,20 @@ public Builder(List<Detector> detectors) {
}

public Builder(AnalysisConfig analysisConfig) {
this.detectors = analysisConfig.detectors;
this.detectors = new ArrayList<>(analysisConfig.detectors);
this.bucketSpan = analysisConfig.bucketSpan;
this.latency = analysisConfig.latency;
this.categorizationFieldName = analysisConfig.categorizationFieldName;
this.categorizationFilters = analysisConfig.categorizationFilters;
this.categorizationFilters = analysisConfig.categorizationFilters == null ? null
: new ArrayList<>(analysisConfig.categorizationFilters);
this.categorizationAnalyzerConfig = analysisConfig.categorizationAnalyzerConfig;
this.summaryCountFieldName = analysisConfig.summaryCountFieldName;
this.influencers = analysisConfig.influencers;
this.influencers = new ArrayList<>(analysisConfig.influencers);
this.overlappingBuckets = analysisConfig.overlappingBuckets;
this.resultFinalizationWindow = analysisConfig.resultFinalizationWindow;
this.multivariateByFields = analysisConfig.multivariateByFields;
this.multipleBucketSpans = analysisConfig.multipleBucketSpans;
this.multipleBucketSpans = analysisConfig.multipleBucketSpans == null ? null
: new ArrayList<>(analysisConfig.multipleBucketSpans);
this.usePerPartitionNormalization = analysisConfig.usePerPartitionNormalization;
}

Expand All @@ -518,6 +520,10 @@ public void setDetectors(List<Detector> detectors) {
this.detectors = sequentialIndexDetectors;
}

public void setDetector(int detectorIndex, Detector detector) {
detectors.set(detectorIndex, detector);
}

public void setBucketSpan(TimeValue bucketSpan) {
this.bucketSpan = bucketSpan;
}
Expand All @@ -543,7 +549,7 @@ public void setSummaryCountFieldName(String summaryCountFieldName) {
}

public void setInfluencers(List<String> influencers) {
this.influencers = influencers;
this.influencers = ExceptionsHelper.requireNonNull(influencers, INFLUENCERS.getPreferredName());
}

public void setOverlappingBuckets(Boolean overlappingBuckets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public Builder(Detector detector) {
partitionFieldName = detector.partitionFieldName;
useNull = detector.useNull;
excludeFrequent = detector.excludeFrequent;
rules = new ArrayList<>(detector.getRules());
rules = new ArrayList<>(detector.rules);
detectorIndex = detector.detectorIndex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private Job(String jobId, String jobType, Version jobVersion, List<String> group
this.jobId = jobId;
this.jobType = jobType;
this.jobVersion = jobVersion;
this.groups = groups;
this.groups = Collections.unmodifiableList(groups);
this.description = description;
this.createTime = createTime;
this.finishedTime = finishedTime;
Expand All @@ -207,7 +207,7 @@ private Job(String jobId, String jobType, Version jobVersion, List<String> group
this.backgroundPersistInterval = backgroundPersistInterval;
this.modelSnapshotRetentionDays = modelSnapshotRetentionDays;
this.resultsRetentionDays = resultsRetentionDays;
this.customSettings = customSettings;
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
this.modelSnapshotId = modelSnapshotId;
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.resultsIndexName = resultsIndexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,33 +373,33 @@ public Set<String> getUpdateFields() {
*/
public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
Job.Builder builder = new Job.Builder(source);
AnalysisConfig currentAnalysisConfig = source.getAnalysisConfig();
AnalysisConfig.Builder newAnalysisConfig = new AnalysisConfig.Builder(currentAnalysisConfig);

if (groups != null) {
builder.setGroups(groups);
}
if (description != null) {
builder.setDescription(description);
}
if (detectorUpdates != null && detectorUpdates.isEmpty() == false) {
AnalysisConfig ac = source.getAnalysisConfig();
int numDetectors = ac.getDetectors().size();
int numDetectors = currentAnalysisConfig.getDetectors().size();
for (DetectorUpdate dd : detectorUpdates) {
if (dd.getDetectorIndex() >= numDetectors) {
throw ExceptionsHelper.badRequestException("Supplied detector_index [{}] is >= the number of detectors [{}]",
dd.getDetectorIndex(), numDetectors);
}

Detector.Builder detectorbuilder = new Detector.Builder(ac.getDetectors().get(dd.getDetectorIndex()));
Detector.Builder detectorBuilder = new Detector.Builder(currentAnalysisConfig.getDetectors().get(dd.getDetectorIndex()));
if (dd.getDescription() != null) {
detectorbuilder.setDetectorDescription(dd.getDescription());
detectorBuilder.setDetectorDescription(dd.getDescription());
}
if (dd.getRules() != null) {
detectorbuilder.setRules(dd.getRules());
detectorBuilder.setRules(dd.getRules());
}
ac.getDetectors().set(dd.getDetectorIndex(), detectorbuilder.build());
}

AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(ac);
builder.setAnalysisConfig(acBuilder);
newAnalysisConfig.setDetector(dd.getDetectorIndex(), detectorBuilder.build());
}
}
if (modelPlotConfig != null) {
builder.setModelPlotConfig(modelPlotConfig);
Expand All @@ -422,9 +422,7 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
builder.setResultsRetentionDays(resultsRetentionDays);
}
if (categorizationFilters != null) {
AnalysisConfig.Builder analysisConfigBuilder = new AnalysisConfig.Builder(source.getAnalysisConfig());
analysisConfigBuilder.setCategorizationFilters(categorizationFilters);
builder.setAnalysisConfig(analysisConfigBuilder);
newAnalysisConfig.setCategorizationFilters(categorizationFilters);
}
if (customSettings != null) {
builder.setCustomSettings(customSettings);
Expand All @@ -446,6 +444,8 @@ public Job mergeWithJob(Job source, ByteSizeValue maxModelMemoryLimit) {
if (jobVersion != null) {
builder.setJobVersion(jobVersion);
}

builder.setAnalysisConfig(newAnalysisConfig);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public RuleScope() {
}

public RuleScope(Map<String, FilterRef> scope) {
this.scope = Objects.requireNonNull(scope);
this.scope = Collections.unmodifiableMap(scope);
}

public RuleScope(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ public void testMergeWithJob() {
jobBuilder.setAnalysisConfig(ac);
jobBuilder.setDataDescription(new DataDescription.Builder());
jobBuilder.setCreateTime(new Date());
Job job = jobBuilder.build();

Job updatedJob = update.mergeWithJob(jobBuilder.build(), new ByteSizeValue(0L));
Job updatedJob = update.mergeWithJob(job, new ByteSizeValue(0L));

assertEquals(update.getGroups(), updatedJob.getGroups());
assertEquals(update.getDescription(), updatedJob.getDescription());
Expand All @@ -172,12 +173,10 @@ public void testMergeWithJob() {
assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId());
assertEquals(update.getJobVersion(), updatedJob.getJobVersion());
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getDescription(),
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
assertNotNull(updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getDetectorDescription());
assertEquals(detectorUpdate.getRules(),
updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex()).getRules());
Detector updatedDetector = updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex());
assertNotNull(updatedDetector);
assertEquals(detectorUpdate.getDescription(), updatedDetector.getDetectorDescription());
assertEquals(detectorUpdate.getRules(), updatedDetector.getRules());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public void cleanUpTest() {
cleanUp();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/31916")
public void testCondition() throws Exception {
DetectionRule rule = new DetectionRule.Builder(Arrays.asList(
new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.LT, 100.0)
Expand Down

0 comments on commit 319f5af

Please sign in to comment.