Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: add support for http+{unix,npipe} sch…
Browse files Browse the repository at this point in the history
…emes (#33610)

No testing at this stage since test infrastructure assumes a TCP transport.
  • Loading branch information
efd6 authored and chrisberkhout committed Jun 1, 2023
1 parent e976233 commit a0e5a8e
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
- Add support for http+unix and http+npipe schemes. {issue}33571[33571] {pull}33610[33610]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ with `auth.oauth2.google.jwt_file` or `auth.oauth2.google.jwt_json`.

The URL of the HTTP API. Required.

The API endpoint may be accessed via unix socket and Windows named pipes by adding `+unix` or `+npipe`
to the URL scheme, for example, `http+unix:///var/socket/`.

[float]
==== `request.method`

Expand Down
21 changes: 21 additions & 0 deletions x-pack/filebeat/input/httpjson/client_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !windows

package httpjson

import (
"errors"
"net"
)

// npipeDialer implements transport.Dialer.
type npipeDialer struct {
path string
}

func (npipeDialer) Dial(_, _ string) (net.Conn, error) {
return nil, errors.New("named pipe only available on windows")
}
23 changes: 23 additions & 0 deletions x-pack/filebeat/input/httpjson/client_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build windows

package httpjson

import (
"net"
"path/filepath"

"github.com/Microsoft/go-winio"
)

// npipeDialer implements transport.Dialer to a constant named pipe path.
type npipeDialer struct {
path string
}

func (d npipeDialer) Dial(_, _ string) (net.Conn, error) {
return winio.DialPipe(`\\.\pipe`+filepath.FromSlash(d.path), nil)
}
50 changes: 46 additions & 4 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"net/http"
"net/url"
"strings"
"time"

retryablehttp "github.com/hashicorp/go-retryablehttp"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"
"github.com/elastic/go-concert/ctxtool"
Expand Down Expand Up @@ -155,10 +157,7 @@ func run(

func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := config.Request.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
)
netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,6 +199,49 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
return &httpClient{client: client.StandardClient(), limiter: limiter}, nil
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL) []httpcommon.TransportOption {
scheme, trans, ok := strings.Cut(u.Scheme, "+")
var dialer transport.Dialer
switch {
default:
fallthrough
case !ok:
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
}

// We set the host for the unix socket and Windows named
// pipes schemes because the http.Transport expects to
// have a host and will error out if it is not present.
// The values here are just non-zero with a helpful name.
// They are not used in any logic.
case trans == "unix":
u.Host = "unix-socket"
dialer = socketDialer{u.Path}
case trans == "npipe":
u.Host = "windows-npipe"
dialer = npipeDialer{u.Path}
}
u.Scheme = scheme
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
httpcommon.WithBaseDialer(dialer),
}
}

// socketDialer implements transport.Dialer to a constant socket path.
type socketDialer struct {
path string
}

func (d socketDialer) Dial(_, _ string) (net.Conn, error) {
return net.Dial("unix", d.path)
}

func checkRedirect(config *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
return func(req *http.Request, via []*http.Request) error {
log.Debug("http client: checking redirect")
Expand Down
6 changes: 1 addition & 5 deletions x-pack/filebeat/input/httpjson/request_chain_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

const (
Expand All @@ -33,10 +32,7 @@ const (

func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := requestCfg.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
)
netHTTPClient, err := requestCfg.Transport.Client(clientOptions(requestCfg.URL.URL)...)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/httpjson/request_chain_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"bytes"
"context"
"net/url"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -16,6 +17,7 @@ import (

func Test_newChainHTTPClient(t *testing.T) {
cfg := defaultChainConfig()
cfg.Request.URL = &urlConfig{URL: &url.URL{}}
ctx := context.Background()
log := logp.NewLogger("newChainClientTestLogger")

Expand Down Expand Up @@ -119,7 +121,6 @@ func Test_evaluateResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

expression := &valueTpl{}
err := expression.Unpack(tt.args.expression)
assert.NoError(t, err)
Expand Down

0 comments on commit a0e5a8e

Please sign in to comment.