Skip to content

Commit

Permalink
Add correlation rule layer for events-correlation-engine (opensearch-…
Browse files Browse the repository at this point in the history
…project#7132)

Adds new correlation engine feature by way of a new 
`:plugins:events-correlation-engine` plugin. The endpoint is /_correlation 
and users can create new rules using the following example DSL:

{
   "name": "s3 to app logs",
   "correlate": [
     {
       "index": "s3_access_logs",
       "query": "aws.cloudtrail.eventName:ReplicateObject",
       "timestampField": "@timestamp",
       "tags": [
         "s3"
      ]
    }
  ]
}

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored and austintlee committed Apr 28, 2023
1 parent 492c855 commit ad509e8
Show file tree
Hide file tree
Showing 26 changed files with 1,905 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847))
- Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
1 change: 1 addition & 0 deletions gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ configure([
configure([
project(":libs:opensearch-common"),
project(":libs:opensearch-core"),
project(":plugins:events-correlation-engine"),
project(":server")
]) {
project.tasks.withType(MissingJavadocTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,5 +204,6 @@ public static class CommonFields {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField MISSING = new ParseField("missing");
public static final ParseField TIME_ZONE = new ParseField("time_zone");
public static final ParseField _META = new ParseField("_meta");
}
}
21 changes: 21 additions & 0 deletions plugins/events-correlation-engine/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.java-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'OpenSearch Events Correlation Engine.'
classname 'org.opensearch.plugin.correlation.EventsCorrelationPlugin'
}

dependencies {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation;

import org.apache.lucene.search.join.ScoreMode;
import org.junit.Assert;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.NestedQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleRequest;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleResponse;
import org.opensearch.plugin.correlation.rules.model.CorrelationQuery;
import org.opensearch.plugin.correlation.rules.model.CorrelationRule;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Transport Action tests for events-correlation-plugin
*/
public class EventsCorrelationPluginTransportIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(EventsCorrelationPlugin.class);
}

/**
* test events-correlation-plugin is installed
*/
public void testPluginsAreInstalled() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
.stream()
.flatMap(
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
)
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream()
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.correlation.EventsCorrelationPlugin"))
);
}

/**
* test creating a correlation rule
* @throws Exception Exception
*/
public void testCreatingACorrelationRule() throws Exception {
List<CorrelationQuery> correlationQueries = Arrays.asList(
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "@timestamp", List.of("s3")),
new CorrelationQuery("app_logs", "keywords:PermissionDenied", "@timestamp", List.of("others_application"))
);
CorrelationRule correlationRule = new CorrelationRule("s3 to app logs", correlationQueries);
IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST);

IndexCorrelationRuleResponse response = client().execute(IndexCorrelationRuleAction.INSTANCE, request).get();
Assert.assertEquals(RestStatus.CREATED, response.getStatus());

NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
"correlate",
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
ScoreMode.None
);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
searchSourceBuilder.fetchSource(true);

SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client().search(searchRequest).get();
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
}

/**
* test filtering correlation rules
* @throws Exception Exception
*/
public void testFilteringCorrelationRules() throws Exception {
List<CorrelationQuery> correlationQueries1 = Arrays.asList(
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "@timestamp", List.of("s3")),
new CorrelationQuery("app_logs", "keywords:PermissionDenied", "@timestamp", List.of("others_application"))
);
CorrelationRule correlationRule1 = new CorrelationRule("s3 to app logs", correlationQueries1);
IndexCorrelationRuleRequest request1 = new IndexCorrelationRuleRequest(correlationRule1, RestRequest.Method.POST);
client().execute(IndexCorrelationRuleAction.INSTANCE, request1).get();

List<CorrelationQuery> correlationQueries2 = Arrays.asList(
new CorrelationQuery("windows", "host.hostname:EC2AMAZ*", "@timestamp", List.of("windows")),
new CorrelationQuery("app_logs", "endpoint:/customer_records.txt", "@timestamp", List.of("others_application"))
);
CorrelationRule correlationRule2 = new CorrelationRule("windows to app logs", correlationQueries2);
IndexCorrelationRuleRequest request2 = new IndexCorrelationRuleRequest(correlationRule2, RestRequest.Method.POST);
client().execute(IndexCorrelationRuleAction.INSTANCE, request2).get();

NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
"correlate",
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
ScoreMode.None
);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
searchSourceBuilder.fetchSource(true);

SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client().search(searchRequest).get();
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
}

/**
* test creating a correlation rule with no timestamp field
* @throws Exception Exception
*/
@SuppressWarnings("unchecked")
public void testCreatingACorrelationRuleWithNoTimestampField() throws Exception {
List<CorrelationQuery> correlationQueries = Arrays.asList(
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", null, List.of("s3")),
new CorrelationQuery("app_logs", "keywords:PermissionDenied", null, List.of("others_application"))
);
CorrelationRule correlationRule = new CorrelationRule("s3 to app logs", correlationQueries);
IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST);

