Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/cel: new input #31233

Merged
merged 8 commits into from
Nov 15, 2022
Merged

x-pack/filebeat/input/cel: new input #31233

merged 8 commits into from
Nov 15, 2022

Conversation

efd6
Copy link
Contributor

@efd6 efd6 commented Apr 11, 2022

What does this PR do?

This adds a new input to filebeat that allows processing of datastreams using the Common Expression Language.

Why is it important?

It provides a consistent framework for generalised input processing.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Apr 11, 2022
@botelastic
Copy link

botelastic bot commented Apr 11, 2022

This pull request doesn't have a Team:<team> label.

@mergify mergify bot assigned efd6 Apr 11, 2022
@elasticmachine
Copy link
Collaborator

elasticmachine commented Apr 11, 2022

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2022-11-14T22:52:48.891+0000

  • Duration: 65 min 36 sec

❕ Flaky test report

No test was executed to be analysed.

🤖 GitHub comments

Expand to view the GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@efd6 efd6 force-pushed the mito branch 26 times, most recently from 97dc2ff to 682519e Compare April 15, 2022 09:55
@efd6
Copy link
Contributor Author

efd6 commented Nov 2, 2022

Thanks. I'll take a look.

  • fix expected src type — error in constructor

  • fix repeated requests — this is working as intended, though this may need revisiting

    When a non-empty set of events is received from the CEL evaluation, the input assumes there will be more events still available from the API. To signal that no further events are wanted from a periodic run of the input the program should return a state with a field want_more set to false.

    So the following will repeatedly hit api.ipify.org more frequently than the config interval suggests.

      filebeat.inputs:
        - type: cel
          interval: 1m
          resource.url: https://api.ipify.org/?format=json
          program: |
            bytes(get(state.url).Body).as(body, {
                "events": [body.decode_json()],
            })
    
      output.console.pretty: true
    

    but with the addition of the "want_more": false, line, only a single request will be made per minute.

      filebeat.inputs:
        - type: cel
          interval: 1m
          resource.url: https://api.ipify.org/?format=json
          program: |
            bytes(get(state.url).Body).as(body, {
                "events": [body.decode_json()],
                "want_more": false,
            })
    
      output.console.pretty: true
    

    It is important to be able to re-query in a single period, so some behaviour like this is required, but maybe we can mitigate the footgun a little.

    Possible options to mitigate footgun:

    • invert the default so that an absent want_more means do not repeat and the program must explicitly request more with "want_more": true.
    • split the repeat request behaviour into a tri-state:
      1. absent — current behaviour up to a configurable limit of requests defaulting to some small number
      2. true — always repeat
      3. false — never repeat
  • fix cancelation — cel.Program.ContextEval does not return context errors

    Prior to the fix, running an instance of filebeat with the following
    configuration would result in an unstoppable instance.

      filebeat.inputs:
        - type: cel
          interval: 1m
          resource.url: https://api.ipify.org/?format=json
          program: |
            bytes(get(state.url).Body).as(body, {
                "events": [body.decode_json()]
            })
    
      output.console.pretty: true
    

    This happens because the cel program evaluation method does not return
    the context cancellation error when a context is cancelled. We also
    don't check for cancellation except in the case that we have events or
    we have a limit policy in place, so add a check immediately after the
    return of the evaluation.

@efd6 efd6 force-pushed the mito branch 2 times, most recently from fc58cb1 to 1573167 Compare November 3, 2022 06:17
@mergify
Copy link
Contributor

mergify bot commented Nov 8, 2022

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito

@efd6 efd6 force-pushed the mito branch 2 times, most recently from d2c0551 to b8c73a0 Compare November 8, 2022 09:29
@mergify
Copy link
Contributor

mergify bot commented Nov 8, 2022

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b mito upstream/mito
git merge upstream/main
git push upstream mito

Prior to this change, running an instance of filebeat with the following
configuration would result in an unstoppable instance.

    filebeat.inputs:
      - type: cel
        interval: 1m
        resource.url: https://api.ipify.org/?format=json
        program: |
          bytes(get(state.url).Body).as(body, {
              "events": [body.decode_json()]
          })

    output.console.pretty: true

This happens because the cel program evaluation method does not return
the context cancellation error when a context is cancelled. We also
don't check for cancellation except in the case that we have events or
we have a limit policy in place, so add a check immediately after the
return of the evaluation.
Copy link
Member

