Skip to content

Commit

Permalink
Include request headers in client.Info (#4547)
Browse files Browse the repository at this point in the history
* Include request headers in client.Info

* Add GRPC metadata

* Add tests

* Add changelog

* Clone headers

* Add config option

* Update config/confighttp/clientinfohandler.go

Co-authored-by: Juraci Paixão Kröhling <[email protected]>

* Update config/confighttp/confighttp.go

Co-authored-by: Juraci Paixão Kröhling <[email protected]>

* Update config/configgrpc/configgrpc.go

Co-authored-by: Juraci Paixão Kröhling <[email protected]>

* oops

* Fix the test

* Immutable metadata

* Really immutable metadata

* Really return copy, add test

* Optimize metadata getter

* Move changelog entry to unreleased

* Add experimental remarks per CR

Co-authored-by: Kemal Hadimli <[email protected]>
Co-authored-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
3 people authored Jan 13, 2022
1 parent 0bb3c8c commit 87365b7
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 💡 Enhancements 💡

- `confighttp` and `configgrpc`: New config option `include_metadata` to persist request metadata/headers in `client.Info.Metadata` (experimental) (#4547)

## 🛑 Breaking changes 🛑

- Change configmapprovider.Provider to accept a location for retrieve (#4657)
Expand Down
29 changes: 29 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ type Info struct {
// configauth.ServerAuthenticator implementations tied to the receiver for
// this connection.
Auth AuthData

// Metadata is the request metadata from the client connecting to this connector.
// Experimental: *NOTE* this structure is subject to change or removal in the future.
Metadata Metadata
}

// Metadata is an immutable map, meant to contain request metadata.
type Metadata struct {
data map[string][]string
}

// AuthData represents the authentication data as seen by authenticators tied to
Expand Down Expand Up @@ -137,3 +146,23 @@ func FromContext(ctx context.Context) Info {
}
return c
}

// NewMetadata creates a new Metadata object to use in Info. md is used as-is.
func NewMetadata(md map[string][]string) Metadata {
return Metadata{
data: md,
}
}

// Get gets the value of the key from metadata, returning a copy.
func (m Metadata) Get(key string) []string {
vals := m.data[key]
if len(vals) == 0 {
return nil
}

ret := make([]string, len(vals))
copy(ret, vals)

return ret
}
13 changes: 13 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,16 @@ func TestFromContext(t *testing.T) {
})
}
}

func TestMetadata(t *testing.T) {
source := map[string][]string{"test-key": {"test-val"}}
md := NewMetadata(source)
assert.Equal(t, []string{"test-val"}, md.Get("test-key"))

// test if copy. In regular use, source cannot change
val := md.Get("test-key")
source["test-key"][0] = "abc"
assert.Equal(t, []string{"test-val"}, val)

assert.Empty(t, md.Get("non-existent-key"))
}
31 changes: 22 additions & 9 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ type GRPCServerSettings struct {

// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Include propagates the incoming connection's metadata to downstream consumers.
// Experimental: *NOTE* this option is subject to change or removal in the future.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
}

// SanitizedEndpoint strips the prefix of either http:// or https:// from configgrpc.GRPCClientSettings.Endpoint.
Expand Down Expand Up @@ -348,8 +352,8 @@ func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings comp
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))

uInterceptors = append(uInterceptors, enhanceWithClientInformation)
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation)
uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata))
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata))

opts = append(opts, grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

Expand All @@ -372,23 +376,32 @@ func getGRPCCompressionName(compressionType middleware.CompressionType) (string,

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx), req)
func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx, includeMetadata), req)
}
}

