Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: allow elision of set and append failu…
Browse files Browse the repository at this point in the history
…re logging
  • Loading branch information
efd6 committed Jun 20, 2024
1 parent ac4090a commit 912cc26
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add user group membership support to Okta entity analytics provider. {issue}39814[39814] {pull}39815[39815]
- Add request trace support for Okta and EntraID entity analytics providers. {pull}39821[39821]
- Fix handling of infinite rate values in CEL rate limit handling logic. {pull}39940[39940]
- Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ Appends a value to an array. If the field does not exist, the first entry will c
- `default` defines the fallback value whenever `value` is empty or the template parsing fails. Default templates do not have access to any state, only to functions.
- `value_type` defines the type of the resulting value. Possible values are: `string`, `json`, and `int`. Default is `string`.
- `fail_on_template_error` if set to `true` an error will be returned and the request will be aborted when the template evaluation fails. Default is `false`.
- `do_not_log_failure` if set to true a `fail_on_template_error` will not be logged except at DEBUG level. This should be used when template failure is expected in normal operation as control flow.

[float]
==== `delete`
Expand Down Expand Up @@ -178,6 +179,7 @@ Sets a value.
- `default` defines the fallback value whenever `value` is empty or the template parsing fails. Default templates do not have access to any state, only to functions.
- `value_type` defines how the resulting value will be treated. Possible values are: `string`, `json`, and `int`. Default is `string`.
- `fail_on_template_error` if set to `true` an error will be returned and the request will be aborted when the template evaluation fails. Default is `false`.
- `do_not_log_failure` if set to true a `fail_on_template_error` will not be logged except at DEBUG level. This should be used when template failure is expected in normal operation as control flow.

[[value-templates]]
==== Value templates
Expand Down Expand Up @@ -619,6 +621,7 @@ does not exist at the root level, please use the clause `.first_response.` to ac
target: body.page
value: '[[ .last_response.body.page ]]'
fail_on_template_error: true
do_not_log_failure: true
chain:
- step:
request.url: http://xyz.com/services/data/v1.0/$.exportId/export_ids/$.files[:].id/info
Expand Down
69 changes: 69 additions & 0 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,44 @@ var testCases = []struct {
`{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`,
},
},
{
name: "pagination_not_log_fail",
setupServer: func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
server := httptest.NewServer(h)
config["request.url"] = server.URL
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": time.Millisecond,
"request.method": http.MethodGet,
"response.split": map[string]interface{}{
"target": "body.items",
"transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.page",
"value": "[[.last_response.page]]",
},
},
},
},
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.params.page",
"value": "[[.last_response.body.nextPageToken]]",
"fail_on_template_error": true,
"do_not_log_failure": true,
},
},
},
},
handler: paginationHandler(),
expected: []string{
`{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`,
`{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`,
},
},
{
name: "first_event",
setupServer: func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
Expand Down Expand Up @@ -767,6 +805,37 @@ var testCases = []struct {
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "pagination_when_used_with_chaining_not_log_fail",
setupServer: newChainPaginationTestServer(httptest.NewServer),
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
"do_not_log_failure": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.records[:].id",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "replace_with_clause_and_first_response_object",
setupServer: func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ func (p *chainProcessor) handleEvent(ctx context.Context, msg mapstr.M) {
// for each pagination response, we repeat all the chain steps / blocks
n, err := p.req.processChainPaginationEvents(ctx, p.trCtx, p.pub, &response, p.idx, p.req.log)
if err != nil {
if errors.Is(err, notLogged{}) {
p.req.log.Debugf("ignored error processing chain event: %w", err)
return
}
p.req.log.Errorf("error processing chain event: %w", err)
return
}
Expand All @@ -600,9 +604,23 @@ func (p *chainProcessor) handleEvent(ctx context.Context, msg mapstr.M) {
}

func (p *chainProcessor) handleError(err error) {
if errors.Is(err, notLogged{}) {
p.req.log.Debugf("ignored error processing response: %v", err)
return
}
p.req.log.Errorf("error processing response: %v", err)
}

// notLogged is an error that is not logged except at DEBUG.
type notLogged struct {
error
}

func (notLogged) Is(target error) bool {
_, ok := target.(notLogged)
return ok
}

// eventCount returns the number of events that have been processed.
func (p *chainProcessor) eventCount() int {
return p.n
Expand Down Expand Up @@ -776,6 +794,10 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {

// handleError logs err.
func (p *publisher) handleError(err error) {
if errors.Is(err, notLogged{}) {
p.log.Debugf("ignored error processing response: %v", err)
return
}
p.log.Errorf("error processing response: %v", err)
}

Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/transform_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type appendConfig struct {
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
FailOnTemplateError bool `config:"fail_on_template_error"`
DoNotLogFailure bool `config:"do_not_log_failure"`
ValueType string `config:"value_type"`
}

Expand All @@ -27,6 +28,7 @@ type appendt struct {
value *valueTpl
defaultValue *valueTpl
failOnTemplateError bool
doNotLogFailure bool
valueType valueType

runFunc func(ctx *transformContext, transformable transformable, key string, val interface{}) error
Expand Down Expand Up @@ -114,13 +116,17 @@ func newAppend(cfg *conf.C, log *logp.Logger) (appendt, error) {
value: c.Value,
defaultValue: c.Default,
failOnTemplateError: c.FailOnTemplateError,
doNotLogFailure: c.DoNotLogFailure,
valueType: vt,
}, nil
}

func (append *appendt) run(ctx *transformContext, tr transformable) (transformable, error) {
value, err := append.value.Execute(ctx, tr, append.targetInfo.Name, append.defaultValue, append.log)
if err != nil && append.failOnTemplateError {
if append.doNotLogFailure {
err = notLogged{err}
}
return transformable{}, err
}
if value == "" {
Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/transform_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type setConfig struct {
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
FailOnTemplateError bool `config:"fail_on_template_error"`
DoNotLogFailure bool `config:"do_not_log_failure"`
ValueType string `config:"value_type"`
}

Expand All @@ -31,6 +32,7 @@ type set struct {
value *valueTpl
defaultValue *valueTpl
failOnTemplateError bool
doNotLogFailure bool
valueType valueType

runFunc func(ctx *transformContext, transformable transformable, key string, val interface{}) error
Expand Down Expand Up @@ -100,13 +102,17 @@ func newSet(cfg *conf.C, log *logp.Logger) (set, error) {
value: c.Value,
defaultValue: c.Default,
failOnTemplateError: c.FailOnTemplateError,
doNotLogFailure: c.DoNotLogFailure,
valueType: vt,
}, nil
}

func (set *set) run(ctx *transformContext, tr transformable) (transformable, error) {
value, err := set.value.Execute(ctx, tr, set.targetInfo.Name, set.defaultValue, set.log)
if err != nil && set.failOnTemplateError {
if set.doNotLogFailure {
err = notLogged{err}
}
return transformable{}, err
}
if value == "" {
Expand Down

0 comments on commit 912cc26

Please sign in to comment.