Skip to content

Commit

Permalink
fix cancellation handling
Browse files Browse the repository at this point in the history
Prior to this change, running an instance of filebeat with the following
configuration would result in an unstoppable instance.

    filebeat.inputs:
      - type: cel
        interval: 1m
        resource.url: https://api.ipify.org/?format=json
        program: |
          bytes(get(state.url).Body).as(body, {
              "events": [body.decode_json()]
          })

    output.console.pretty: true

This happens because the cel program evaluation method does not return
the context cancellation error when a context is cancelled. We also
don't check for cancellation except in the case that we have events or
we have a limit policy in place, so add a check immediately after the
return of the evaluation.
  • Loading branch information
efd6 committed Nov 8, 2022
1 parent e0b6fdd commit a404e43
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
})
log.Debugw("response state", "state", state)
if err != nil {
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return err
}
log.Errorw("failed evaluation", "error", err)
}
metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
Expand Down Expand Up @@ -804,6 +808,11 @@ func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return input, fmt.Errorf("failed eval: %w", err)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{}))
if err != nil {
Expand Down

0 comments on commit a404e43

Please sign in to comment.