
[filebeat][gcs] - Added support for new features and removed partial save mechanism #36713

Merged 13 commits on Oct 3, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -224,6 +224,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added support for new features & removed partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690]
- Improve template evaluation logging for HTTPJSON input. {pull}36668[36668]
- Add CEL partial value debug function. {pull}36652[36652]
- Added support for new features & removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713]

*Auditbeat*

99 changes: 97 additions & 2 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -79,6 +79,8 @@ NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum av
process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout
calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value.

NOTE: In the latest update the partial save mechanism has been removed due to concurrency issues; it will be added back in a future release.

*A Sample Response :-*
["source","json"]
----
@@ -167,6 +169,9 @@ Now let's explore the configuration attributes a bit more elaborately.
8. <<attrib-poll-gcs,poll>>
9. <<attrib-poll_interval-gcs,poll_interval>>
10. <<attrib-parse_json,parse_json>>
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>


[id="attrib-project-id"]
@@ -252,6 +257,96 @@ highly nested json data. If this is set to `false` the *gcs.storage.object.json_
applicable for json objects and has no effect on other types of objects. This attribute can be specified both at the root level of the configuration as well at the bucket level.
The bucket level values will always take priority and override the root level values if both are specified.

[id="attrib-file_selectors-gcs"]
[float]
==== `file_selectors`

If the GCS buckets contain objects that {beatname_uc} should not process, `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors, each based on a `regex` pattern. The `regex` should match the object name or a part of the object name (ideally a prefix). The `regex` syntax is the same as that of the Go programming language. Files that don't match any configured regex won't be processed. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[source, yml]
----
filebeat.inputs:
- type: gcs
project_id: my_project_id
auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
buckets:
- name: obs-bucket
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
file_selectors:
- regex: '/Monitoring/'
- regex: 'docs/'
- regex: '/Security-Logs/'
----

[id="attrib-expand_event_list_from_field-gcs"]
[float]
==== `expand_event_list_from_field`

If the file set using this input receives multiple messages bundled under a specific field, or as an array of objects, the `expand_event_list_from_field` option can be used to split the messages under that group value into separate events. For example, if you have logs in JSON format where events are found under the JSON object "Records", setting `expand_event_list_from_field` to "Records" splits those events into separate events. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[source, json]
----
{
"Records": [
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:51:00Z",
"region": "us-east-1",
"eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE"
},
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:52:00Z",
"region": "us-east-1",
"eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE"
}
]
}
----

[source,yml]
----
filebeat.inputs:
- type: gcs
project_id: my_project_id
auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
buckets:
- name: obs-bucket
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
expand_event_list_from_field: Records
----

NOTE: The `parse_json` setting does not work with `expand_event_list_from_field`; if both are enabled, `parse_json` is ignored. This attribute is only applicable to JSON file formats. You do not need to specify this attribute if the file has an array of objects at the root level, since a root-level array of objects is automatically split into separate events. If failures occur or the input crashes due to an unexpected error, processing resumes from the last successfully processed file/object.
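The splitting behaviour can be illustrated with a short Go sketch. This is a simplified model of what `expand_event_list_from_field` does, not the actual Filebeat code; `expandEvents` is a hypothetical helper name:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// expandEvents splits a JSON document into one raw event per element of the
// array found under the given field, mirroring the effect of setting
// expand_event_list_from_field to that field name.
func expandEvents(raw []byte, field string) ([]json.RawMessage, error) {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	var events []json.RawMessage
	if err := json.Unmarshal(doc[field], &events); err != nil {
		return nil, err
	}
	return events, nil
}

func main() {
	raw := []byte(`{"Records":[{"eventID":"a"},{"eventID":"b"}]}`)
	events, err := expandEvents(raw, "Records")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(events)) // 2
}
```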


[id="attrib-timestamp_epoch-gcs"]
[float]
==== `timestamp_epoch`

This attribute can be used to filter out files/objects that have a timestamp older than the specified value. The value of this attribute should be in Unix `epoch` (seconds) format. The timestamp value is compared with the `object.Updated` field obtained from the object metadata. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[source, yml]
----
filebeat.inputs:
- type: gcs
project_id: my_project_id
auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
buckets:
- name: obs-bucket
max_workers: 3
poll: true
poll_interval: 15s
bucket_timeout: 60s
timestamp_epoch: 1630444800
----

[id="bucket-overrides"]
*The sample configs below will explain the bucket level overriding of attributes a bit further :-*
@@ -260,7 +355,7 @@ The bucket level values will always take priority and override the root level va

Here `bucket_1` uses the root level attributes while `bucket_2` overrides the values:

["source","yaml",subs="attributes"]
[source, yml]
----
filebeat.inputs:
- type: gcs
@@ -288,7 +383,7 @@ of using the root values.

Here both `bucket_1` and `bucket_2` override the root values:

["source","yaml",subs="attributes"]
[source, yml]
----
filebeat.inputs:
- type: gcs
49 changes: 33 additions & 16 deletions x-pack/filebeat/input/gcs/config.go
@@ -6,6 +6,8 @@ package gcs

import (
"time"

"github.com/elastic/beats/v7/libbeat/common/match"
)

// MaxWorkers - Defines the maximum number of go routines that will be spawned.
@@ -15,30 +17,45 @@ import (
// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out.
// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to
// false, since it can get expensive dealing with highly nested json data.
// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON can be configured at a global level,
// which applies to all buckets, as well as at the bucket level.
// FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket.
// TimeStampEpoch - Defines the epoch time in seconds, which is used to filter out objects that are older than the specified timestamp.
// ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events.
// MaxWorkers, Poll, PollInterval, BucketTimeOut, ParseJSON, FileSelectors, TimeStampEpoch & ExpandEventListFromField
// can be configured at a global level, which applies to all buckets, as well as at the bucket level.
// Bucket level configurations will always override global level values.
type config struct {
ProjectId string `config:"project_id" validate:"required"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Buckets []bucket `config:"buckets" validate:"required"`
ProjectId string `config:"project_id" validate:"required"`
Auth authConfig `config:"auth" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Buckets []bucket `config:"buckets" validate:"required"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
// This field is only used for system test purposes, to override the HTTP endpoint.
AlternativeHost string `config:"alternative_host,omitempty"`
}

// bucket contains the config for each specific object storage bucket in the root account
type bucket struct {
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
Name string `config:"name" validate:"required"`
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
Poll *bool `config:"poll,omitempty"`
PollInterval *time.Duration `config:"poll_interval,omitempty"`
ParseJSON *bool `config:"parse_json,omitempty"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
TimeStampEpoch *int64 `config:"timestamp_epoch"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
}

// fileSelectorConfig helps filter out gcs objects based on a regex pattern
type fileSelectorConfig struct {
Regex *match.Matcher `config:"regex" validate:"required"`
// TODO: Add support for reader config in future
}

type authConfig struct {
48 changes: 36 additions & 12 deletions x-pack/filebeat/input/gcs/input.go
@@ -6,6 +6,7 @@ package gcs

import (
"context"
"fmt"
"time"

"cloud.google.com/go/storage"
@@ -22,6 +23,12 @@ type gcsInput struct {
config config
}

// minTimestamp and maxTimestamp define the valid range for Unix timestamps for 64-bit integers
var (
minTimestamp = time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
maxTimestamp = time.Date(3000, time.January, 1, 0, 0, 0, 0, time.UTC).Unix()
)

const (
inputName = "gcs"
)
@@ -47,18 +54,24 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
if err := cfg.Unpack(&config); err != nil {
return nil, nil, err
}

//nolint:prealloc // No need to preallocate the slice
var sources []cursor.Source
for _, b := range config.Buckets {
bucket := tryOverrideOrDefault(config, b)
if bucket.TimeStampEpoch != nil && !isValidUnixTimestamp(*bucket.TimeStampEpoch) {
return nil, nil, fmt.Errorf("invalid timestamp epoch: %d", *bucket.TimeStampEpoch)
}
sources = append(sources, &Source{
ProjectId: config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
ProjectId: config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
TimeStampEpoch: bucket.TimeStampEpoch,
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
})
}

@@ -76,42 +89,53 @@ func tryOverrideOrDefault(cfg config, b bucket) bucket {
}
b.MaxWorkers = &maxWorkers
}

if b.Poll == nil {
var poll bool
if cfg.Poll != nil {
poll = *cfg.Poll
}
b.Poll = &poll
}

if b.PollInterval == nil {
interval := time.Second * 300
if cfg.PollInterval != nil {
interval = *cfg.PollInterval
}
b.PollInterval = &interval
}

if b.ParseJSON == nil {
parse := false
if cfg.ParseJSON != nil {
parse = *cfg.ParseJSON
}
b.ParseJSON = &parse
}

if b.BucketTimeOut == nil {
timeOut := time.Second * 50
if cfg.BucketTimeOut != nil {
timeOut = *cfg.BucketTimeOut
}
b.BucketTimeOut = &timeOut
}
if b.TimeStampEpoch == nil {
b.TimeStampEpoch = cfg.TimeStampEpoch
}
if b.ExpandEventListFromField == "" {
b.ExpandEventListFromField = cfg.ExpandEventListFromField
}
if len(b.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 {
b.FileSelectors = cfg.FileSelectors
}

return b
}

// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp
func isValidUnixTimestamp(timestamp int64) bool {
// checks if the timestamp is within the valid range
return minTimestamp <= timestamp && timestamp <= maxTimestamp
}

func (input *gcsInput) Name() string {
return inputName
}
29 changes: 19 additions & 10 deletions x-pack/filebeat/input/gcs/input_stateless.go
@@ -10,6 +10,7 @@ import (

"cloud.google.com/go/storage"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/sync/errgroup"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
@@ -47,16 +48,20 @@ func (pub statelessPublisher) Publish(event beat.Event, _ interface{}) error {
func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher, client *storage.Client) error {
pub := statelessPublisher{wrapped: publisher}
var source cursor.Source
var g errgroup.Group
for _, b := range in.config.Buckets {
bucket := tryOverrideOrDefault(in.config, b)
source = &Source{
ProjectId: in.config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
ProjectId: in.config.ProjectId,
BucketName: bucket.Name,
BucketTimeOut: *bucket.BucketTimeOut,
MaxWorkers: *bucket.MaxWorkers,
Poll: *bucket.Poll,
PollInterval: *bucket.PollInterval,
ParseJSON: *bucket.ParseJSON,
TimeStampEpoch: bucket.TimeStampEpoch,
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
}

st := newState()
@@ -80,8 +85,12 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
)

scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log)

return scheduler.schedule(ctx)
// Allows multiple buckets to be scheduled concurrently while testing.
// The stateless input is triggered only during testing and, until now, did not mimic
// the real-world concurrent execution of multiple buckets. This fix allows it to do so.
g.Go(func() error {
return scheduler.schedule(ctx)
})
}
return nil
return g.Wait()
}