@andrewkroh andrewkroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I retested with want_more and things worked as expected. 👍

}

// Process a set of event requests.
log.Debugw("request state", "state", state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider namespacing the logger's structured data with "cel" (e.g. iirc .Debugw("message", logp.Namespace("cel"), "state", state)). This way all of the structured data will get logged like {"cel": {"state: ...}}. That could help minimize conflicts if someone was ingesting this data.

Copy link
Contributor Author

@efd6 efd6 Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not log := env.Logger.Named("cel").With("input_url", cfg.Resource.URL) at line 104?

I've looked into this and this is already decorated by the input cursor caller here

inpCtx := ctx
inpCtx.ID = ctx.ID + "::" + source.Name()
inpCtx.Logger = ctx.Logger.With("input_source", source.Name())
if err = inp.runSource(inpCtx, inp.manager.store, source, pipeline); err != nil {
cancel()
}
.

Is this still something that you would like done?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The namespace has a different purpose than the name. And what I’m suggesting is not strictly necessary and nor is it a general solution to the problem of using a consistent schema across log messages, but it might help make someone’s job easier.

Basically by adding logp.Namespace("cel") it will be creating cel.state.* fields in the resulting JSON from the logger as opposed to state.*. In case something else is already logging state, say as a string, then it will be easier to consume the log data.

Copy link
Contributor Author

@efd6 efd6 Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Is that something that should be added to httpjson as well? No, no it isn't, since state is never logged there.

Done

_, ok = state["url"]
if !ok && goodURL != "" {
state["url"] = goodURL
log.Infow("adding missing url", "state", mapstr.M(state), "url", goodURL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be helpful to clarify how it's correcting the issue and explain why it's happening. I'm thinking something like

adding the current URL to the returned state because it did not include a 'url'

I expect some use cases that only hit a single static URL will omit adding the url from their CEL expression. Should we make this debug level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semantically, it feels like INFO, but in the case that people don't return a URL, it would spam the logs with this, though that would be easily fixed by following the documentation.

state, err = evalWith(ctx, prg, map[string]interface{}{
root: state,
})
log.Debugw("response state", "state", state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would state include anything sensitive that should not be logged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a note to the documentation that when logging with debug that the complete state after evaluation is logged and so this should not be used in production. The situation is only slightly worse than the case for logging with debug and seeing all published events, but worth noting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option; I think this would make a good enhancement, but not for now because I'd like to spend some more time thinking about the best way to do it, would be to have either a config.mask or state.mask array of string field that is used to remove/clobber fields in a copy of the state before logging state.

- note security concern with logging at debug level and make all state
  logging at this level.
- add note in debug log explaining missing url.
@efd6
Copy link
Contributor Author

efd6 commented Nov 14, 2022

The linter errors may be related to #33649.

@cmacknz
Copy link
Member

cmacknz commented Nov 14, 2022

Stumbled on this through the linked linter PR:

Cool! I am excited to see this implemented. I have used CEL in the past for configurable processing and have been wondering if we could make use of it. I am particularly interested to see if CEL stays contained to this one one input after we get some experience with it, or if we'll want to use it in more places.

@efd6
Copy link
Contributor Author

efd6 commented Nov 14, 2022

/test

@efd6
Copy link
Contributor Author

efd6 commented Nov 14, 2022

@cmacknz I can see value in having CEL processing available elsewhere and this is initially an experiment to see how well it will work as an analogue of the httpjson input. You may want to take a look at github.com/elastic/mito which provides the CEL extensions used here.

@efd6
Copy link
Contributor Author

efd6 commented Nov 15, 2022

E2E failure is unrelated.

@efd6 efd6 merged commit adc85ee into elastic:main Nov 15, 2022
chrisberkhout pushed a commit that referenced this pull request Jun 1, 2023
This adds a new filebeat input that enables processing a resource that is
either filesystem-local or an HTTP API endpoint.

The input uses the Common Expression Language to convert arbitrarily formatted
data into a set of objects that are then published by filebeat. Documentation
for CEL and the CEL extensions that are made available through the input is
available from the CEL project's pages and an Elastic repository that extends
the standard language's features: github.com/elasti/mito.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
8.6 candidate backport-skip Skip notification from the automated backport with mergify enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants