Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/cel: do not request more work after an eval failure #37161

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957]
- Made GCS input GA and updated docs accordingly. {pull}37127[37127]
- Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160]
- Prevent CEL input from re-entering the eval loop when an evaluation failed. {pull}37161[37161]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ The field should be an array, but in the case of an error condition in the CEL p

<3> If `rate_limit` is present it must be a map with numeric fields `rate` and `burst`. The `rate_limit` field may also have a string `error` field and other fields which will be logged. If it has an `error` field, the `rate` and `burst` will not be used to set rate limit behavior. The {mito_docs}/lib#Limit[Limit], and {mito_docs}/lib#OktaRateLimit[Okta Rate Limit policy] and {mito_docs}/lib#DraftRateLimit[Draft Rate Limit policy] documentation show how to construct this field.

<4> The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true, and a non-zero events array is returned.
<4> The evaluation is repeated with the new state, after removing the events field, if the "want_more" field is present and true, and a non-zero events array is returned. If the "want_more" field is present after a failed evaluation, it is set to false.

The `status_code`, `header` and `rate_limit` values may be omitted if the program is not interacting with an HTTP API end-point and so will not be needed to contribute to program control.

Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,12 +978,14 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}
}
if err != nil {
state["events"] = errorMessage(fmt.Sprintf("failed eval: %v", err))
clearWantMore(state)
return state, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Struct)(nil)))
if err != nil {
state["events"] = errorMessage(fmt.Sprintf("failed proto conversion: %v", err))
clearWantMore(state)
return state, fmt.Errorf("failed proto conversion: %w", err)
}
switch v := v.(type) {
Expand All @@ -993,10 +995,21 @@ func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}
// This should never happen.
errMsg := fmt.Sprintf("unexpected native conversion type: %T", v)
state["events"] = errorMessage(errMsg)
clearWantMore(state)
return state, errors.New(errMsg)
}
}

// clearWantMore sets the state to not request additional work in a periodic evaluation.
// It leaves state intact if there is no "want_more" element, and sets the element to false
// if there is. This is necessary instead of just doing delete(state, "want_more") as
// client CEL code may expect the want_more field to be present.
func clearWantMore(state map[string]interface{}) {
if _, ok := state["want_more"]; ok {
state["want_more"] = false
}
}

func errorMessage(msg string) map[string]interface{} {
return map[string]interface{}{"error": map[string]interface{}{"message": msg}}
}
Expand Down
Loading