[ML] Store ml job configurations in the new .ml-config index #36698

Merged: 63 commits, Dec 18, 2018

Commits
0fc87c8
[ML] Job and datafeed mappings with index template (#32719)
davidkyle Aug 9, 2018
44322b1
[ML] Job config document CRUD operations (#32738)
davidkyle Aug 13, 2018
2e6c9ed
[ML] Datafeed config CRUD operations (#32854)
davidkyle Aug 16, 2018
e4eb4d7
[ML] Change JobManager to work with Job config in index (#33064)
davidkyle Aug 29, 2018
3b98b3a
Fix check style and compilation error after rebase
davidkyle Aug 29, 2018
07cceaf
[ML] Change Datafeed actions to read config from the config index (#3…
davidkyle Sep 18, 2018
3703fbb
[ML] Allocate jobs based on JobParams rather than cluster state confi…
davidkyle Sep 27, 2018
0803d09
[ML] Return missing job error when .ml-config does not exist (#34177)
davidkyle Oct 1, 2018
afb238b
[ML] Close job in index (#34217)
davidkyle Oct 18, 2018
47787b3
[ML] Adjust finalize job action to work with documents (#34226)
davidkyle Oct 16, 2018
9874b2f
[ML] Job in index: Datafeed node selector (#34218)
davidkyle Oct 17, 2018
7c8b98e
[ML] Job in Index: Stop and preview datafeed (#34605)
davidkyle Oct 19, 2018
2794d12
[ML] Delete job document (#34595)
davidkyle Oct 19, 2018
33cf46e
[ML] Convert job data remover to work with index configs (#34532)
davidkyle Oct 18, 2018
6d36bb8
[ML] Job in index: delete filter action (#34642)
dimitris-athanasiou Oct 19, 2018
a646f8e
[ML] Job in index: Get datafeed and job stats from index (#34645)
davidkyle Oct 22, 2018
1cac6ba
[ML] Job in Index: Convert get calendar events to index docs (#34710)
davidkyle Oct 23, 2018
344b2ab
Merge branch '6.x' into feature-jindex-6x
davidkyle Oct 26, 2018
625e675
[ML] Job in Index: Enable integ tests (#34851)
davidkyle Oct 29, 2018
61b178a
Merge branch '6.x' into feature-jindex-6x
davidkyle Oct 30, 2018
3a0a5e7
Merge branch '6.x' into feature-jindex-6x
davidkyle Nov 7, 2018
2b6cd7a
[ML] Reimplement established model memory (#35263)
droberts195 Nov 13, 2018
64ba62a
[ML] Job In Index: Enable GET APIS in mixed state (#35344)
davidkyle Nov 13, 2018
99798d5
[ML] Need to wait for shards to replicate in distributed test (#35541)
droberts195 Nov 14, 2018
0046d55
[ML] Job in index: Restore ability to update cluster state jobs (#35539)
davidkyle Nov 14, 2018
89ca6f9
Merge branch '6.x' into feature-jindex-6x
davidkyle Nov 15, 2018
971681e
Merge branch '6.x' into feature-jindex-6x
davidkyle Nov 16, 2018
eb6b75e
[ML] Job in index: Enable delete actions for clusterstate config (#35…
davidkyle Nov 16, 2018
49781b9
Merge branch '6.x' into feature-jindex-6x
davidkyle Nov 19, 2018
15a9fdf
[ML] Job in index: Enable get and update actions for clusterstate job…
davidkyle Nov 19, 2018
f003557
Merge branch '6.x' into feature-jindex-6x
davidkyle Nov 21, 2018
4fd00d6
[ML] Jindex: Rolling upgrade tests (#35700)
davidkyle Nov 23, 2018
6aadcbf
[ML] Correct order of actions in test
davidkyle Nov 26, 2018
a3ce149
[ML] Jindex: Prefer index config documents to cluster state config (#…
davidkyle Nov 28, 2018
4d6f556
Merge branch '6.x' into feature-jindex-6x
davidkyle Nov 28, 2018
4e98158
[ML] Prefer cluster state config to index documents (#36014)
davidkyle Nov 29, 2018
1a58e69
[ML] Replace Version.CURRENT in streaming functions (#36118)
davidkyle Dec 3, 2018
93044e2
[ML] Job In Index: Migrate config from the clusterstate (#35834)
davidkyle Dec 5, 2018
dfc5dd4
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 5, 2018
cd9282d
[ML] Use 'anomaly-detector' in job config doc name (#36254)
davidkyle Dec 5, 2018
5a69842
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 6, 2018
42500a9
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 6, 2018
a2543ec
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 7, 2018
a3769aa
[ML] JIndex: Job exists and get job should read cluster state first. …
davidkyle Dec 10, 2018
fb1e90a
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 11, 2018
bad7351
[ML] Default search size for configs
davidkyle Dec 12, 2018
4bc9c98
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 12, 2018
fdf48aa
[ML][TEST] Fix NPE in JobManagerTests
davidkyle Dec 12, 2018
2d3de5d
[ML] JIndex: Prevent updates to migrating configs and upgrade tests (…
davidkyle Dec 13, 2018
2c872a5
Merge branch '6.x' into feature-jindex-6x
droberts195 Dec 13, 2018
76858bb
[ML][TEST] fix naming
davidkyle Dec 13, 2018
2e2495c
[ML] Full cluster restart tests for migration (#36593)
davidkyle Dec 14, 2018
e0a58c9
[ML] Adapt to periodic persistent task refresh (#36494)
droberts195 Dec 14, 2018
a2c53ab
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 14, 2018
8ad6379
Fix TooManyJobsIT.testMultipleNodes
droberts195 Dec 14, 2018
2c3e70a
[ML] Remove unreliable test assertions
davidkyle Dec 14, 2018
ed26ae8
Use execute() instead of submit() in MlMemoryTracker
droberts195 Dec 14, 2018
28b99f0
[ML] JIndex: Limit the size of bulk migrations (#36481)
davidkyle Dec 16, 2018
5d01a32
Merge branch '6.x' into feature-jindex-6x
davidkyle Dec 17, 2018
88b14bd
[FEATURE][ML] Add cluster setting that enables/disables config migra…
dimitris-athanasiou Dec 17, 2018
ec4601e
[ML] Snapshot ml configs before migrating (#36645)
davidkyle Dec 17, 2018
b6b1f0d
[FEATURE][ML] Split in batches and migrate all jobs and datafeeds (#3…
dimitris-athanasiou Dec 17, 2018
fb7e3f5
Merge branch '6.x' into feature-jindex-6x
droberts195 Dec 18, 2018

Changes from all commits

@@ -89,6 +89,7 @@ public void testIndexTemplatesCreated() throws Exception {
if (masterIsNewVersion()) {
// Everything else waits until the master is upgraded to create its templates
expectedTemplates.add(".ml-anomalies-");
expectedTemplates.add(".ml-config");
expectedTemplates.add(".ml-meta");
expectedTemplates.add(".ml-notifications");
expectedTemplates.add(".ml-state");

@@ -64,6 +64,7 @@
import org.elasticsearch.xpack.core.logstash.LogstashFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
@@ -363,9 +364,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
// ML - Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new),
// ML - Task states
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
@@ -433,9 +434,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
// ML - Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
StartDatafeedAction.DatafeedParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
OpenJobAction.JobParams::fromXContent),
// ML - Task states
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),

@@ -21,8 +21,6 @@ public final class MlMetaIndex {
*/
public static final String INDEX_NAME = ".ml-meta";

public static final String INCLUDE_TYPE_KEY = "include_type";

public static final String TYPE = "doc";

private MlMetaIndex() {}

@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
@@ -87,8 +86,13 @@ public boolean isGroupOrJob(String id) {
return groupOrJobLookup.isGroupOrJob(id);
}

public Set<String> expandJobIds(String expression, boolean allowNoJobs) {
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
public Set<String> expandJobIds(String expression) {
return groupOrJobLookup.expandJobIds(expression);
}

// Matches only groups
public Set<String> expandGroupIds(String expression) {
return groupOrJobLookup.expandGroupIds(expression);
}

public boolean isJobDeleting(String jobId) {
@@ -108,9 +112,9 @@ public Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst();
}

public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds) {
return NameResolver.newUnaliased(datafeeds.keySet(), ExceptionsHelper::missingDatafeedException)
.expand(expression, allowNoDatafeeds);
public Set<String> expandDatafeedIds(String expression) {
return NameResolver.newUnaliased(datafeeds.keySet())
.expand(expression);
}

@Override
@@ -146,7 +150,6 @@ public MlMetadata(StreamInput in) throws IOException {
datafeeds.put(in.readString(), new DatafeedConfig(in));
}
this.datafeeds = datafeeds;

this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
}

@@ -167,7 +170,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
DelegatingMapParams extendedParams =
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
return builder;
@@ -196,9 +199,14 @@ public MlMetadataDiff(StreamInput in) throws IOException {
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readSchedulerDiffFrom);
MlMetadataDiff::readDatafeedDiffFrom);
}

/**
* Merge the diff with the ML metadata.
* @param part The current ML metadata.
* @return The new ML metadata.
*/
@Override
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
@@ -221,7 +229,7 @@ static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(Job::new, in);
}

static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
static Diff<DatafeedConfig> readDatafeedDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
}
}
@@ -295,7 +303,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {

public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
if (datafeeds.containsKey(datafeedConfig.getId())) {
throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
}
String jobId = datafeedConfig.getJobId();
checkJobIsAvailableForDatafeed(jobId);
@@ -369,14 +377,14 @@ private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, Per
}
}

private Builder putJobs(Collection<Job> jobs) {
public Builder putJobs(Collection<Job> jobs) {
for (Job job : jobs) {
putJob(job, true);
}
return this;
}

private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
for (DatafeedConfig datafeed : datafeeds) {
this.datafeeds.put(datafeed.getId(), datafeed);
}
@@ -421,8 +429,6 @@ void checkJobHasNoDatafeed(String jobId) {
}
}



public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
if (mlMetadata == null) {
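The MlMetadataDiff javadoc added above describes merging a diff with the current ML metadata to produce the new metadata. As a rough, self-contained sketch of that diff-and-apply idea (plain Java over ordinary maps; MapDiff and its fields are invented names for illustration, not the DiffableUtils machinery the real code uses):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: a diff over a keyed map records upserts and deletions,
// and apply() builds the new map from an existing one without mutating it.
final class MapDiff<V> {
    private final Map<String, V> upserts = new HashMap<>();
    private final Set<String> deletes = new HashSet<>();

    MapDiff(Map<String, V> before, Map<String, V> after) {
        deletes.addAll(before.keySet());
        deletes.removeAll(after.keySet());               // keys removed in 'after'
        after.forEach((k, v) -> {
            if (v.equals(before.get(k)) == false) {
                upserts.put(k, v);                        // new or changed entries
            }
        });
    }

    Map<String, V> apply(Map<String, V> part) {
        Map<String, V> merged = new HashMap<>(part);
        merged.keySet().removeAll(deletes);
        merged.putAll(upserts);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> before = Map.of("job-1", "v1", "job-2", "v1");
        Map<String, String> after = Map.of("job-2", "v2", "job-3", "v1");
        MapDiff<String> diff = new MapDiff<>(before, after);
        // job-1 removed, job-2 updated, job-3 added
        System.out.println(diff.apply(before));
    }
}
```

In MlMetadata the same pattern is applied twice, once to the jobs map and once to the datafeeds map, which is why the renamed readDatafeedDiffFrom sits alongside readJobDiffFrom above.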

@@ -12,8 +12,19 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class MlTasks {

public static final String JOB_TASK_NAME = "xpack/ml/job";
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";

private static final String JOB_TASK_ID_PREFIX = "job-";
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

private MlTasks() {
}

@@ -22,15 +33,15 @@ private MlTasks() {
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
return JOB_TASK_ID_PREFIX + jobId;
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
*/
public static String datafeedTaskId(String datafeedId) {
return "datafeed-" + datafeedId;
return DATAFEED_TASK_ID_PREFIX + datafeedId;
}

@Nullable
@@ -67,4 +78,64 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
return DatafeedState.STOPPED;
}
}

/**
* The job Ids of anomaly detector job tasks.
* All anomaly detector jobs are returned regardless of the status of the
* task (OPEN, CLOSED, FAILED etc).
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The job Ids of anomaly detector job tasks
*/
public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* The datafeed Ids of started datafeed tasks
*
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The Ids of running datafeed tasks
*/
public static Set<String> startedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return True if the job has a task
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
}

/**
* Read the active anomaly detector job tasks.
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
*
* @param tasks Persistent tasks
* @return The job tasks excluding closed and failed jobs
*/
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
.collect(Collectors.toList());
}
}
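The new openJobIds() and startedDatafeedIds() helpers above recover the original job or datafeed id by stripping the task-id prefix. A minimal stand-alone sketch of that namespacing (plain Java; TaskIdExample and the sample id "farequote" are made up for illustration, and the real helpers work against PersistentTasksCustomMetaData rather than a plain set):

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class TaskIdExample {
    private static final String JOB_TASK_ID_PREFIX = "job-";
    private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

    static String jobTaskId(String jobId) {
        return JOB_TASK_ID_PREFIX + jobId;
    }

    static String datafeedTaskId(String datafeedId) {
        return DATAFEED_TASK_ID_PREFIX + datafeedId;
    }

    public static void main(String[] args) {
        // "farequote" is used as both a job id and a datafeed id; the task ids still differ.
        Set<String> taskIds = Stream.of(jobTaskId("farequote"), datafeedTaskId("farequote"))
                .collect(Collectors.toSet());
        System.out.println(taskIds); // contains job-farequote and datafeed-farequote

        // Strip the prefix to get back the job id, as openJobIds() does for job tasks.
        Set<String> jobIds = taskIds.stream()
                .filter(id -> id.startsWith(JOB_TASK_ID_PREFIX))
                .map(id -> id.substring(JOB_TASK_ID_PREFIX.length()))
                .collect(Collectors.toSet());
        System.out.println(jobIds); // [farequote]
    }
}
```

The distinct prefixes mean a job and a datafeed that share the same user-visible id can never collide in the persistent tasks namespace, which is the point the javadoc on jobTaskId and datafeedTaskId makes.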

@@ -13,6 +13,7 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -25,6 +26,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

@@ -35,7 +37,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, AcknowledgedRes

public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/open";
public static final String TASK_NAME = "xpack/ml/job";


private OpenJobAction() {
super(NAME);
@@ -136,15 +138,16 @@ public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {

/** TODO Remove in 7.0.0 */
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");

public static final ParseField TIMEOUT = new ParseField("timeout");
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, JobParams::new);
public static final ParseField JOB = new ParseField("job");

public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareObject(JobParams::setJob, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOB);
}

public static JobParams fromXContent(XContentParser parser) {
@@ -163,6 +166,7 @@ public static JobParams parseRequest(String jobId, XContentParser parser) {
// A big state can take a while to restore. For symmetry with the _close endpoint any
// changes here should be reflected there too.
private TimeValue timeout = MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT;
private Job job;

JobParams() {
}
@@ -178,6 +182,9 @@ public JobParams(StreamInput in) throws IOException {
in.readBoolean();
}
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
job = in.readOptionalWriteable(Job::new);
}
}

public String getJobId() {
@@ -196,9 +203,18 @@ public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

@Nullable
public Job getJob() {
return job;
}

public void setJob(Job job) {
this.job = job;
}

@Override
public String getWriteableName() {
return TASK_NAME;
return MlTasks.JOB_TASK_NAME;
}

@Override
@@ -209,20 +225,27 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
}
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalWriteable(job);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
if (job != null) {
builder.field("job", job);
}
builder.endObject();
// The job field is streamed but not persisted
return builder;
}

@Override
public int hashCode() {
return Objects.hash(jobId, timeout);
return Objects.hash(jobId, timeout, job);
}

@Override
@@ -235,7 +258,8 @@ public boolean equals(Object obj) {
}
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(timeout, other.timeout);
Objects.equals(timeout, other.timeout) &&
Objects.equals(job, other.job);
}

@Override
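JobParams now carries an optional Job object that is written to the wire only when the receiving node is on 6.6.0 or later and, per the comment in toXContent above, is streamed but never persisted. A simplified sketch of that version-gated optional-field pattern (plain Java streams and an int version number stand in for Elasticsearch's StreamOutput/StreamInput and Version; the class and method names here are invented for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class OptionalFieldWireExample {
    static final int V_6_6_0 = 6_06_00;

    static void writeParams(DataOutputStream out, int peerVersion,
                            String jobId, String jobConfigJson) throws IOException {
        out.writeUTF(jobId);
        if (peerVersion >= V_6_6_0) {
            // presence flag followed by the value, like writeOptionalWriteable
            out.writeBoolean(jobConfigJson != null);
            if (jobConfigJson != null) {
                out.writeUTF(jobConfigJson);
            }
        }
    }

    static String[] readParams(DataInputStream in, int peerVersion) throws IOException {
        String jobId = in.readUTF();
        String jobConfigJson = null;
        if (peerVersion >= V_6_6_0 && in.readBoolean()) {
            jobConfigJson = in.readUTF();
        }
        return new String[] { jobId, jobConfigJson };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeParams(new DataOutputStream(buffer), V_6_6_0, "my-job", "{\"job_id\":\"my-job\"}");
        String[] decoded = readParams(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())), V_6_6_0);
        System.out.println(decoded[0] + " -> " + decoded[1]);
    }
}
```

Gating on the version keeps older 6.x nodes, which know nothing about the new field, able to exchange the request during a rolling upgrade.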

@@ -143,9 +143,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
datafeed.doXContentBody(builder, params);
builder.endObject();
datafeed.toXContent(builder, params);
return builder;
}
