Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/cel: fix eval state return on error #33996

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]

*Heartbeat*
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
Expand Down
28 changes: 12 additions & 16 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
log.Debugw("request state", logp.Namespace("cel"), "state", state)
metrics.executions.Add(1)
start := time.Now()
state, err = evalWith(ctx, prg, map[string]interface{}{
root: state,
})
state, err = evalWith(ctx, prg, state)
log.Debugw("response state", logp.Namespace("cel"), "state", state)
if err != nil {
switch {
Expand Down Expand Up @@ -427,9 +425,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
// Replace the last known good cursor.
state["cursor"] = goodCursor

// Avoid explicit type assertion. This is safe as long as the value is
// Go-comparable.
if state["want_more"] == false {
if more, _ := state["want_more"].(bool); !more {
return nil
}
}
Expand Down Expand Up @@ -847,31 +843,31 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi
return prg, nil
}

func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, input)
func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state})
if e := ctx.Err(); e != nil {
err = e
}
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return input, fmt.Errorf("failed eval: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return state, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{}))
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)}
return input, fmt.Errorf("failed proto conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)}
return state, fmt.Errorf("failed proto conversion: %w", err)
}
b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message))
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)}
return input, fmt.Errorf("failed native conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)}
return state, fmt.Errorf("failed native conversion: %w", err)
}
var res map[string]interface{}
err = json.Unmarshal(b, &res)
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)}
return input, fmt.Errorf("failed json conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)}
return state, fmt.Errorf("failed json conversion: %w", err)
}
return res, nil
}
Expand Down