IndexCorrelationRuleResponse response = client().execute(IndexCorrelationRuleAction.INSTANCE, request).get();
Assert.assertEquals(RestStatus.CREATED, response.getStatus());

NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
"correlate",
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
ScoreMode.None
);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
searchSourceBuilder.fetchSource(true);

SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client().search(searchRequest).get();
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
Assert.assertEquals(
"_timestamp",
((List<Map<String, Object>>) (searchResponse.getHits().getHits()[0].getSourceAsMap().get("correlate"))).get(0)
.get("timestampField")
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation;

import org.junit.Assert;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.test.rest.OpenSearchRestTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Rest Action tests for events-correlation-plugin
*/
public class EventsCorrelationPluginRestIT extends OpenSearchRestTestCase {

/**
* test events-correlation-plugin is installed
* @throws IOException IOException
*/
@SuppressWarnings("unchecked")
public void testPluginsAreInstalled() throws IOException {
Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json");
Response response = client().performRequest(request);
List<Object> pluginsList = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).list();
Assert.assertTrue(
pluginsList.stream()
.map(o -> (Map<String, Object>) o)
.anyMatch(plugin -> plugin.get("component").equals("events-correlation-engine"))
);
}

/**
* test creating a correlation rule
* @throws IOException IOException
*/
public void testCreatingACorrelationRule() throws IOException {
Request request = new Request("POST", "/_correlation/rules");
request.setJsonEntity(sampleCorrelationRule());
Response response = client().performRequest(request);

Assert.assertEquals(201, response.getStatusLine().getStatusCode());

Map<String, Object> responseMap = entityAsMap(response);
String id = responseMap.get("_id").toString();

request = new Request("POST", "/.opensearch-correlation-rules-config/_search");
request.setJsonEntity(matchIdQuery(id));
response = client().performRequest(request);

Assert.assertEquals(200, response.getStatusLine().getStatusCode());
SearchResponse searchResponse = SearchResponse.fromXContent(
createParser(JsonXContent.jsonXContent, response.getEntity().getContent())
);
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
}

/**
* test creating a correlation rule with no timestamp field
* @throws IOException IOException
*/
@SuppressWarnings("unchecked")
public void testCreatingACorrelationRuleWithNoTimestampField() throws IOException {
Request request = new Request("POST", "/_correlation/rules");
request.setJsonEntity(sampleCorrelationRuleWithNoTimestamp());
Response response = client().performRequest(request);

Assert.assertEquals(201, response.getStatusLine().getStatusCode());

Map<String, Object> responseMap = entityAsMap(response);
String id = responseMap.get("_id").toString();

request = new Request("POST", "/.opensearch-correlation-rules-config/_search");
request.setJsonEntity(matchIdQuery(id));
response = client().performRequest(request);

Assert.assertEquals(200, response.getStatusLine().getStatusCode());
SearchResponse searchResponse = SearchResponse.fromXContent(
createParser(JsonXContent.jsonXContent, response.getEntity().getContent())
);
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
Assert.assertEquals(
"_timestamp",
((List<Map<String, Object>>) (searchResponse.getHits().getHits()[0].getSourceAsMap().get("correlate"))).get(0)
.get("timestampField")
);
}

private String sampleCorrelationRule() {
return "{\n"
+ " \"name\": \"s3 to app logs\",\n"
+ " \"correlate\": [\n"
+ " {\n"
+ " \"index\": \"s3_access_logs\",\n"
+ " \"query\": \"aws.cloudtrail.eventName:ReplicateObject\",\n"
+ " \"timestampField\": \"@timestamp\",\n"
+ " \"tags\": [\n"
+ " \"s3\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"index\": \"app_logs\",\n"
+ " \"query\": \"keywords:PermissionDenied\",\n"
+ " \"timestampField\": \"@timestamp\",\n"
+ " \"tags\": [\n"
+ " \"others_application\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}";
}

private String sampleCorrelationRuleWithNoTimestamp() {
return "{\n"
+ " \"name\": \"s3 to app logs\",\n"
+ " \"correlate\": [\n"
+ " {\n"
+ " \"index\": \"s3_access_logs\",\n"
+ " \"query\": \"aws.cloudtrail.eventName:ReplicateObject\",\n"
+ " \"tags\": [\n"
+ " \"s3\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"index\": \"app_logs\",\n"
+ " \"query\": \"keywords:PermissionDenied\",\n"
+ " \"tags\": [\n"
+ " \"others_application\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}";
}

private String matchIdQuery(String id) {
return "{\n" + " \"query\" : {\n" + " \"match\":{\n" + " \"_id\": \"" + id + "\"\n" + " }\n" + " }\n" + "}";
}
}
Loading

0 comments on commit ad509e8

Please sign in to comment.