From cd547c688521c6475120200fecd4cca29a05eb5b Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Fri, 18 Nov 2022 21:44:42 +1030 Subject: [PATCH] x-pack/filebeat/input/cel: add support for http+{unix,npipe} schemes (#33712) Also explicitly add cel to elastic/security-external-integrations ownership list. --- .github/CODEOWNERS | 1 + CHANGELOG.next.asciidoc | 3 +- .../filebeat/docs/inputs/input-cel.asciidoc | 3 ++ x-pack/filebeat/input/cel/input.go | 51 +++++++++++++++++-- x-pack/filebeat/input/cel/transport_other.go | 21 ++++++++ .../filebeat/input/cel/transport_windows.go | 23 +++++++++ 6 files changed, 96 insertions(+), 6 deletions(-) create mode 100644 x-pack/filebeat/input/cel/transport_other.go create mode 100644 x-pack/filebeat/input/cel/transport_windows.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d84e95fa67b..0969e1b475f 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -76,6 +76,7 @@ CHANGELOG* /x-pack/filebeat/docs/ # Listed without an owner to avoid maintaining doc ownership for each input and module. /x-pack/filebeat/input/awscloudwatch/ @elastic/obs-cloud-monitoring /x-pack/filebeat/input/awss3/ @elastic/obs-cloud-monitoring +/x-pack/filebeat/input/cel/ @elastic/security-external-integrations /x-pack/filebeat/input/gcppubsub/ @elastic/security-external-integrations /x-pack/filebeat/input/http_endpoint/ @elastic/security-external-integrations /x-pack/filebeat/input/httpjson/ @elastic/security-external-integrations diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d851c22b8d9..20948ff8ac5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -164,7 +164,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658] - Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673] - Add Common Expression Language input. {pull}31233[31233] -- Add support for http+unix and http+npipe schemes. {issue}33571[33571] {pull}33610[33610] +- Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610] +- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index 9dab81bd52f..04a168193ec 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -425,6 +425,9 @@ with `auth.oauth2.google.jwt_file` or `auth.oauth2.google.jwt_json`. The URL of the HTTP API. Required. +The API endpoint may be accessed via unix socket and Windows named pipes by adding `+unix` or `+npipe` +to the URL scheme, for example, `http+unix:///var/socket/`. + [float] ==== `resource.timeout` diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 0cd36097161..9833ff74283 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -47,6 +47,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" + "github.com/elastic/elastic-agent-libs/transport" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/useragent" "github.com/elastic/go-concert/ctxtool" @@ -640,10 +641,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, if !wantClient(cfg) { return nil, nil } - c, err := cfg.Resource.Transport.Client( - httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, - ) + c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL)...) if err != nil { return nil, err } @@ -684,7 +682,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, } func wantClient(cfg config) bool { - switch cfg.Resource.URL.Scheme { + switch scheme, _, _ := strings.Cut(cfg.Resource.URL.Scheme, "+"); scheme { case "http", "https": return true default: @@ -692,6 +690,49 @@ func wantClient(cfg config) bool { } } +// clientOption returns constructed client configuration options, including +// setting up http+unix and http+npipe transports if requested. +func clientOptions(u *url.URL) []httpcommon.TransportOption { + scheme, trans, ok := strings.Cut(u.Scheme, "+") + var dialer transport.Dialer + switch { + default: + fallthrough + case !ok: + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithKeepaliveSettings{Disable: true}, + } + + // We set the host for the unix socket and Windows named + // pipes schemes because the http.Transport expects to + // have a host and will error out if it is not present. + // The values here are just non-zero with a helpful name. + // They are not used in any logic. + case trans == "unix": + u.Host = "unix-socket" + dialer = socketDialer{u.Path} + case trans == "npipe": + u.Host = "windows-npipe" + dialer = npipeDialer{u.Path} + } + u.Scheme = scheme + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + httpcommon.WithKeepaliveSettings{Disable: true}, + httpcommon.WithBaseDialer(dialer), + } +} + +// socketDialer implements transport.Dialer to a constant socket path. +type socketDialer struct { + path string +} + +func (d socketDialer) Dial(_, _ string) (net.Conn, error) { + return net.Dial("unix", d.path) +} + func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []*http.Request) error { return func(req *http.Request, via []*http.Request) error { log.Debug("http client: checking redirect") diff --git a/x-pack/filebeat/input/cel/transport_other.go b/x-pack/filebeat/input/cel/transport_other.go new file mode 100644 index 00000000000..387d2f92ea5 --- /dev/null +++ b/x-pack/filebeat/input/cel/transport_other.go @@ -0,0 +1,21 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !windows + +package cel + +import ( + "errors" + "net" +) + +// npipeDialer implements transport.Dialer. +type npipeDialer struct { + path string +} + +func (npipeDialer) Dial(_, _ string) (net.Conn, error) { + return nil, errors.New("named pipe only available on windows") +} diff --git a/x-pack/filebeat/input/cel/transport_windows.go b/x-pack/filebeat/input/cel/transport_windows.go new file mode 100644 index 00000000000..7ab3ffc664b --- /dev/null +++ b/x-pack/filebeat/input/cel/transport_windows.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build windows + +package cel + +import ( + "net" + "path/filepath" + + "github.com/Microsoft/go-winio" +) + +// npipeDialer implements transport.Dialer to a constant named pipe path. +type npipeDialer struct { + path string +} + +func (d npipeDialer) Dial(_, _ string) (net.Conn, error) { + return winio.DialPipe(`\\.\pipe`+filepath.FromSlash(d.path), nil) +}