Skip to content

Commit

Permalink
[Filebeat] Add parse_aws_vpc_flow_log processor (#33656)
Browse files Browse the repository at this point in the history
This is a processor for parsing AWS VPC flow logs. It requires a user specified log format. It can populate the original flow log fields, ECS fields, or both.

Usage:

```yaml
processors:
  - parse_aws_vpc_flow_log: 
      format: version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
  - community_id: ~
```

Benchmark:

```
goos: darwin
goarch: arm64
pkg: github.com/elastic/beats/v7/x-pack/filebeat/processors/aws_vpcflow
BenchmarkProcessorRun/original-mode-v5-message-10                2810948              2138 ns/op            2836 B/op         31 allocs/op
BenchmarkProcessorRun/ecs-mode-v5-message-10                     1914754              3107 ns/op            1908 B/op         41 allocs/op
BenchmarkProcessorRun/ecs_and_original-mode-v5-message-10        1693279              3538 ns/op            3076 B/op         41 allocs/op
```

Co-authored-by: Dan Kortschak <[email protected]>
  • Loading branch information
2 people authored and chrisberkhout committed Jun 1, 2023
1 parent 80f69de commit 50aedc8
Show file tree
Hide file tree
Showing 27 changed files with 3,343 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
Expand Down
1 change: 1 addition & 0 deletions auditbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:linux_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_script_processor:
:no_timestamp_processor:

Expand Down
1 change: 1 addition & 0 deletions heartbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:no_dashboards:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_timestamp_processor:

include::{libbeat-dir}/shared-beats-attributes.asciidoc[]
Expand Down
6 changes: 6 additions & 0 deletions libbeat/docs/processors-list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ endif::[]
ifndef::no_rename_processor[]
* <<rename-fields,`rename`>>
endif::[]
ifndef::no_parse_aws_vpc_flow_log_processor[]
* <<processor-parse-aws-vpc-flow-log, `parse_aws_vpc_flow_log`>>
endif::[]
ifndef::no_script_processor[]
* <<processor-script,`script`>>
endif::[]
Expand Down Expand Up @@ -231,6 +234,9 @@ endif::[]
ifndef::no_rename_processor[]
include::{libbeat-processors-dir}/actions/docs/rename.asciidoc[]
endif::[]
ifndef::no_parse_aws_vpc_flow_log_processor[]
include::{x-filebeat-processors-dir}/aws_vpcflow/docs/parse_aws_vpc_flow_log.asciidoc[]
endif::[]
ifndef::no_script_processor[]
include::{libbeat-processors-dir}/script/docs/script.asciidoc[]
endif::[]
Expand Down
1 change: 1 addition & 0 deletions metricbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_timestamp_processor:

:kubernetes_default_indexers: {docdir}/kubernetes-default-indexers-matchers.asciidoc
Expand Down
1 change: 1 addition & 0 deletions packetbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_os:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:no_script_processor:
:no_timestamp_processor:

Expand Down
1 change: 1 addition & 0 deletions winlogbeat/docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ include::{asciidoc-dir}/../../shared/attributes.asciidoc[]
:win_only:
:no_decode_cef_processor:
:no_decode_csv_fields_processor:
:no_parse_aws_vpc_flow_log_processor:
:include_translate_sid_processor:
:export_pipeline:

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

140 changes: 140 additions & 0 deletions x-pack/filebeat/processors/aws_vpcflow/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws_vpcflow

import (
"errors"
"fmt"
"strings"
)

// mode represents the processing mode (original, ecs, ecs_and_original),
// which controls what fields the processor writes to the event.
type mode uint8

const (
	originalMode       mode = iota // originalMode generates only the fields specified in the format string.
	ecsMode                        // ecsMode maps the original fields to ECS and removes each original field that was mapped.
	ecsAndOriginalMode             // ecsAndOriginalMode maps the original fields to ECS and retains all the original fields.
)

// modeStrings maps each mode constant to the string form accepted in the
// processor configuration.
var modeStrings = map[mode]string{
	originalMode:       "original",
	ecsMode:            "ecs",
	ecsAndOriginalMode: "ecs_and_original",
}

// Unpack implements the config unpacker interface for mode. It accepts a
// case-insensitive match of any known mode name ("original", "ecs",
// "ecs_and_original") and stores the corresponding constant, or returns an
// error for unrecognized values.
func (m *mode) Unpack(s string) error {
	for candidate, name := range modeStrings {
		if !strings.EqualFold(name, s) {
			continue
		}
		*m = candidate
		return nil
	}
	return fmt.Errorf("invalid mode type %q for "+procName, s)
}

// UnmarshalYAML decodes a YAML scalar into a mode by reading it as a string
// and delegating validation to Unpack.
func (m *mode) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var name string
	if err := unmarshal(&name); err != nil {
		return err
	}
	return m.Unpack(name)
}

// String returns the human-readable name of the mode. It reports "<nil>" for
// a nil receiver and "unknown mode" for values outside the defined constants.
func (m *mode) String() string {
	if m == nil {
		return "<nil>"
	}
	name, ok := modeStrings[*m]
	if !ok {
		return "unknown mode"
	}
	return name
}

// config contains the configuration options for the processor.
type config struct {
	// Format is the VPC flow log format. In the configuration it accepts a
	// string or a list of strings. Each format must have a unique number of
	// fields so that an incoming message can be matched to exactly one format.
	Format formats `config:"format" validate:"required"`

	// Mode controls what fields are generated (original, ecs, ecs_and_original).
	Mode mode `config:"mode"`

	// Field is the source field containing the VPC flow log message.
	Field string `config:"field"`

	// TargetField is the target field for the VPC flow log object. This
	// applies only to the original VPC flow log fields; ECS fields are
	// written to their standard location.
	TargetField string `config:"target_field"`

	// IgnoreMissing, when true, ignores a missing source field.
	IgnoreMissing bool `config:"ignore_missing"`

	// IgnoreFailure, when true, ignores failures while parsing and
	// transforming the flow log message.
	IgnoreFailure bool `config:"ignore_failure"`

	// ID is an instance ID for debugging purposes.
	ID string `config:"id"`
}

// Validate validates the format strings. Each format must have a unique
// number of fields so that an incoming flow log message can be matched to
// exactly one format by its token count.
func (c *config) Validate() error {
	// Track field counts already seen to detect ambiguous (duplicate) sizes.
	counts := make(map[int]struct{}, len(c.Format))
	for _, format := range c.Format {
		fields, err := parseFormat(format)
		if err != nil {
			return err
		}

		if _, found := counts[len(fields)]; found {
			// errors.New rather than fmt.Errorf: the message has no verbs.
			return errors.New("each format must have a unique number of fields")
		}
		counts[len(fields)] = struct{}{}
	}
	return nil
}

// defaultConfig returns the default processor settings: ECS mode, reading
// from the "message" field, and writing original flow log fields (when
// enabled) under "aws.vpcflow".
func defaultConfig() config {
	var c config
	c.Mode = ecsMode
	c.Field = "message"
	c.TargetField = "aws.vpcflow"
	return c
}

// parseFormat parses a VPC flow log format string (whitespace-separated
// field names) and returns an ordered list of the expected fields. It
// returns an error if the format is empty or contains an unknown field name.
func parseFormat(format string) ([]vpcFlowField, error) {
	tokens := strings.Fields(format)
	if len(tokens) == 0 {
		// Fixed typo in error message: "at lease" -> "at least".
		return nil, errors.New("format must contain at least one field")
	}

	fields := make([]vpcFlowField, 0, len(tokens))
	for _, token := range tokens {
		// Elastic uses underscores in field names rather than dashes.
		underscoreToken := strings.ReplaceAll(token, "-", "_")

		field, found := nameToFieldMap[underscoreToken]
		if !found {
			return nil, fmt.Errorf("unknown field %q", token)
		}

		fields = append(fields, field)
	}

	return fields, nil
}

// formats is a list of VPC flow log format strings.
type formats []string

// Unpack implements the config unpacker interface for formats. A single
// string becomes a one-element list. Lists are accepted either as []string
// or as []interface{} whose elements must all be strings; any other type
// results in an error.
func (f *formats) Unpack(value interface{}) error {
	switch v := value.(type) {
	case string:
		*f = formats{v}
	case []string:
		*f = formats(v)
	case []interface{}:
		out := make([]string, len(v))
		for i, elem := range v {
			s, ok := elem.(string)
			if !ok {
				return fmt.Errorf("format values must be strings, got %T", elem)
			}
			out[i] = s
		}
		*f = out
	default:
		return fmt.Errorf("format must be a string or list of strings, got %T", v)
	}
	return nil
}
101 changes: 101 additions & 0 deletions x-pack/filebeat/processors/aws_vpcflow/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package aws_vpcflow

import (
"strconv"
"testing"

"github.com/stretchr/testify/require"

conf "github.com/elastic/elastic-agent-libs/config"
)

// TestConfigUnpack verifies that YAML configurations unpack into the
// processor config: valid modes and formats are accepted, invalid modes and
// duplicate format field counts are rejected, and every valid configuration
// produces a non-nil processor.
func TestConfigUnpack(t *testing.T) {
	testCases := []struct {
		yamlConfig string // YAML document to unpack.
		error      bool   // Whether unpacking is expected to fail.
	}{
		{
			yamlConfig: `
---
mode: ecs_and_original
id: us-east-vpcflow
format: instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr
`,
		},
		{
			yamlConfig: `
---
mode: original
format: version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status
`,
		},
		{
			yamlConfig: `
---
mode: ecs
format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status
`,
		},
		{
			// NOTE(review): this case duplicates the previous one byte-for-byte —
			// confirm whether a different format variant was intended here.
			yamlConfig: `
---
mode: ecs
format: version srcaddr dstaddr srcport dstport protocol start end type packets bytes account-id vpc-id subnet-id instance-id interface-id region az-id sublocation-type sublocation-id action tcp-flags pkt-srcaddr pkt-dstaddr pkt-src-aws-service pkt-dst-aws-service traffic-path flow-direction log-status
`,
		},
		{
			// An unknown mode value must be rejected.
			error: true,
			yamlConfig: `
---
mode: invalid
format: version
`,
		},
		{
			// Multiple formats are allowed when their field counts differ.
			error: false,
			yamlConfig: `
---
mode: ecs
format:
- version srcaddr dstaddr
- version srcaddr dstaddr srcport dstport protocol
`,
		},
		{
			// Each format must have a unique token count.
			error: true,
			yamlConfig: `
---
mode: ecs
format:
- version srcaddr dstaddr
- srcport dstport protocol
`,
		},
	}

	for i, tc := range testCases {
		tc := tc // Capture range variable for the subtest closure.
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			rawConfig := conf.MustNewConfigFrom(tc.yamlConfig)

			c := defaultConfig()
			err := rawConfig.Unpack(&c)
			if tc.error {
				require.Error(t, err, "config: %v", tc.yamlConfig)
				t.Log("Error:", err)
				return
			}
			require.NoError(t, err)

			// Make sure valid configs produce processors.
			p, err := New(rawConfig)
			require.NoError(t, err)
			require.NotNil(t, p)
		})
	}
}
Loading

0 comments on commit 50aedc8

Please sign in to comment.