[Filebeat] aws.vpcflow - use parse_aws_vpc_flow_log processor #33699

Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -158,6 +158,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Update `aws.vpcflow` dataset in AWS module to have a configurable log `format` and to produce ECS 8.x fields. {pull}33699[33699]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
3 changes: 3 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -474,6 +474,9 @@ filebeat.modules:
# Configures the SSL settings, ie. set trusted CAs, ignore certificate verification....
#var.ssl:

# Specify a custom VPC flow log format.
#var.format:

#----------------------------- AWS Fargate Module -----------------------------
- module: awsfargate
log:
3 changes: 3 additions & 0 deletions x-pack/filebeat/module/aws/_meta/config.yml
@@ -376,3 +376,6 @@

# Configures the SSL settings, ie. set trusted CAs, ignore certificate verification....
#var.ssl:

# Specify a custom VPC flow log format.
#var.format:
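
For context, a minimal sketch of how the new option might look once enabled in a modules.d/aws.yml configuration. The queue URL placeholder and the custom format string are purely illustrative; a custom format has to match the fields that were selected when the flow log was created in AWS.

- module: aws
  vpcflow:
    enabled: true
    var.queue_url: <SQS queue URL receiving the flow log bucket notifications>
    # Hypothetical custom format: space-separated VPC flow log field names.
    var.format: "version interface-id srcaddr dstaddr srcport dstport protocol action flow-direction"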
10 changes: 6 additions & 4 deletions x-pack/filebeat/module/aws/vpcflow/config/input.yml
@@ -79,12 +79,14 @@ ssl: {{ .ssl | tojson }}
{{ else if eq .input "file" }}

type: log
paths:
{{ range $i, $path := .paths }}
- {{$path}}
{{ end }}
paths: {{ .paths | tojson }}
exclude_files: [".gz$"]

{{ end }}
tags: {{.tags | tojson}}
publisher_pipeline.disable_host: {{ inList .tags "forwarded" }}

processors:
- parse_aws_vpc_flow_log:
format: {{ .format | tojson }}
- community_id: ~
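
For reference, a rough sketch of what this template renders to when the default format list from manifest.yml is used; the tojson call emits the list as a JSON array, shown here as equivalent block-style YAML.

processors:
  - parse_aws_vpc_flow_log:
      # Assumption: the processor selects the format whose field count matches the log line.
      format:
        - version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
        - instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr
        - version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status
        - version vpc-id subnet-id instance-id interface-id account-id type srcaddr dstaddr srcport dstport pkt-srcaddr pkt-dstaddr protocol bytes packets start end action tcp-flags log-status
  - community_id: ~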
232 changes: 23 additions & 209 deletions x-pack/filebeat/module/aws/vpcflow/ingest/pipeline.yml
@@ -2,171 +2,43 @@
description: Pipeline for AWS VPC Flow Logs

processors:
- drop:
if: 'ctx.message.startsWith("version") || ctx.message.startsWith("instance-id")'
- set:
field: event.ingested
value: '{{_ingest.timestamp}}'
- set:
field: ecs.version
value: '1.12.0'
value: '8.0.0'
- rename:
field: message
target_field: event.original
ignore_missing: true
- set:
field: event.type
value: flow
field: event.kind
value: event
- set:
field: event.category
value: network_traffic
- drop:
if: 'ctx.event?.original.startsWith("version") || ctx.event?.original.startsWith("instance-id")'
- script:
lang: painless
if: ctx.event?.original != null
source: >-
ctx._temp_ = new HashMap();
ctx._temp_.message_token_count = ctx.event?.original.splitOnToken(" ").length;
- dissect:
field: event.original
pattern: '%{aws.vpcflow.version} %{aws.vpcflow.account_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.protocol} %{aws.vpcflow.packets} %{aws.vpcflow.bytes} %{aws.vpcflow.start} %{aws.vpcflow.end} %{aws.vpcflow.action} %{aws.vpcflow.log_status}'
if: ctx?._temp_?.message_token_count == 14
- dissect:
field: event.original
pattern: '%{aws.vpcflow.instance_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr}'
if: ctx?._temp_?.message_token_count == 6
- dissect:
field: event.original
pattern: '%{aws.vpcflow.version} %{aws.vpcflow.interface_id} %{aws.vpcflow.account_id} %{aws.vpcflow.vpc_id} %{aws.vpcflow.subnet_id} %{aws.vpcflow.instance_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.protocol} %{aws.vpcflow.tcp_flags} %{aws.vpcflow.type} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr} %{aws.vpcflow.action} %{aws.vpcflow.log_status}'
if: ctx?._temp_?.message_token_count == 17
- dissect:
field: event.original
pattern: '%{aws.vpcflow.version} %{aws.vpcflow.vpc_id} %{aws.vpcflow.subnet_id} %{aws.vpcflow.instance_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.account_id} %{aws.vpcflow.type} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr} %{aws.vpcflow.protocol} %{aws.vpcflow.bytes} %{aws.vpcflow.packets} %{aws.vpcflow.start} %{aws.vpcflow.end} %{aws.vpcflow.action} %{aws.vpcflow.tcp_flags} %{aws.vpcflow.log_status}'
if: ctx?._temp_?.message_token_count == 21

# Convert Unix epoch to timestamp
- date:
field: aws.vpcflow.end
target_field: '@timestamp'
ignore_failure: true
formats:
- UNIX
- date:
field: aws.vpcflow.start
target_field: event.start
ignore_failure: true
formats:
- UNIX
- date:
field: aws.vpcflow.end
target_field: event.end
ignore_failure: true
formats:
- UNIX
- remove:
field:
- aws.vpcflow.start
- aws.vpcflow.end
ignore_missing: true
- script:
lang: painless
ignore_failure: true
if: ctx.aws != null
source: >-
void handleMap(Map map) {
for (def x : map.values()) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
map.values().removeIf(v -> v instanceof String && v == "-");
}
void handleList(List list) {
for (def x : list) {
if (x instanceof Map) {
handleMap(x);
} else if (x instanceof List) {
handleList(x);
}
}
}
handleMap(ctx.aws);
value: [network]
- set:
field: event.outcome
value: allow
if: ctx.aws?.vpcflow?.action == "ACCEPT"
- set:
field: event.outcome
value: deny
if: ctx.aws?.vpcflow?.action == "REJECT"
- rename:
field: aws.vpcflow.srcaddr
target_field: source.address
ignore_missing: true
field: cloud.provider
value: aws

# parse_aws_vpc_flow_log removes the original values to avoid duplication
# but to avoid a breaking change continue to populate these fields.
- set:
field: source.ip
copy_from: source.address
if: ctx.source?.address != null
- convert:
field: aws.vpcflow.srcport
target_field: source.port
type: integer
ignore_missing: true
- rename:
field: aws.vpcflow.dstaddr
target_field: destination.address
ignore_missing: true
copy_from: cloud.account.id
field: aws.vpcflow.account_id
ignore_empty_value: true
- set:
field: destination.ip
copy_from: destination.address
if: ctx.destination?.address != null
- convert:
field: aws.vpcflow.dstport
target_field: destination.port
type: integer
copy_from: cloud.instance.id
field: aws.vpcflow.instance_id
ignore_empty_value: true
- uppercase:
field: event.action
target_field: aws.vpcflow.action
ignore_missing: true
- rename:
field: aws.vpcflow.protocol
target_field: network.iana_number
ignore_missing: true
- convert:
field: aws.vpcflow.packets
target_field: source.packets
type: long
ignore_missing: true
- convert:
field: aws.vpcflow.bytes
target_field: source.bytes
type: long
ignore_missing: true
- set:
field: network.bytes
copy_from: source.bytes
if: ctx.source?.bytes != null
- set:
field: network.packets
copy_from: source.packets
if: ctx.source?.packets != null
- set:
field: network.type
value: ipv4
if: 'ctx.source?.ip != null && ctx.source?.ip.contains(".")'
- set:
field: network.type
value: ipv6
if: 'ctx.source?.ip != null && ctx.source?.ip.contains(":")'
- set:
field: network.transport
value: tcp
if: ctx.network?.iana_number == "6"
- set:
field: network.transport
value: udp
if: ctx.network?.iana_number == "17"
- community_id:
target_field: network.community_id
ignore_failure: true

# IP Geolocation Lookup
- geoip:
field: source.ip
@@ -176,6 +48,7 @@ processors:
field: destination.ip
target_field: destination.geo
ignore_missing: true

# IP Autonomous System (AS) Lookup
- geoip:
database_file: GeoLite2-ASN.mmdb
@@ -209,66 +82,7 @@ processors:
field: destination.as.organization_name
target_field: destination.as.organization.name
ignore_missing: true
# Generate related.ip field
- append:
if: 'ctx.source?.ip != null && ctx.destination?.ip != null'
field: related.ip
value: ["{{source.ip}}", "{{destination.ip}}"]
- set:
field: cloud.provider
value: aws
- set:
if: ctx.aws?.vpcflow?.account_id != null
field: cloud.account.id
value: '{{aws.vpcflow.account_id}}'
- set:
if: 'ctx?.aws?.vpcflow?.instance_id != null && ctx.aws.vpcflow.instance_id != "-"'
field: cloud.instance.id
value: '{{aws.vpcflow.instance_id}}'
- set:
field: event.kind
value: event
- script:
lang: painless
ignore_failure: true
if: "ctx.aws?.vpcflow?.tcp_flags != null"
source: |
if (ctx.aws.vpcflow.tcp_flags_array == null) {
ArrayList al = new ArrayList();
ctx.aws.vpcflow.put("tcp_flags_array", al);
}

def flags = Integer.parseUnsignedInt(ctx.aws.vpcflow.tcp_flags);
if ((flags & 0x01) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('fin');
}
if ((flags & 0x02) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('syn');
}
if ((flags & 0x04) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('rst');
}
if ((flags & 0x08) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('psh');
}
if ((flags & 0x10) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('ack');
}
if ((flags & 0x20) != 0) {
ctx.aws.vpcflow.tcp_flags_array.add('urg');
}
- remove:
field:
- _temp_
- aws.vpcflow.srcaddr
- aws.vpcflow.srcport
- aws.vpcflow.dstaddr
- aws.vpcflow.dstport
- aws.vpcflow.bytes
- aws.vpcflow.packets
- aws.vpcflow.protocol
ignore_missing: true
- remove:
field: event.original
if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
@@ -277,4 +91,4 @@
on_failure:
- set:
field: 'error.message'
value: '{{ _ingest.on_failure_message }}'
value: '{{{ _ingest.on_failure_message }}}'
6 changes: 6 additions & 0 deletions x-pack/filebeat/module/aws/vpcflow/manifest.yml
@@ -24,6 +24,12 @@ var:
- name: proxy_url
- name: max_number_of_messages
- name: ssl
- name: format
default:
- version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
- instance-id interface-id srcaddr dstaddr pkt-srcaddr pkt-dstaddr
- version interface-id account-id vpc-id subnet-id instance-id srcaddr dstaddr srcport dstport protocol tcp-flags type pkt-srcaddr pkt-dstaddr action log-status
- version vpc-id subnet-id instance-id interface-id account-id type srcaddr dstaddr srcport dstport pkt-srcaddr pkt-dstaddr protocol bytes packets start end action tcp-flags log-status

ingest_pipeline: ingest/pipeline.yml
input: config/input.yml
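
To make the default list above concrete, here is a hypothetical log line in the first (14-field, version 2) format; every value is invented for illustration. With the defaults, the parse_aws_vpc_flow_log processor would match it to the 14-field format and map fields such as srcaddr, dstaddr, srcport, and dstport onto their ECS counterparts (source.address, destination.address, and so on), which the slimmed-down ingest pipeline above now relies on.

2 123456789012 eni-0123456789abcdef0 10.0.0.1 10.0.0.2 443 34210 6 10 840 1566257342 1566257402 ACCEPT OK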