func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context())
return handler(srv, wrapped)
func enhanceStreamWithClientInformation(includeMetadata bool) func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context(), includeMetadata)
return handler(srv, wrapped)
}
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(ctx context.Context) context.Context {
func contextWithClient(ctx context.Context, includeMetadata bool) context.Context {
cl := client.FromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
if includeMetadata {
if md, ok := metadata.FromIncomingContext(ctx); ok {
cl.Metadata = client.NewMetadata(md.Copy())
}
}
return client.NewContext(ctx, cl)
}

Expand Down
40 changes: 35 additions & 5 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,10 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input context.Context
expected client.Info
desc string
input context.Context
doMetadata bool
expected client.Info
}{
{
desc: "no peer information, empty client",
Expand Down Expand Up @@ -695,10 +696,39 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "existing client with metadata",
input: client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
}),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context, no metadata processing",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
expected: client.Info{},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cl := client.FromContext(contextWithClient(tC.input))
cl := client.FromContext(contextWithClient(tC.input, tC.doMetadata))
assert.Equal(t, tC.expected, cl)
})
}
Expand All @@ -721,7 +751,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) {
}

// test
err := enhanceStreamWithClientInformation(nil, stream, nil, handler)
err := enhanceStreamWithClientInformation(false)(nil, stream, nil, handler)

// verify
assert.NoError(t, err)
Expand Down
11 changes: 9 additions & 2 deletions config/confighttp/clientinfohandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ var _ http.Handler = (*clientInfoHandler)(nil)
// clientInfoHandler is an http.Handler that enhances the incoming request context with client.Info.
type clientInfoHandler struct {
next http.Handler

// include client metadata or not
includeMetadata bool
}

// ServeHTTP intercepts incoming HTTP requests, replacing the request's context with one that contains
// a client.Info containing the client's IP address.
func (h *clientInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(contextWithClient(req))
req = req.WithContext(contextWithClient(req, h.includeMetadata))
h.next.ServeHTTP(w, req)
}

// contextWithClient attempts to add the client IP address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(req *http.Request) context.Context {
func contextWithClient(req *http.Request, includeMetadata bool) context.Context {
cl := client.FromContext(req.Context())

ip := parseIP(req.RemoteAddr)
if ip != nil {
cl.Addr = ip
}

if includeMetadata {
cl.Metadata = client.NewMetadata(req.Header.Clone())
}

ctx := client.NewContext(req.Context(), cl)
return ctx
}
Expand Down
7 changes: 6 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ type HTTPServerSettings struct {

// Auth for this receiver
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// IncludeMetadata propagates the client metadata from the incoming requests to the downstream consumers
// Experimental: *NOTE* this option is subject to change or removal in the future.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
}

// ToListener creates a net.Listener.
Expand Down Expand Up @@ -284,7 +288,8 @@ func (hss *HTTPServerSettings) ToServer(host component.Host, settings component.

// wrap the current handler in an interceptor that will add client.Info to the request's context
handler = &clientInfoHandler{
next: handler,
next: handler,
includeMetadata: hss.IncludeMetadata,
}

return &http.Server{
Expand Down
31 changes: 25 additions & 6 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,17 +738,18 @@ func TestHttpHeaders(t *testing.T) {

func TestContextWithClient(t *testing.T) {
testCases := []struct {
desc string
input *http.Request
expected client.Info
desc string
input *http.Request
doMetadata bool
expected client.Info
}{
{
desc: "request without client IP",
desc: "request without client IP or headers",
input: &http.Request{},
expected: client.Info{},
},
{
desc: "request without client IP",
desc: "request with client IP",
input: &http.Request{
RemoteAddr: "1.2.3.4:55443",
},
Expand All @@ -758,10 +759,28 @@ func TestContextWithClient(t *testing.T) {
},
},
},
{
desc: "request with client headers, no metadata processing",
input: &http.Request{
Header: map[string][]string{"x-test-header": {"test-value"}},
},
doMetadata: false,
expected: client.Info{},
},
{
desc: "request with client headers",
input: &http.Request{
Header: map[string][]string{"x-test-header": {"test-value"}},
},
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"x-test-header": {"test-value"}}),
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
ctx := contextWithClient(tC.input)
ctx := contextWithClient(tC.input, tC.doMetadata)
assert.Equal(t, tC.expected, client.FromContext(ctx))
})
}
Expand Down

0 comments on commit 87365b7

Please sign in to comment.