diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt new file mode 100644 index 000000000..e10b1a41c --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt @@ -0,0 +1,168 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.elasticapi.instant +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException +import java.time.Instant + +/** + * A wrapper of the log event that enriches the event by also including information about the monitor it triggered. + */ +class Finding( + val id: String = NO_ID, + val logEvent: Map, + val monitorId: String, + val monitorName: String, + val queryId: String = NO_ID, + val queryTags: List, + val severity: String, + val timestamp: Instant, + val triggerId: String, + val triggerName: String +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + logEvent = suppressWarning(sin.readMap()), + monitorId = sin.readString(), + monitorName = sin.readString(), + queryId = sin.readString(), + queryTags = sin.readStringList(), + severity = sin.readString(), + timestamp = sin.readInstant(), + triggerId = sin.readString(), + triggerName = sin.readString() + ) + + fun asTemplateArg(): Map { + return mapOf( + FINDING_ID_FIELD to id, + LOG_EVENT_FIELD to logEvent, + MONITOR_ID_FIELD to monitorId, + MONITOR_NAME_FIELD to monitorName, + QUERY_ID_FIELD to queryId, + QUERY_TAGS_FIELD to queryTags, + SEVERITY_FIELD to severity, + TIMESTAMP_FIELD to timestamp.toEpochMilli(), + TRIGGER_ID_FIELD to triggerId, + TRIGGER_NAME_FIELD to triggerName + ) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(FINDING_ID_FIELD, id) + .field(LOG_EVENT_FIELD, logEvent) + .field(MONITOR_ID_FIELD, monitorId) + .field(MONITOR_NAME_FIELD, monitorName) + .field(QUERY_ID_FIELD, queryId) + .field(QUERY_TAGS_FIELD, queryTags.toTypedArray()) + .field(SEVERITY_FIELD, severity) + .field(TIMESTAMP_FIELD, timestamp) + .field(TRIGGER_ID_FIELD, triggerId) + .field(TRIGGER_NAME_FIELD, triggerName) + builder.endObject() + return builder + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeMap(logEvent) + out.writeString(monitorId) + out.writeString(monitorName) + out.writeString(queryId) + out.writeStringCollection(queryTags) + out.writeString(severity) + out.writeInstant(timestamp) + out.writeString(triggerId) + out.writeString(triggerName) + } + + companion object { + const val FINDING_ID_FIELD = "id" + const val LOG_EVENT_FIELD = "log_event" + const val MONITOR_ID_FIELD = "monitor_id" + const val MONITOR_NAME_FIELD = "monitor_name" + const val QUERY_ID_FIELD = "query_id" + const val QUERY_TAGS_FIELD = "query_tags" + const val SEVERITY_FIELD = "severity" + const val TIMESTAMP_FIELD = "timestamp" + const val TRIGGER_ID_FIELD = "trigger_id" + const val TRIGGER_NAME_FIELD = "trigger_name" + const val NO_ID = "" + + @JvmStatic @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, id: String = NO_ID): Finding { + var logEvent: Map = mapOf() + lateinit var monitorId: String + lateinit var monitorName: String + var queryId: String = NO_ID + val queryTags: MutableList = mutableListOf() + lateinit var severity: String + lateinit var timestamp: Instant + lateinit var triggerId: String + lateinit var triggerName: String + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + LOG_EVENT_FIELD -> logEvent = xcp.map() + MONITOR_ID_FIELD -> monitorId = xcp.text() + MONITOR_NAME_FIELD -> monitorName = xcp.text() + QUERY_ID_FIELD -> queryId = xcp.text() + QUERY_TAGS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + queryTags.add(xcp.text()) + } + } + SEVERITY_FIELD -> severity = xcp.text() + TIMESTAMP_FIELD -> timestamp = requireNotNull(xcp.instant()) + TRIGGER_ID_FIELD -> triggerId = xcp.text() + TRIGGER_NAME_FIELD -> triggerName = xcp.text() + } + } + + return Finding( + id = id, + logEvent = logEvent, + monitorId = monitorId, + monitorName = monitorName, + queryId = queryId, + queryTags = queryTags, + severity = severity, + timestamp = timestamp, + triggerId = triggerId, + triggerName = triggerName + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Finding { + return Finding(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelMonitorInput.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelMonitorInput.kt new file mode 100644 index 000000000..a1ac19a46 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelMonitorInput.kt @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model.docLevelInput + +import org.opensearch.alerting.core.model.Input +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException + +data class DocLevelMonitorInput( + val description: String = NO_DESCRIPTION, + val indices: List, + val queries: List +) : Input { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // description + sin.readStringList(), // indices + sin.readList(::DocLevelQuery) // docLevelQueries + ) + + fun asTemplateArg(): Map { + return mapOf( + DESCRIPTION_FIELD to description, + INDICES_FIELD to indices, + QUERIES_FIELD to queries.map { it.asTemplateArg() } + ) + } + + override fun name(): String { + return DOC_LEVEL_INPUT_FIELD + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(description) + out.writeStringCollection(indices) + out.writeCollection(queries) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(DOC_LEVEL_INPUT_FIELD) + .field(DESCRIPTION_FIELD, description) + .field(INDICES_FIELD, indices.toTypedArray()) + .field(QUERIES_FIELD, queries.toTypedArray()) + .endObject() + .endObject() + return builder + } + + companion object { + const val DESCRIPTION_FIELD = "description" + const val INDICES_FIELD = "indices" + const val DOC_LEVEL_INPUT_FIELD = "doc_level_input" + const val QUERIES_FIELD = "queries" + + const val NO_DESCRIPTION = "" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(DOC_LEVEL_INPUT_FIELD), CheckedFunction { parse(it) }) + + @JvmStatic @Throws(IOException::class) + fun parse(xcp: XContentParser): DocLevelMonitorInput { + var description: String = NO_DESCRIPTION + val indices: MutableList = mutableListOf() + val docLevelQueries: MutableList = mutableListOf() + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + DESCRIPTION_FIELD -> description = xcp.text() + INDICES_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + indices.add(xcp.text()) + } + } + QUERIES_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + docLevelQueries.add(DocLevelQuery.parse(xcp)) + } + } + } + } + + return DocLevelMonitorInput(description = description, indices = indices, queries = docLevelQueries) + } + + @JvmStatic @Throws(IOException::class) + fun readFrom(sin: StreamInput): DocLevelMonitorInput { + return DocLevelMonitorInput(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelQuery.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelQuery.kt new file mode 100644 index 000000000..f1d0cf367 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/docLevelInput/DocLevelQuery.kt @@ -0,0 +1,121 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model.docLevelInput + +import org.opensearch.alerting.model.action.Action +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException + +data class DocLevelQuery( + val id: String = NO_ID, + val query: String, + val severity: String, + val tags: List = mutableListOf(), + val actions: List = mutableListOf() +) : Writeable, ToXContentObject { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // query + sin.readString(), // severity + sin.readStringList(), // tags + sin.readList(::Action) // actions + ) + + fun asTemplateArg(): Map { + return mapOf( + QUERY_ID_FIELD to id, + QUERY_FIELD to query, + SEVERITY_FIELD to severity, + TAGS_FIELD to tags, + ACTIONS_FIELD to actions.map { it.asTemplateArg() } + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(query) + out.writeString(severity) + out.writeStringCollection(tags) + out.writeCollection(actions) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(QUERY_ID_FIELD, id) + .field(QUERY_FIELD, query) + .field(SEVERITY_FIELD, severity) + .field(TAGS_FIELD, tags.toTypedArray()) + .field(ACTIONS_FIELD, actions.toTypedArray()) + .endObject() + return builder + } + + companion object { + const val QUERY_ID_FIELD = "id" + const val QUERY_FIELD = "query" + const val SEVERITY_FIELD = "severity" + const val TAGS_FIELD = "tags" + const val ACTIONS_FIELD = "actions" + + const val NO_ID = "" + + @JvmStatic @Throws(IOException::class) + fun parse(xcp: XContentParser): DocLevelQuery { + var id: String = NO_ID + lateinit var query: String + lateinit var severity: String + val tags: MutableList = mutableListOf() + val actions: MutableList = mutableListOf() + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + QUERY_ID_FIELD -> id = xcp.text() + QUERY_FIELD -> query = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + TAGS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + tags.add(xcp.text()) + } + } + ACTIONS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + } + } + + return DocLevelQuery( + id = id, + query = query, + severity = severity, + tags = tags, + actions = actions + ) + } + + @JvmStatic @Throws(IOException::class) + fun readFrom(sin: StreamInput): DocLevelMonitorInput { + return DocLevelMonitorInput(sin) + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 830219f8a..40cc162ab 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -2,6 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ + package org.opensearch.alerting import junit.framework.TestCase.assertNull @@ -20,6 +21,7 @@ import org.opensearch.alerting.model.AggregationResultBucket import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.BucketLevelTriggerRunResult +import org.opensearch.alerting.model.Finding import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.MonitorRunResult @@ -36,6 +38,8 @@ import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailEntry import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput +import org.opensearch.alerting.model.docLevelInput.DocLevelQuery import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.client.Request import org.opensearch.client.RequestOptions @@ -270,6 +274,52 @@ fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { ) } +fun randomDocLevelQuery( + id: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + query: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + severity: String = "${randomInt(5)}", + tags: List = mutableListOf(0..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) }, + actions: List = mutableListOf(0..randomInt(10)).map { randomAction() } +): DocLevelQuery { + return DocLevelQuery(id = id, query = query, severity = severity, tags = tags, actions = actions) +} + +fun randomDocLevelMonitorInput( + description: String = OpenSearchRestTestCase.randomAlphaOfLength(randomInt(10)), + indices: List = listOf(1..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) }, + queries: List = listOf(1..randomInt(10)).map { randomDocLevelQuery() } +): DocLevelMonitorInput { + return DocLevelMonitorInput(description = description, indices = indices, queries = queries) +} + +fun randomFinding( + id: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + logEvent: Map = mapOf( + OpenSearchRestTestCase.randomAlphaOfLength(5) to OpenSearchRestTestCase.randomAlphaOfLength(5) + ), + monitorId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + monitorName: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + queryId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + queryTags: MutableList = mutableListOf(), + severity: String = "${randomInt(5)}", + timestamp: Instant = Instant.now(), + triggerId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + triggerName: String = OpenSearchRestTestCase.randomAlphaOfLength(10) +): Finding { + return Finding( + id = id, + logEvent = logEvent, + monitorId = monitorId, + monitorName = monitorName, + queryId = queryId, + queryTags = queryTags, + severity = severity, + timestamp = timestamp, + triggerId = triggerId, + triggerName = triggerName + ) +} + fun randomAlertWithAggregationResultBucket(monitor: Monitor = randomBucketLevelMonitor()): Alert { val trigger = randomBucketLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt new file mode 100644 index 000000000..7f85b1895 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/DocLevelMonitorInputTests.kt @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.model.action.Action +import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput +import org.opensearch.alerting.model.docLevelInput.DocLevelQuery +import org.opensearch.alerting.randomDocLevelMonitorInput +import org.opensearch.alerting.randomDocLevelQuery +import org.opensearch.test.OpenSearchTestCase + +class DocLevelMonitorInputTests : OpenSearchTestCase() { + fun `testing DocLevelQuery asTemplateArgs`() { + // GIVEN + val query = randomDocLevelQuery() + + // WHEN + val templateArgs = query.asTemplateArg() + + // THEN + assertEquals("Template args 'id' field does not match:", templateArgs[DocLevelQuery.QUERY_ID_FIELD], query.id) + assertEquals("Template args 'query' field does not match:", templateArgs[DocLevelQuery.QUERY_FIELD], query.query) + assertEquals("Template args 'severity' field does not match:", templateArgs[DocLevelQuery.SEVERITY_FIELD], query.severity) + assertEquals("Template args 'tags' field does not match:", templateArgs[DocLevelQuery.TAGS_FIELD], query.tags) + + val expectedActions = query.actions.map { mapOf(Action.NAME_FIELD to it.name) } + assertEquals("Template args 'actions' field does not match:", templateArgs[DocLevelQuery.ACTIONS_FIELD], expectedActions) + } + + fun `testing DocLevelMonitorInput asTemplateArgs`() { + // GIVEN + val input = randomDocLevelMonitorInput() + + // WHEN + val templateArgs = input.asTemplateArg() + + // THEN + assertEquals("Template args 'description' field does not match:", templateArgs[DocLevelMonitorInput.DESCRIPTION_FIELD], input.description) + assertEquals("Template args 'indices' field does not match:", templateArgs[DocLevelMonitorInput.INDICES_FIELD], input.indices) + assertEquals("Template args 'queries' field does not contain the expected number of queries:", input.queries.size, (templateArgs[DocLevelMonitorInput.QUERIES_FIELD] as List<*>).size) + input.queries.forEach { + assertTrue("Template args 'queries' field does not match:", (templateArgs[DocLevelMonitorInput.QUERIES_FIELD] as List<*>).contains(it.asTemplateArg())) + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt new file mode 100644 index 000000000..4e0cde443 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.randomFinding +import org.opensearch.test.OpenSearchTestCase + +class FindingTests : OpenSearchTestCase() { + fun `test finding asTemplateArgs`() { + // GIVEN + val finding = randomFinding() + + // WHEN + val templateArgs = finding.asTemplateArg() + + // THEN + assertEquals("Template args 'id' field does not match:", templateArgs[Finding.FINDING_ID_FIELD], finding.id) + assertEquals("Template args 'logEvent' field does not match:", templateArgs[Finding.LOG_EVENT_FIELD], finding.logEvent) + assertEquals("Template args 'monitorId' field does not match:", templateArgs[Finding.MONITOR_ID_FIELD], finding.monitorId) + assertEquals("Template args 'monitorName' field does not match:", templateArgs[Finding.MONITOR_NAME_FIELD], finding.monitorName) + assertEquals("Template args 'queryId' field does not match:", templateArgs[Finding.QUERY_ID_FIELD], finding.queryId) + assertEquals("Template args 'queryTags' field does not match:", templateArgs[Finding.QUERY_TAGS_FIELD], finding.queryTags) + assertEquals("Template args 'severity' field does not match:", templateArgs[Finding.SEVERITY_FIELD], finding.severity) + assertEquals("Template args 'timestamp' field does not match:", templateArgs[Finding.TIMESTAMP_FIELD], finding.timestamp.toEpochMilli()) + assertEquals("Template args 'triggerId' field does not match:", templateArgs[Finding.TRIGGER_ID_FIELD], finding.triggerId) + assertEquals("Template args 'triggerName' field does not match:", templateArgs[Finding.TRIGGER_NAME_FIELD], finding.triggerName) + } +}