Add support for CloudTrail digest files
- allow file matching with file_selectors in s3 input
- update cloudtrail pipeline
- update cloudtrail config to use file_selectors
- add cloudtrail digest fields

Closes elastic#20943
leehinman committed Sep 14, 2020
1 parent af6222d commit 6a7c864
Showing 17 changed files with 546 additions and 36 deletions.
105 changes: 105 additions & 0 deletions filebeat/docs/fields.asciidoc
@@ -1487,6 +1487,111 @@ type: flattened

--

[float]
=== digest

Fields from CloudTrail digest logs.


*`aws.cloudtrail.digest.log_files`*::
+
--
A list of log files contained in the digest.

type: nested

--

*`aws.cloudtrail.digest.start_time`*::
+
--
The UTC start of the time range that the digest file covers, based on the time at which log files were delivered by CloudTrail.

type: date

--

*`aws.cloudtrail.digest.end_time`*::
+
--
The UTC end of the time range that the digest file covers, based on the time at which log files were delivered by CloudTrail.

type: date

--

*`aws.cloudtrail.digest.s3_bucket`*::
+
--
The name of the Amazon S3 bucket to which the current digest file has been delivered.

type: keyword

--

*`aws.cloudtrail.digest.s3_object`*::
+
--
The Amazon S3 object key (that is, the Amazon S3 bucket location) of the current digest file.

type: keyword

--

*`aws.cloudtrail.digest.newest_event_time`*::
+
--
The UTC time of the most recent event among all of the events in the log files in the digest.

type: date

--

*`aws.cloudtrail.digest.oldest_event_time`*::
+
--
The UTC time of the oldest event among all of the events in the log files in the digest.

type: date

--

*`aws.cloudtrail.digest.previous_s3_bucket`*::
+
--
The Amazon S3 bucket to which the previous digest file was delivered.

type: keyword

--

*`aws.cloudtrail.digest.previous_hash_algorithm`*::
+
--
The name of the hash algorithm that was used to hash the previous digest file.

type: keyword

--

*`aws.cloudtrail.digest.public_key_fingerprint`*::
+
--
The hexadecimal-encoded fingerprint of the public key that matches the private key used to sign this digest file.

type: keyword

--

*`aws.cloudtrail.digest.signature_algorithm`*::
+
--
The algorithm used to sign the digest file.

type: keyword

--

[float]
=== cloudwatch

22 changes: 22 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -67,6 +67,28 @@ type will not be checked.
If a file has "application/json" content-type, `expand_event_list_from_field`
becomes required to read the json file.

[float]
==== `file_selectors`

If the SQS queue has events for files that {beatname_uc} shouldn't
process, `file_selectors` can be used to limit which files are
downloaded. It is a list of selectors, each made up of a `regex`
option and an optional `expand_event_list_from_field` option. The
`regex` is matched against the S3 object key in the SQS message, and
the optional `expand_event_list_from_field` has the same meaning as
the global setting. Regex syntax is the same as in the Go language.
Files whose keys match none of the regexes won't be processed; when
several selectors match, the first one wins. A fuller input-level
example follows the snippet below.

["source", "yml"]
----
file_selectors:
- regex: '^AWSLogs/\d+/CloudTrail/'
expand_event_list_from_field: 'Records'
- regex: '^AWSLogs/\d+/CloudTrail-Digest'
----
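
For orientation, here is a sketch of how `file_selectors` might sit
inside a complete input configuration. The queue URL and key prefixes
are placeholders, and the `s3` input type name is assumed:

["source", "yml"]
----
filebeat.inputs:
- type: s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456/myqueue
  file_selectors:
    - regex: '^AWSLogs/\d+/CloudTrail/'
      expand_event_list_from_field: 'Records'
    - regex: '^AWSLogs/\d+/CloudTrail-Digest'
----

With selectors configured, objects whose keys match none of the
regexes are skipped rather than downloaded.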


[float]
==== `api_timeout`

12 changes: 12 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -102,6 +102,18 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# Process CloudTrail logs
# default is true, set to false to skip CloudTrail logs
#var.process_cloudtrail_logs: false

# Process CloudTrail Digest logs
# default is true, set to false to skip CloudTrail Digest logs
#var.process_digest_logs: false

# Process CloudTrail Insight logs
# default is true, set to false to skip CloudTrail Insight logs
#var.process_insight_logs: false

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
16 changes: 16 additions & 0 deletions x-pack/filebeat/input/s3/config.go
@@ -6,6 +6,7 @@ package s3

import (
"fmt"
"regexp"
"time"

"github.com/elastic/beats/v7/filebeat/harvester"
@@ -19,6 +20,14 @@ type config struct {
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
APITimeout time.Duration `config:"api_timeout"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
}

// FileSelectorCfg defines the configuration for a single file selector.
type FileSelectorCfg struct {
RegexString string `config:"regex"`
Regex *regexp.Regexp `config:",ignore"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

func defaultConfig() config {
@@ -40,5 +49,12 @@ func (c *config) Validate() error {
return fmt.Errorf("api timeout %v needs to be larger than"+
" 0s and smaller than half of the visibility timeout", c.APITimeout)
}
for i := range c.FileSelectors {
r, err := regexp.Compile(c.FileSelectors[i].RegexString)
if err != nil {
return err
}
c.FileSelectors[i].Regex = r
}
return nil
}
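
As an aside, the point of compiling the selector regexes in
`Validate` is that a malformed pattern is rejected once at config
load instead of on every SQS message. A standalone sketch of the same
approach (not the commit's code; names are hypothetical):

["source", "go"]
----
package main

import (
	"fmt"
	"regexp"
)

// compileSelectors mirrors the Validate loop above: compile each
// file_selectors regex once, up front, and fail fast on bad patterns.
func compileSelectors(patterns []string) ([]*regexp.Regexp, error) {
	compiled := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		r, err := regexp.Compile(p)
		if err != nil {
			return nil, err
		}
		compiled = append(compiled, r)
	}
	return compiled, nil
}

func main() {
	// A valid CloudTrail digest prefix compiles cleanly.
	if _, err := compileSelectors([]string{`^AWSLogs/\d+/CloudTrail-Digest`}); err != nil {
		fmt.Println("unexpected:", err)
	}
	// An unbalanced bracket fails at load time, not per message.
	if _, err := compileSelectors([]string{`([`}); err != nil {
		fmt.Println("rejected:", err)
	}
}
----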
70 changes: 45 additions & 25 deletions x-pack/filebeat/input/s3/input.go
@@ -74,10 +74,11 @@ type s3Input struct {
}

type s3Info struct {
name string
key string
region string
arn string
name string
key string
region string
arn string
expandEventListFromField string
}

type bucket struct {
@@ -252,7 +253,7 @@ func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityT
func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, wg *sync.WaitGroup, errC chan error) {
defer wg.Done()

s3Infos, err := handleSQSMessage(message)
s3Infos, err := p.handleSQSMessage(message)
if err != nil {
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
@@ -352,7 +353,7 @@ func getRegionFromQueueURL(queueURL string) (string, error) {
}

// handle message
func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
func (p *s3Input) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
msg := sqsMessage{}
err := json.Unmarshal([]byte(*m.Body), &msg)
if err != nil {
@@ -361,21 +362,40 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {

var s3Infos []s3Info
for _, record := range msg.Records {
if record.EventSource == "aws:s3" && strings.HasPrefix(record.EventName, "ObjectCreated:") {
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}
if record.EventSource != "aws:s3" || !strings.HasPrefix(record.EventName, "ObjectCreated:") {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
}
// Unescape substrings from s3 log name. For example, convert "%3D" back to "="
filename, err := url.QueryUnescape(record.S3.object.Key)
if err != nil {
return nil, errors.Wrapf(err, "url.QueryUnescape failed for '%s'", record.S3.object.Key)
}

if len(p.config.FileSelectors) == 0 {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: p.config.ExpandEventListFromField,
})
} else {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
continue
}

for _, fs := range p.config.FileSelectors {
if fs.Regex == nil {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
})
break
}
}
}
return s3Infos, nil
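
The selection rule implemented above is worth spelling out: with no
`file_selectors`, every object is processed using the global
`expand_event_list_from_field`; otherwise the first selector whose
regex matches the key wins, and objects matching no selector are
dropped. A standalone sketch with hypothetical names (not the
commit's code):

["source", "go"]
----
package main

import (
	"fmt"
	"regexp"
)

type selector struct {
	regex  *regexp.Regexp
	expand string
}

// pick returns the expand_event_list_from_field value to use for an
// S3 object key, and whether the object should be processed at all.
func pick(key, global string, selectors []selector) (string, bool) {
	if len(selectors) == 0 {
		return global, true // no selectors: process everything
	}
	for _, s := range selectors {
		if s.regex.MatchString(key) {
			return s.expand, true // first match wins
		}
	}
	return "", false // no selector matched: skip the object
}

func main() {
	selectors := []selector{
		{regex: regexp.MustCompile(`^AWSLogs/\d+/CloudTrail/`), expand: "Records"},
		{regex: regexp.MustCompile(`^AWSLogs/\d+/CloudTrail-Digest`)},
	}
	fmt.Println(pick("AWSLogs/123456/CloudTrail/us-east-1/x.json.gz", "", selectors))
	// -> Records true
	fmt.Println(pick("AWSLogs/123456/ELB/access.log", "", selectors))
	// -> false
}
----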
@@ -456,7 +476,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" {
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
decoder := json.NewDecoder(reader)
err := p.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
@@ -537,10 +557,10 @@ func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3
func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
err := errors.Errorf("key '%s' not found", s3Info.expandEventListFromField)
p.logger.Error(err)
return offset, err
}
@@ -555,10 +575,10 @@ func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash
return offset, nil
}
case map[string]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if s3Info.expandEventListFromField != "" {
textValues, ok := f[s3Info.expandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
err := errors.Errorf("key '%s' not found", s3Info.expandEventListFromField)
p.logger.Error(err)
return offset, err
}
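
The decoding changes above replace the global
`p.config.ExpandEventListFromField` with the per-object
`s3Info.expandEventListFromField`, so CloudTrail log files can expand
their `Records` array while digest files are ingested whole. A
standalone sketch of the expansion itself (hypothetical names, not
the commit's code):

["source", "go"]
----
package main

import (
	"encoding/json"
	"fmt"
)

// expand splits {"<field>": [e1, e2, ...]} into one raw document per
// array element, the way expand_event_list_from_field turns a single
// CloudTrail log object into individual events.
func expand(raw []byte, field string) ([]json.RawMessage, error) {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	list, ok := doc[field]
	if !ok {
		return nil, fmt.Errorf("key '%s' not found", field)
	}
	var events []json.RawMessage
	if err := json.Unmarshal(list, &events); err != nil {
		return nil, err
	}
	return events, nil
}

func main() {
	raw := []byte(`{"Records":[{"eventName":"GetObject"},{"eventName":"PutObject"}]}`)
	events, err := expand(raw, "Records")
	fmt.Println(len(events), err) // 2 <nil>
}
----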
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/s3/input_test.go
@@ -120,9 +120,10 @@ func TestHandleMessage(t *testing.T) {
},
}

p := &s3Input{context: &channelContext{}}
for _, c := range casesPositive {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
s3Info, err := p.handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
@@ -155,7 +156,7 @@

for _, c := range casesNegative {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
s3Info, err := p.handleSQSMessage(c.message)
assert.Error(t, err)
assert.Nil(t, s3Info)
})
12 changes: 12 additions & 0 deletions x-pack/filebeat/module/aws/_meta/config.yml
@@ -5,6 +5,18 @@
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# Process CloudTrail logs
# default is true, set to false to skip CloudTrail logs
#var.process_cloudtrail_logs: false

# Process CloudTrail Digest logs
# default is true, set to false to skip CloudTrail Digest logs
#var.process_digest_logs: false

# Process CloudTrail Insight logs
# default is true, set to false to skip CloudTrail Insight logs
#var.process_insight_logs: false

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
