Skip to content

Commit

Permalink
[k174] Propagate trace ID with HTTP gRPC request. (#11251) (#11268)
Browse files Browse the repository at this point in the history
The changes in #10688 did not
propage the trace ID from the context. `Frontend.RoundTripGRPC` would
inject the trace ID into the request. That's not done in `Frontend.Do`.
This changes extends the `codec.EncodeRequest` to inject the trace ID
there. This is more inline with other metadata.

(cherry picked from commit 8d34f85)

---------

Co-authored-by: Karsten Jeschkies <[email protected]>
  • Loading branch information
trevorwhitney and jeschkies authored Nov 20, 2023
1 parent 82e0183 commit 5ab5b8f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
internalMiddlewares := []queryrangebase.Middleware{
serverutil.RecoveryMiddleware,
queryrange.Instrument{Metrics: t.Metrics},
queryrange.Tracer{},
}
if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled {
internalMiddlewares = append(
Expand Down
17 changes: 17 additions & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,22 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
}
}

// Add org id
orgID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
header.Set(user.OrgIDHeaderName, orgID)

// Propagate trace context in request.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
carrier := opentracing.HTTPHeadersCarrier(header)
if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
return nil, err
}
}

switch request := r.(type) {
case *LokiRequest:
params := url.Values{
Expand Down Expand Up @@ -725,6 +741,7 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
}
}

