diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 62a4d503..47160911 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -94,6 +94,7 @@ import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestGetAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestIndexAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.rest.RestPreviewAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorInfoAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyResultAction; @@ -129,6 +130,8 @@ import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction; import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction; @@ -232,14 +235,11 @@ public List getRestHandlers( RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction(); RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction(); RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(); - RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction( - settings, - clusterService, - anomalyDetectorRunner - ); + RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(settings, clusterService); RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter); RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(settings, clusterService); RestSearchAnomalyDetectorInfoAction searchAnomalyDetectorInfoAction = new RestSearchAnomalyDetectorInfoAction(); + RestPreviewAnomalyDetectorAction previewAnomalyDetectorAction = new RestPreviewAnomalyDetectorAction(); return ImmutableList .of( @@ -251,7 +251,8 @@ public List getRestHandlers( executeAnomalyDetectorAction, anomalyDetectorJobAction, statsAnomalyDetectorAction, - searchAnomalyDetectorInfoAction + searchAnomalyDetectorInfoAction, + previewAnomalyDetectorAction ); } @@ -613,7 +614,8 @@ public List getNamedXContent() { new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class), new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class), new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class), - new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class) + new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class), + new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java index 05c57cc3..f72a7226 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java @@ -15,10 +15,8 @@ package com.amazon.opendistroforelasticsearch.ad.rest; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_FEATURES; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID; -import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PREVIEW; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.RUN; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -29,35 +27,23 @@ import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.action.RestActionListener; import org.elasticsearch.rest.action.RestToXContentListener; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; -import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput; import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting; import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest; -import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import com.google.common.collect.ImmutableList; /** @@ -66,21 +52,14 @@ public class RestExecuteAnomalyDetectorAction extends BaseRestHandler { public static final String DETECT_DATA_ACTION = "execute_anomaly_detector"; - public static final String ANOMALY_RESULT = "anomaly_result"; - public static final String ANOMALY_DETECTOR = "anomaly_detector"; - private final AnomalyDetectorRunner anomalyDetectorRunner; // TODO: apply timeout config private volatile TimeValue requestTimeout; - private volatile Integer maxAnomalyFeatures; private final Logger logger = LogManager.getLogger(RestExecuteAnomalyDetectorAction.class); - public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService, AnomalyDetectorRunner anomalyDetectorRunner) { - this.anomalyDetectorRunner = anomalyDetectorRunner; + public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService) { this.requestTimeout = REQUEST_TIMEOUT.get(settings); - maxAnomalyFeatures = MAX_ANOMALY_FEATURES.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it); - clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ANOMALY_FEATURES, it -> maxAnomalyFeatures = it); } @Override @@ -102,42 +81,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return; } - if (rawPath.endsWith(PREVIEW)) { - if (input.getDetector() != null) { - error = validateDetector(input.getDetector()); - if (StringUtils.isNotBlank(error)) { - channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error)); - return; - } - anomalyDetectorRunner - .executeDetector( - input.getDetector(), - input.getPeriodStart(), - input.getPeriodEnd(), - getPreviewDetectorActionListener(channel, input.getDetector()) - ); - } else { - preivewAnomalyDetector(client, channel, input); - } - } else if (rawPath.endsWith(RUN)) { - AnomalyResultRequest getRequest = new AnomalyResultRequest( - input.getDetectorId(), - input.getPeriodStart().toEpochMilli(), - input.getPeriodEnd().toEpochMilli() - ); - client.execute(AnomalyResultAction.INSTANCE, getRequest, new RestToXContentListener<>(channel)); - } + AnomalyResultRequest getRequest = new AnomalyResultRequest( + input.getDetectorId(), + input.getPeriodStart().toEpochMilli(), + input.getPeriodEnd().toEpochMilli() + ); + client.execute(AnomalyResultAction.INSTANCE, getRequest, new RestToXContentListener<>(channel)); }; } - private String validateDetector(AnomalyDetector detector) { - if (detector.getFeatureAttributes().isEmpty()) { - return "Can't preview detector without feature"; - } else { - return RestHandlerUtils.validateAnomalyDetector(detector, maxAnomalyFeatures); - } - } - private AnomalyDetectorExecutionInput getAnomalyDetectorExecutionInput(RestRequest request) throws IOException { String detectorId = null; if (request.hasParam(DETECTOR_ID)) { @@ -166,75 +118,6 @@ private String validateAdExecutionInput(AnomalyDetectorExecutionInput input) { return null; } - private void preivewAnomalyDetector(NodeClient client, RestChannel channel, AnomalyDetectorExecutionInput input) { - if (!StringUtils.isBlank(input.getDetectorId())) { - GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(input.getDetectorId()); - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - client.get(getRequest, onGetAnomalyDetectorResponse(channel, input)); - } catch (Exception e) { - logger.error("Fail to get detector for preview", e); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); - } - } else { - channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, "Wrong input, no detector id")); - } - } - - private RestActionListener onGetAnomalyDetectorResponse(RestChannel channel, AnomalyDetectorExecutionInput input) { - return new RestActionListener(channel) { - @Override - protected void processResponse(GetResponse response) throws Exception { - if (!response.isExists()) { - XContentBuilder message = channel - .newErrorBuilder() - .startObject() - .field("message", "Can't find anomaly detector with id:" + response.getId()) - .endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, message)); - return; - } - XContentParser parser = XContentType.JSON - .xContent() - .createParser( - channel.request().getXContentRegistry(), - LoggingDeprecationHandler.INSTANCE, - response.getSourceAsBytesRef().streamInput() - ); - - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); - - anomalyDetectorRunner - .executeDetector( - detector, - input.getPeriodStart(), - input.getPeriodEnd(), - getPreviewDetectorActionListener(channel, detector) - ); - } - }; - } - - private ActionListener getPreviewDetectorActionListener(RestChannel channel, AnomalyDetector detector) { - return ActionListener.wrap(anomalyResult -> { - XContentBuilder builder = channel - .newBuilder() - .startObject() - .field(ANOMALY_RESULT, anomalyResult) - .field(ANOMALY_DETECTOR, detector) - .endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); - }, exception -> { - logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception); - try { - XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder)); - } catch (IOException e) { - logger.error("Fail to send back exception message" + detector.getDetectorId(), exception); - } - }); - } - @Override public List routes() { return ImmutableList @@ -243,11 +126,6 @@ public List routes() { new Route( RestRequest.Method.POST, String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, RUN) - ), - // preview detector - new Route( - RestRequest.Method.POST, - String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, PREVIEW) ) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestPreviewAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestPreviewAnomalyDetectorAction.java new file mode 100644 index 00000000..b958ec7b --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestPreviewAnomalyDetectorAction.java @@ -0,0 +1,129 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.rest; + +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PREVIEW; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; + +import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput; +import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting; +import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorRequest; +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; +import com.google.common.collect.ImmutableList; + +public class RestPreviewAnomalyDetectorAction extends BaseRestHandler { + + public static final String PREVIEW_ANOMALY_DETECTOR_ACTION = "preview_anomaly_detector"; + + private static final Logger logger = LogManager.getLogger(RestPreviewAnomalyDetectorAction.class); + + public RestPreviewAnomalyDetectorAction() {} + + @Override + public String getName() { + return PREVIEW_ANOMALY_DETECTOR_ACTION; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, org.elasticsearch.client.node.NodeClient client) throws IOException { + if (!EnabledSetting.isADPluginEnabled()) { + throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG); + } + + AnomalyDetectorExecutionInput input = getAnomalyDetectorExecutionInput(request); + + return channel -> { + String rawPath = request.rawPath(); + String error = validateAdExecutionInput(input); + if (StringUtils.isNotBlank(error)) { + channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error)); + return; + } + PreviewAnomalyDetectorRequest previewRequest = new PreviewAnomalyDetectorRequest( + input.getDetector(), + input.getDetectorId(), + input.getPeriodStart(), + input.getPeriodEnd() + ); + client.execute(PreviewAnomalyDetectorAction.INSTANCE, previewRequest, new RestToXContentListener<>(channel)); + }; + } + + private AnomalyDetectorExecutionInput getAnomalyDetectorExecutionInput(RestRequest request) throws IOException { + String detectorId = null; + if (request.hasParam(RestHandlerUtils.DETECTOR_ID)) { + detectorId = request.param(RestHandlerUtils.DETECTOR_ID); + } + + XContentParser parser = request.contentParser(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + AnomalyDetectorExecutionInput input = AnomalyDetectorExecutionInput.parse(parser, detectorId); + if (detectorId != null) { + input.setDetectorId(detectorId); + } + return input; + } + + private String validateAdExecutionInput(AnomalyDetectorExecutionInput input) { + if (StringUtils.isBlank(input.getDetectorId())) { + return "Must set anomaly detector id"; + } + if (input.getPeriodStart() == null || input.getPeriodEnd() == null) { + return "Must set both period start and end date with epoch of milliseconds"; + } + if (!input.getPeriodStart().isBefore(input.getPeriodEnd())) { + return "Period start date should be before end date"; + } + return null; + } + + @Override + public List routes() { + return ImmutableList + .of( + // preview detector + new Route( + RestRequest.Method.POST, + String + .format( + Locale.ROOT, + "%s/{%s}/%s", + AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, + RestHandlerUtils.DETECTOR_ID, + PREVIEW + ) + ) + ); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorAction.java new file mode 100644 index 00000000..3dbaef57 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorAction.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionType; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; + +public class PreviewAnomalyDetectorAction extends ActionType { + // External Action which used for public facing RestAPIs. + public static final String NAME = CommonValue.EXTERNAL_ACTION_PREFIX + "detector/preview"; + public static final PreviewAnomalyDetectorAction INSTANCE = new PreviewAnomalyDetectorAction(); + + private PreviewAnomalyDetectorAction() { + super(NAME, PreviewAnomalyDetectorResponse::new); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorRequest.java new file mode 100644 index 00000000..1e813664 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorRequest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; +import java.time.Instant; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; + +public class PreviewAnomalyDetectorRequest extends ActionRequest { + + private AnomalyDetector detector; + private String detectorId; + private Instant startTime; + private Instant endTime; + + public PreviewAnomalyDetectorRequest(StreamInput in) throws IOException { + super(in); + detector = new AnomalyDetector(in); + detectorId = in.readOptionalString(); + startTime = in.readInstant(); + endTime = in.readInstant(); + } + + public PreviewAnomalyDetectorRequest(AnomalyDetector detector, String detectorId, Instant startTime, Instant endTime) + throws IOException { + super(); + this.detector = detector; + this.detectorId = detectorId; + this.startTime = startTime; + this.endTime = endTime; + } + + public AnomalyDetector getDetector() { + return detector; + } + + public String getDetectorId() { + return detectorId; + } + + public Instant getStartTime() { + return startTime; + } + + public Instant getEndTime() { + return endTime; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + detector.writeTo(out); + out.writeOptionalString(detectorId); + out.writeInstant(startTime); + out.writeInstant(endTime); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorResponse.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorResponse.java new file mode 100644 index 00000000..80daaf0f --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorResponse.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; +import java.util.List; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; + +public class PreviewAnomalyDetectorResponse extends ActionResponse implements ToXContentObject { + public static final String ANOMALY_RESULT = "anomaly_result"; + public static final String ANOMALY_DETECTOR = "anomaly_detector"; + private List anomalyResult; + private AnomalyDetector detector; + + public PreviewAnomalyDetectorResponse(StreamInput in) throws IOException { + super(in); + anomalyResult = in.readList(AnomalyResult::new); + detector = new AnomalyDetector(in); + } + + public PreviewAnomalyDetectorResponse(List anomalyResult, AnomalyDetector detector) { + this.anomalyResult = anomalyResult; + this.detector = detector; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(anomalyResult); + detector.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(ANOMALY_RESULT, anomalyResult).field(ANOMALY_DETECTOR, detector).endObject(); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorTransportAction.java new file mode 100644 index 00000000..9fe6a746 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorTransportAction.java @@ -0,0 +1,174 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_FEATURES; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; + +public class PreviewAnomalyDetectorTransportAction extends + HandledTransportAction { + private final Logger logger = LogManager.getLogger(PreviewAnomalyDetectorTransportAction.class); + private final AnomalyDetectorRunner anomalyDetectorRunner; + private final Client client; + private final NamedXContentRegistry xContentRegistry; + private volatile Integer maxAnomalyFeatures; + + @Inject + public PreviewAnomalyDetectorTransportAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client, + AnomalyDetectorRunner anomalyDetectorRunner, + NamedXContentRegistry xContentRegistry + ) { + super(PreviewAnomalyDetectorAction.NAME, transportService, actionFilters, PreviewAnomalyDetectorRequest::new); + this.client = client; + this.anomalyDetectorRunner = anomalyDetectorRunner; + this.xContentRegistry = xContentRegistry; + maxAnomalyFeatures = MAX_ANOMALY_FEATURES.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ANOMALY_FEATURES, it -> maxAnomalyFeatures = it); + } + + @Override + protected void doExecute(Task task, PreviewAnomalyDetectorRequest request, ActionListener listener) { + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + AnomalyDetector detector = request.getDetector(); + String detectorId = request.getDetectorId(); + Instant startTime = request.getStartTime(); + Instant endTime = request.getEndTime(); + if (detector != null) { + String error = validateDetector(detector); + if (StringUtils.isNotBlank(error)) { + listener.onFailure(new ElasticsearchException(error, RestStatus.BAD_REQUEST)); + return; + } + anomalyDetectorRunner.executeDetector(detector, startTime, endTime, getPreviewDetectorActionListener(listener, detector)); + } else { + previewAnomalyDetector(listener, detectorId, startTime, endTime); + } + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } + } + + private String validateDetector(AnomalyDetector detector) { + if (detector.getFeatureAttributes().isEmpty()) { + return "Can't preview detector without feature"; + } else { + return RestHandlerUtils.validateAnomalyDetector(detector, maxAnomalyFeatures); + } + } + + private ActionListener> getPreviewDetectorActionListener( + ActionListener listener, + AnomalyDetector detector + ) { + return ActionListener.wrap(new CheckedConsumer, Exception>() { + @Override + public void accept(List anomalyResult) throws Exception { + PreviewAnomalyDetectorResponse response = new PreviewAnomalyDetectorResponse(anomalyResult, detector); + listener.onResponse(response); + } + }, exception -> { + logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception); + listener + .onFailure( + new ElasticsearchException( + "Unexpected error running anomaly detector " + detector.getDetectorId(), + RestStatus.INTERNAL_SERVER_ERROR + ) + ); + }); + } + + private void previewAnomalyDetector( + ActionListener listener, + String detectorId, + Instant startTime, + Instant endTime + ) { + if (!StringUtils.isBlank(detectorId)) { + GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(detectorId); + client.get(getRequest, onGetAnomalyDetectorResponse(listener, startTime, endTime)); + } else { + listener.onFailure(new ElasticsearchException("Wrong input, no detector id", RestStatus.BAD_REQUEST)); + } + } + + private ActionListener onGetAnomalyDetectorResponse( + ActionListener listener, + Instant startTime, + Instant endTime + ) { + return ActionListener.wrap(new CheckedConsumer() { + @Override + public void accept(GetResponse response) throws Exception { + if (!response.isExists()) { + listener + .onFailure( + new ElasticsearchException("Can't find anomaly detector with id:" + response.getId(), RestStatus.NOT_FOUND) + ); + return; + } + + try { + XContentParser parser = RestHandlerUtils + .createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef()); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); + + anomalyDetectorRunner + .executeDetector(detector, startTime, endTime, getPreviewDetectorActionListener(listener, detector)); + } catch (IOException e) { + listener.onFailure(e); + } + } + }, exception -> { listener.onFailure(new ElasticsearchException("Could not execute get query to find detector")); }); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index 8b958768..82144629 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -113,6 +114,8 @@ import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; +import com.amazon.opendistroforelasticsearch.ad.feature.Features; +import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingResult; import com.amazon.opendistroforelasticsearch.ad.model.ADTask; import com.amazon.opendistroforelasticsearch.ad.model.ADTaskState; import com.amazon.opendistroforelasticsearch.ad.model.ADTaskType; @@ -477,6 +480,25 @@ public static Feature randomFeature(String featureName, String aggregationName, return new Feature(randomAlphaOfLength(5), featureName, enabled, testAggregation); } + public static Features randomFeatures() { + List> ranges = Arrays.asList(new AbstractMap.SimpleEntry<>(0L, 1L)); + double[][] unprocessed = new double[][] { { randomDouble(), randomDouble() } }; + double[][] processed = new double[][] { { randomDouble(), randomDouble() } }; + + return new Features(ranges, unprocessed, processed); + } + + public static List randomThresholdingResults() { + double grade = 1.; + double confidence = 0.5; + double score = 1.; + + ThresholdingResult thresholdingResult = new ThresholdingResult(grade, confidence, score); + List results = new ArrayList<>(); + results.add(thresholdingResult); + return results; + } + public static User randomUser() { return new User( randomAlphaOfLength(8), diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorActionTests.java new file mode 100644 index 00000000..cb57a0c7 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorActionTests.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.time.Instant; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.Assert; +import org.junit.Test; + +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class PreviewAnomalyDetectorActionTests extends ESSingleNodeTestCase { + + @Override + protected NamedWriteableRegistry writableRegistry() { + return getInstanceFromNode(NamedWriteableRegistry.class); + } + + @Test + public void testPreviewRequest() throws Exception { + BytesStreamOutput out = new BytesStreamOutput(); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of("testKey", "testValue"), Instant.now()); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest( + detector, + "1234", + Instant.now().minusSeconds(60), + Instant.now() + ); + request.writeTo(out); + NamedWriteableAwareStreamInput input = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), writableRegistry()); + PreviewAnomalyDetectorRequest newRequest = new PreviewAnomalyDetectorRequest(input); + Assert.assertEquals(request.getDetectorId(), newRequest.getDetectorId()); + Assert.assertEquals(request.getStartTime(), newRequest.getStartTime()); + Assert.assertEquals(request.getEndTime(), newRequest.getEndTime()); + Assert.assertNotNull(newRequest.getDetector()); + Assert.assertNull(newRequest.validate()); + } + + @Test + public void testPreviewResponse() throws Exception { + BytesStreamOutput out = new BytesStreamOutput(); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of("testKey", "testValue"), Instant.now()); + AnomalyResult result = TestHelpers.randomMultiEntityAnomalyDetectResult(0.8d, 0d); + PreviewAnomalyDetectorResponse response = new PreviewAnomalyDetectorResponse(ImmutableList.of(result), detector); + response.writeTo(out); + NamedWriteableAwareStreamInput input = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), writableRegistry()); + PreviewAnomalyDetectorResponse newResponse = new PreviewAnomalyDetectorResponse(input); + Assert.assertNotNull(newResponse.toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS)); + } + + @Test + public void testPreviewAction() throws Exception { + Assert.assertNotNull(PreviewAnomalyDetectorAction.INSTANCE.name()); + Assert.assertEquals(PreviewAnomalyDetectorAction.INSTANCE.name(), PreviewAnomalyDetectorAction.NAME); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java new file mode 100644 index 00000000..7f410e7a --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java @@ -0,0 +1,335 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.feature.FeatureManager; +import com.amazon.opendistroforelasticsearch.ad.feature.Features; +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; +import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; +import com.google.common.collect.ImmutableMap; + +public class PreviewAnomalyDetectorTransportActionTests extends ESSingleNodeTestCase { + private ActionListener response; + private PreviewAnomalyDetectorTransportAction action; + private AnomalyDetectorRunner runner; + private ClusterService clusterService; + private FeatureManager featureManager; + private ModelManager modelManager; + private Task task; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + task = mock(Task.class); + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.MAX_ANOMALY_FEATURES))) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + featureManager = mock(FeatureManager.class); + modelManager = mock(ModelManager.class); + runner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS); + action = new PreviewAnomalyDetectorTransportAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(ActionFilters.class), + client(), + runner, + xContentRegistry() + ); + } + + @SuppressWarnings("unchecked") + @Test + public void testPreviewTransportAction() throws IOException, InterruptedException { + final CountDownLatch inProgressLatch = new CountDownLatch(1); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of("testKey", "testValue"), Instant.now()); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest( + detector, + detector.getDetectorId(), + Instant.now(), + Instant.now() + ); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + try { + XContentBuilder previewBuilder = response.toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS); + Assert.assertNotNull(previewBuilder); + Map map = TestHelpers.XContentBuilderToMap(previewBuilder); + List results = (List) map.get("anomaly_result"); + Assert.assertNotNull(results); + Assert.assertTrue(results.size() > 0); + inProgressLatch.countDown(); + } catch (IOException e) { + // Should not reach here + Assert.assertTrue(false); + } + } + + @Override + public void onFailure(Exception e) { + // onFailure should not be called + Assert.assertTrue(false); + } + }; + + doReturn(TestHelpers.randomThresholdingResults()).when(modelManager).getPreviewResults(any()); + + doAnswer(responseMock -> { + Long startTime = responseMock.getArgument(1); + ActionListener listener = responseMock.getArgument(3); + listener.onResponse(TestHelpers.randomFeatures()); + return null; + }).when(featureManager).getPreviewFeatures(anyObject(), anyLong(), anyLong(), any()); + action.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @Test + public void testPreviewTransportActionWithNoFeature() throws IOException, InterruptedException { + // Detector with no feature, Preview should fail + final CountDownLatch inProgressLatch = new CountDownLatch(1); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(Collections.emptyList()); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest( + detector, + detector.getDetectorId(), + Instant.now(), + Instant.now() + ); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + Assert.assertTrue(false); + } + + @Override + public void onFailure(Exception e) { + Assert.assertTrue(e.getMessage().contains("Can't preview detector without feature")); + inProgressLatch.countDown(); + } + }; + action.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @Test + public void testPreviewTransportActionWithNoDetector() throws IOException, InterruptedException { + // When detectorId is null, preview should fail + final CountDownLatch inProgressLatch = new CountDownLatch(1); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest(null, "", Instant.now(), Instant.now()); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + Assert.assertTrue(false); + } + + @Override + public void onFailure(Exception e) { + Assert.assertTrue(e.getMessage().contains("Wrong input, no detector id")); + inProgressLatch.countDown(); + } + }; + action.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @Test + public void testPreviewTransportActionWithDetectorID() throws IOException, InterruptedException { + // When AD index does not exist, cannot query the detector + final CountDownLatch inProgressLatch = new CountDownLatch(1); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest(null, "1234", Instant.now(), Instant.now()); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + Assert.assertTrue(false); + } + + @Override + public void onFailure(Exception e) { + Assert.assertTrue(e.getMessage().contains("Could not execute get query to find detector")); + inProgressLatch.countDown(); + } + }; + action.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @Test + public void testPreviewTransportActionWithIndex() throws IOException, InterruptedException { + // When AD index exists, and detector does not exist + final CountDownLatch inProgressLatch = new CountDownLatch(1); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest(null, "1234", Instant.now(), Instant.now()); + Settings indexSettings = Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build(); + CreateIndexRequest indexRequest = new CreateIndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, indexSettings); + client().admin().indices().create(indexRequest).actionGet(); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + Assert.assertTrue(false); + } + + @Override + public void onFailure(Exception e) { + Assert.assertTrue(e.getMessage().contains("Can't find anomaly detector with id:1234")); + inProgressLatch.countDown(); + } + }; + action.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @Test + public void testPreviewTransportActionNoContext() throws IOException, InterruptedException { + final CountDownLatch inProgressLatch = new CountDownLatch(1); + Client client = mock(Client.class); + PreviewAnomalyDetectorTransportAction previewAction = new PreviewAnomalyDetectorTransportAction( + Settings.EMPTY, + mock(TransportService.class), + clusterService, + mock(ActionFilters.class), + client, + runner, + xContentRegistry() + ); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of("testKey", "testValue"), Instant.now()); + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest( + detector, + detector.getDetectorId(), + Instant.now(), + Instant.now() + ); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + Assert.assertTrue(false); + } + + @Override + public void onFailure(Exception e) { + Assert.assertTrue(e.getClass() == NullPointerException.class); + inProgressLatch.countDown(); + } + }; + previewAction.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unchecked") + @Test + public void testPreviewTransportActionWithDetector() throws IOException, InterruptedException { + final CountDownLatch inProgressLatch = new CountDownLatch(1); + CreateIndexResponse createResponse = TestHelpers + .createIndex(client().admin(), AnomalyDetector.ANOMALY_DETECTORS_INDEX, AnomalyDetectionIndices.getAnomalyDetectorMappings()); + Assert.assertNotNull(createResponse); + + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of("testKey", "testValue"), Instant.now()); + IndexRequest indexRequest = new IndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(detector.toXContent(XContentFactory.jsonBuilder(), RestHandlerUtils.XCONTENT_WITH_TYPE)); + IndexResponse indexResponse = client().index(indexRequest).actionGet(5_000); + assertEquals(RestStatus.CREATED, indexResponse.status()); + + PreviewAnomalyDetectorRequest request = new PreviewAnomalyDetectorRequest( + null, + indexResponse.getId(), + Instant.now(), + Instant.now() + ); + ActionListener previewResponse = new ActionListener() { + @Override + public void onResponse(PreviewAnomalyDetectorResponse response) { + try { + XContentBuilder previewBuilder = response.toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS); + Assert.assertNotNull(previewBuilder); + Map map = TestHelpers.XContentBuilderToMap(previewBuilder); + List results = (List) map.get("anomaly_result"); + Assert.assertNotNull(results); + Assert.assertTrue(results.size() > 0); + inProgressLatch.countDown(); + } catch (IOException e) { + // Should not reach here + Assert.assertTrue(false); + } + } + + @Override + public void onFailure(Exception e) { + // onFailure should not be called + Assert.assertTrue(false); + } + }; + doReturn(TestHelpers.randomThresholdingResults()).when(modelManager).getPreviewResults(any()); + + doAnswer(responseMock -> { + Long startTime = responseMock.getArgument(1); + ActionListener listener = responseMock.getArgument(3); + listener.onResponse(TestHelpers.randomFeatures()); + return null; + }).when(featureManager).getPreviewFeatures(anyObject(), anyLong(), anyLong(), any()); + action.doExecute(task, request, previewResponse); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } +}