Skip to content

Commit

Permalink
aws.vpcflow - use parse_aws_vpc_flow_log processor (#33699)
Browse files Browse the repository at this point in the history
Update the aws.vpcflow dataset in the AWS module to use the parse_aws_vpc_flow_log processor.
I also updated the module to be aligned with ECS. It was not using valid event.category
and event.type values. It will now produce `event.category: [network]` and `event.type: [connection]`,
`[connection, allowed]`, or `[connection, denied]`.
  • Loading branch information
andrewkroh authored and chrisberkhout committed Jun 1, 2023
1 parent a0e5a8e commit 7692377
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 274 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Update `aws.vpcflow` dataset in AWS module to have a configurable log `format` and to produce ECS 8.x fields. {pull}33699[33699]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS messages are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ filebeat.modules:
# Configures the SSL settings, e.g. set trusted CAs, ignore certificate verification...
#var.ssl:

# Specify a custom VPC flow log format.
#var.format:

#----------------------------- AWS Fargate Module -----------------------------
- module: awsfargate
log:
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/module/aws/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,6 @@

# Configures the SSL settings, e.g. set trusted CAs, ignore certificate verification...
#var.ssl:

# Specify a custom VPC flow log format.
#var.format:
10 changes: 6 additions & 4 deletions x-pack/filebeat/module/aws/vpcflow/config/input.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ ssl: {{ .ssl | tojson }}
{{ else if eq .input "file" }}

type: log
paths:
{{ range $i, $path := .paths }}
- {{$path}}
{{ end }}
paths: {{ .paths | tojson }}
exclude_files: [".gz$"]

{{ end }}
tags: {{.tags | tojson}}
publisher_pipeline.disable_host: {{ inList .tags "forwarded" }}

processors:
- parse_aws_vpc_flow_log:
format: {{ .format | tojson }}
- community_id: ~
232 changes: 23 additions & 209 deletions x-pack/filebeat/module/aws/vpcflow/ingest/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,171 +2,43 @@
description: Pipeline for AWS VPC Flow Logs

processors:
- drop:
if: 'ctx.message.startsWith("version") || ctx.message.startsWith("instance-id")'
- set:
field: event.ingested
value: '{{_ingest.timestamp}}'
- set:
field: ecs.version
value: '1.12.0'
value: '8.0.0'
- rename:
field: message
target_field: event.original
ignore_missing: true
- set:
field: event.type
value: flow
field: event.kind
value: event
- set:
field: event.category
value: network_traffic
- drop:
if: 'ctx.event?.original.startsWith("version") || ctx.event?.original.startsWith("instance-id")'
- script:
lang: painless
if: ctx.event?.original != null
source: >-
ctx._temp_ = new HashMap();
ctx._temp_.message_token_count = ctx.event?.original.splitOnToken(" ").length;
- dissect:
field: event.original
pattern: '%{aws.vpcflow.version} %{aws.vpcflow.account_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.protocol} %{aws.vpcflow.packets} %{aws.vpcflow.bytes} %{aws.vpcflow.start} %{aws.vpcflow.end} %{aws.vpcflow.action} %{aws.vpcflow.log_status}'
if: ctx?._temp_?.message_token_count == 14
- dissect:
field: event.original
pattern: '%{aws.vpcflow.instance_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr}'
if: ctx?._temp_?.message_token_count == 6
- dissect:
field: event.original
pattern: '%{aws.vpcflow.version} %{aws.vpcflow.interface_id} %{aws.vpcflow.account_id} %{aws.vpcflow.vpc_id} %{aws.vpcflow.subnet_id} %{aws.vpcflow.instance_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.protocol} %{aws.vpcflow.tcp_flags} %{aws.vpcflow.type} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr} %{aws.vpcflow.action} %{aws.vpcflow.log_status}'
if: ctx?._temp_?.message_token_count == 17
- dissect:
field: event.original
pattern: '%{aws.vpcflow.version} %{aws.vpcflow.vpc_id} %{aws.vpcflow.subnet_id} %{aws.vpcflow.instance_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.account_id} %{aws.vpcflow.type} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr} %{aws.vpcflow.protocol} %{aws.vpcflow.bytes} %{aws.vpcflow.packets} %{aws.vpcflow.start} %{aws.vpcflow.end} %{aws.vpcflow.action} %{aws.vpcflow.tcp_flags} %{aws.vpcflow.log_status}'
if: ctx?._temp_?.message_token_count == 21

# Convert Unix epoch to timestamp
- date:
field: aws.vpcflow.end
target_field: '@timestamp'
ignore_failure: true
formats:
- UNIX
- date:
field: aws.vpcflow.start
target_field: event.start
ignore_failure: true
formats:
- UNIX
- date:
field: aws.vpcflow.end
target_field: event.end
ignore_failure: true
formats:
- UNIX
- remove:
field:
- aws.vpcflow.start
- aws.vpcflow.end
ignore_missing: true
- script:
lang: painless
ignore_failure: true
if: ctx.aws != null
source: >-
void handleMap(Map map) {
for (def x : map.values()) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
map.values().removeIf(v -> v instanceof String && v == "-");
}
void handleList(List list) {
for (def x : list) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
}
handleMap(ctx.aws);
value: [network]
- set:
field: event.outcome
value: allow
if: ctx.aws?.vpcflow?.action == "ACCEPT"
- set:
field: event.outcome
value: deny
if: ctx.aws?.vpcflow?.action == "REJECT"
- rename:
field: aws.vpcflow.srcaddr
target_field: source.address
ignore_missing: true
field: cloud.provider
value: aws

# parse_aws_vpc_flow_log removes the original values to avoid duplication,
# but these fields are still populated here to avoid a breaking change.
- set:
field: source.ip
copy_from: source.address
if: ctx.source?.address != null
- convert:
field: aws.vpcflow.srcport
target_field: source.port
type: integer
ignore_missing: true
- rename:
field: aws.vpcflow.dstaddr
target_field: destination.address
ignore_missing: true
copy_from: cloud.account.id
field: aws.vpcflow.account_id
ignore_empty_value: true
- set:
field: destination.ip
copy_from: destination.address
if: ctx.destination?.address != null
- convert:
field: aws.vpcflow.dstport
target_field: destination.port
type: integer
copy_from: cloud.instance.id
field: aws.vpcflow.instance_id
ignore_empty_value: true
- uppercase:
field: event.action
target_field: aws.vpcflow.action
ignore_missing: true
- rename:
field: aws.vpcflow.protocol
target_field: network.iana_number
ignore_missing: true
- convert:
field: aws.vpcflow.packets
target_field: source.packets
type: long
ignore_missing: true
- convert:
field: aws.vpcflow.bytes
target_field: source.bytes
type: long
ignore_missing: true
- set:
field: network.bytes
copy_from: source.bytes
if: ctx.source?.bytes != null
- set:
field: network.packets
copy_from: source.packets
if: ctx.source?.packets != null
- set:
field: network.type
value: ipv4
if: 'ctx.source?.ip != null && ctx.source?.ip.contains(".")'
- set:
field: network.type
value: ipv6
if: 'ctx.source?.ip != null && ctx.source?.ip.contains(":")'
- set:
field: network.transport
value: tcp
if: ctx.network?.iana_number == "6"
- set:
field: network.transport
value: udp
if: ctx.network?.iana_number == "17"
- community_id:
target_field: network.community_id
ignore_failure: true

# IP Geolocation Lookup
- geoip:
field: source.ip
Expand All @@ -176,6 +48,7 @@ processors:
field: destination.ip
target_field: destination.geo
ignore_missing: true

# IP Autonomous System (AS) Lookup
- geoip:
database_file: GeoLite2-ASN.mmdb
Expand Down Expand Up @@ -209,66 +82,7 @@ processors:
field: destination.as.organization_name
target_field: destination.as.organization.name
ignore_missing: true
# Generate related.ip field
- append:
if: 'ctx.source?.ip != null && ctx.destination?.ip != null'
field: related.ip
value: ["{{source.ip}}", "{{destination.ip}}"]
- set:
field: cloud.provider
value: aws
- set:
if: ctx.aws?.vpcflow?.account_id != null
field: cloud.account.id
value: '{{aws.vpcflow.account_id}}'
- set:
if: 'ctx?.aws?.vpcflow?.instance_id != null && ctx.aws.vpcflow.instance_id != "-"'
field: cloud.instance.id
value: '{{aws.vpcflow.instance_id}}'
- set:
field: event.kind
value: event
- script:
lang: painless
ignore_failure: true
if: "ctx.aws?.vpcflow?.tcp_flags != null"
source: |
if (ctx.aws.vpcflow.tcp_flags_array == null) {
ArrayList al = new ArrayList();
ctx.aws.vpcflow.put("tcp_flags_array", al);
}

def flags = Integer.parseUnsignedInt(ctx.aws.vpcflow.tcp_flags);
if ((flags & 0x01) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('fin');
}
if ((flags & 0x02) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('syn');
}
if ((flags & 0x04) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('rst');
}
if ((flags & 0x08) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('psh');
}
if ((flags & 0x10) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('ack');
}
if ((flags & 0x20) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('urg');
}
- remove:
field:
- _temp_
- aws.vpcflow.srcaddr
- aws.vpcflow.srcport
- aws.vpcflow.dstaddr
- aws.vpcflow.dstport
- aws.vpcflow.bytes
- aws.vpcflow.packets
- aws.vpcflow.protocol
ignore_missing: true
- remove:
field: event.original
if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
Expand All @@ -277,4 +91,4 @@ processors:
on_failure:
- set:
field: 'error.message'
value: '{{ _ingest.on_failure_message }}'
value: '{{{ _ingest.on_failure_message }}}'
6 changes: 6 additions & 0 deletions x-pack/filebeat/module/aws/vpcflow/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ var:
- name: proxy_url
- name: max_number_of_messages
- name: ssl
- name: format
default:
- version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
- instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr
- version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status
- version vpc-id subnet-id instance-id interface-id account-id type srcaddr dstaddr srcport dstport pkt-srcaddr pkt-dstaddr protocol bytes packets start end action tcp-flags log-status

ingest_pipeline: ingest/pipeline.yml
input: config/input.yml
Loading

0 comments on commit 7692377

Please sign in to comment.