// nolint:goconst
func (c Codec) Path(r queryrangebase.Request) string {
switch request := r.(type) {
case *LokiRequest:
Expand Down
46 changes: 28 additions & 18 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
Expand All @@ -41,6 +42,9 @@ var (
)

func Test_codec_EncodeDecodeRequest(t *testing.T) {

ctx := user.InjectOrgID(context.Background(), "1")

tests := []struct {
name string
reqBuilder func() (*http.Request, error)
Expand Down Expand Up @@ -108,7 +112,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
}, NewLabelRequest(start, end, `{foo="bar"}`, "test", "/label/test/values"),
false},
{"index_stats", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(context.Background(), &logproto.IndexStatsRequest{
return DefaultCodec.EncodeRequest(ctx, &logproto.IndexStatsRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
Expand All @@ -119,7 +123,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
Matchers: `{job="foo"}`,
}, false},
{"volume", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
Expand All @@ -138,7 +142,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
AggregateBy: "labels",
}, false},
{"volume_default_limit", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
Expand All @@ -152,7 +156,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
AggregateBy: "series",
}, false},
{"volume_range", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
Expand All @@ -170,7 +174,7 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
AggregateBy: "series",
}, false},
{"volume_range_default_limit", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(ctx, &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
Expand Down Expand Up @@ -585,7 +589,8 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
require.NoError(t, err)

// queryrange.Response -> JSON
httpResp, err := codec.EncodeResponse(context.TODO(), httpReq, resp)
ctx := user.InjectOrgID(context.Background(), "1")
httpResp, err := codec.EncodeResponse(ctx, httpReq, resp)
require.NoError(t, err)

body, _ := io.ReadAll(httpResp.Body)
Expand All @@ -596,11 +601,11 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {

func Test_codec_EncodeRequest(t *testing.T) {
// we only accept LokiRequest.
got, err := DefaultCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{})
ctx := user.InjectOrgID(context.Background(), "1")
got, err := DefaultCodec.EncodeRequest(ctx, &queryrangebase.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)

ctx := context.Background()
toEncode := &LokiRequest{
Query: `{foo="bar"}`,
Limit: 200,
Expand Down Expand Up @@ -637,11 +642,11 @@ func Test_codec_EncodeRequest(t *testing.T) {
}

func Test_codec_series_EncodeRequest(t *testing.T) {
got, err := DefaultCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{})
ctx := user.InjectOrgID(context.Background(), "1")
got, err := DefaultCodec.EncodeRequest(ctx, &queryrangebase.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)

ctx := context.Background()
toEncode := &LokiSeriesRequest{
Match: []string{`{foo="bar"}`},
Path: "/series",
Expand All @@ -666,7 +671,7 @@ func Test_codec_series_EncodeRequest(t *testing.T) {
}

func Test_codec_labels_EncodeRequest(t *testing.T) {
ctx := context.Background()
ctx := user.InjectOrgID(context.Background(), "1")
toEncode := NewLabelRequest(start, end, "", "", "/loki/api/v1/labels")
got, err := DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
Expand Down Expand Up @@ -703,7 +708,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
}

func Test_codec_labels_DecodeRequest(t *testing.T) {
ctx := context.Background()
ctx := user.InjectOrgID(context.Background(), "1")
u, err := url.Parse(`/loki/api/v1/label/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

Expand Down Expand Up @@ -732,7 +737,8 @@ func Test_codec_index_stats_EncodeRequest(t *testing.T) {
Through: through,
Matchers: `{job="foo"}`,
}
got, err := DefaultCodec.EncodeRequest(context.Background(), toEncode)
ctx := user.InjectOrgID(context.Background(), "1")
got, err := DefaultCodec.EncodeRequest(ctx, toEncode)
require.Nil(t, err)
require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end"))
Expand All @@ -749,7 +755,8 @@ func Test_codec_seriesVolume_EncodeRequest(t *testing.T) {
Step: 30 * 1e6,
TargetLabels: []string{"foo", "bar"},
}
got, err := DefaultCodec.EncodeRequest(context.Background(), toEncode)
ctx := user.InjectOrgID(context.Background(), "1")
got, err := DefaultCodec.EncodeRequest(ctx, toEncode)
require.Nil(t, err)
require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end"))
Expand All @@ -760,14 +767,15 @@ func Test_codec_seriesVolume_EncodeRequest(t *testing.T) {
}

func Test_codec_seriesVolume_DecodeRequest(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
t.Run("instant queries set a step of 0", func(t *testing.T) {

req := httptest.NewRequest(http.MethodGet, "/loki/api/v1/index/volume"+
"?start=0"+
"&end=1"+
"&step=42"+
"&query=%7Bfoo%3D%22bar%22%7D", nil)
got, err := DefaultCodec.DecodeRequest(context.Background(), req, nil)
got, err := DefaultCodec.DecodeRequest(ctx, req, nil)
require.NoError(t, err)

require.Equal(t, int64(0), got.(*logproto.VolumeRequest).Step)
Expand All @@ -779,7 +787,7 @@ func Test_codec_seriesVolume_DecodeRequest(t *testing.T) {
"&end=1"+
"&step=42"+
"&query=%7Bfoo%3D%22bar%22%7D", nil)
got, err := DefaultCodec.DecodeRequest(context.Background(), req, nil)
got, err := DefaultCodec.DecodeRequest(ctx, req, nil)
require.NoError(t, err)

require.Equal(t, (42 * time.Second).Milliseconds(), got.(*logproto.VolumeRequest).Step)
Expand All @@ -790,7 +798,7 @@ func Test_codec_seriesVolume_DecodeRequest(t *testing.T) {
"?start=0"+
"&end=1"+
"&query=%7Bfoo%3D%22bar%22%7D", nil)
got, err := DefaultCodec.DecodeRequest(context.Background(), req, nil)
got, err := DefaultCodec.DecodeRequest(ctx, req, nil)
require.NoError(t, err)

require.Equal(t, time.Second.Milliseconds(), got.(*logproto.VolumeRequest).Step)
Expand Down Expand Up @@ -925,7 +933,8 @@ func Test_codec_EncodeResponse(t *testing.T) {
URL: u,
Header: h,
}
got, err := DefaultCodec.EncodeResponse(context.TODO(), req, tt.res)
ctx := user.InjectOrgID(context.Background(), "1")
got, err := DefaultCodec.EncodeResponse(ctx, req, tt.res)
if (err != nil) != tt.wantErr {
t.Errorf("codec.EncodeResponse() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -1424,6 +1433,7 @@ func (badResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader { ret
func (b badResponse) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
return b
}
func (badResponse) SetHeader(string, string) {}

type badReader struct{}

Expand Down
16 changes: 16 additions & 0 deletions pkg/querier/queryrange/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/middleware"
"github.com/opentracing/opentracing-go"

"github.com/grafana/dskit/server"

Expand Down Expand Up @@ -52,3 +53,18 @@ func (i Instrument) observe(ctx context.Context, route string, err error, durati
}
instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(method, route, respStatus, "false"), duration.Seconds())
}

type Tracer struct{}

var _ queryrangebase.Middleware = Tracer{}

// Wrap implements the queryrangebase.Middleware
func (t Tracer) Wrap(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
route := DefaultCodec.Path(r)
route = middleware.MakeLabelValue(route)
span, ctx := opentracing.StartSpanFromContext(ctx, route)
defer span.Finish()
return next.Do(ctx, r)
})
}

0 comments on commit 5ab5b8f

Please sign in to comment.