Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: do not request more work after an eval fai…
Browse files Browse the repository at this point in the history
…lure (#37161)

The logic for want_more is that there must be at least one event
published for a true want_more field to result in an additional loop of
evaluation. This condition can currently be true in the case that the
evaluation fails. In the failure case it is likely that we do not want
to re-eval, so set want_more to false in this situation.
  • Loading branch information
efd6 authored Nov 22, 2023
1 parent 867be3b commit 199075c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]
- Made GCS input GA and updated docs accordingly. {pull}37127[37127]
- Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160]
- Prevent CEL input from re-entering the eval loop when an evaluation failed. {pull}37161[37161]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ The field should be an array, but in the case of an error condition in the CEL p

<3> If `rate_limit` is present it must be a map with numeric fields `rate` and `burst`. The `rate_limit` field may also have a string `error` field and other fields which will be logged. If it has an `error` field, the `rate` and `burst` will not be used to set rate limit behavior. The {mito_docs}/lib#Limit[Limit], and {mito_docs}/lib#OktaRateLimit[Okta Rate Limit policy] and {mito_docs}/lib#DraftRateLimit[Draft Rate Limit policy] documentation show how to construct this field.

<4> The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true, and a non-zero events array is returned.
<4> The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true, and a non-zero events array is returned. If the "want_more" field is present after a failed evaluation, it is set to false.

The `status_code`, `header` and `rate_limit` values may be omitted if the program is not interacting with an HTTP API end-point and so will not be needed to contribute to program control.

Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,12 +978,14 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}
}
if err != nil {
state["events"] = errorMessage(fmt.Sprintf("failed eval: %v", err))
clearWantMore(state)
return state, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Struct)(nil)))
if err != nil {
state["events"] = errorMessage(fmt.Sprintf("failed proto conversion: %v", err))
clearWantMore(state)
return state, fmt.Errorf("failed proto conversion: %w", err)
}
switch v := v.(type) {
Expand All @@ -993,10 +995,21 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}
// This should never happen.
errMsg := fmt.Sprintf("unexpected native conversion type: %T", v)
state["events"] = errorMessage(errMsg)
clearWantMore(state)
return state, errors.New(errMsg)
}
}

// clearWantMore sets the state to not request additional work in a periodic evaluation.
// It leaves state intact if there is no "want_more" element, and sets the element to false
// if there is. This is necessary instead of just doing delete(state, "want_more") as
// client CEL code may expect the want_more field to be present.
func clearWantMore(state map[string]interface{}) {
if _, ok := state["want_more"]; ok {
state["want_more"] = false
}
}

func errorMessage(msg string) map[string]interface{} {
return map[string]interface{}{"error": map[string]interface{}{"message": msg}}
}
Expand Down

0 comments on commit 199075c

Please sign in to comment.