From dce0794987154705d8d2bd4a41a4e284e951f21d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacob=20Baung=C3=A5rd=20Hansen?= Date: Thu, 20 Jul 2023 09:28:32 +0200 Subject: [PATCH] Query: Forward tenant information via StoreAPI (#6530) * Querier: Forward tenant information downstream With this commit we attach tenant information to each query request and forward it via the StoreAPI to any downstream Store Gateways and Queriers. We add the following command lines options which mimics the tenant functionality in Receive. The options are currently hidden, as they provide no real functionality yet. This will come in future steps. --query.tenant-header --query.default-tenant --query.tenant-certificate Signed-off-by: Jacob Baungard Hansen * Receive: Use CertificateField from Tenancy pkg These consts are now defined in the Tenancy package, so we should use those instead. Signed-off-by: Jacob Baungard Hansen --------- Signed-off-by: Jacob Baungard Hansen --- cmd/thanos/query.go | 14 ++ cmd/thanos/receive.go | 2 +- pkg/api/query/v1.go | 50 ++++- pkg/api/query/v1_test.go | 8 + pkg/query/querier.go | 4 + pkg/receive/handler.go | 7 - pkg/store/bucket.go | 19 ++ pkg/store/proxy.go | 46 +++- pkg/store/proxy_test.go | 44 +--- .../storepb/testutil/store_series_client.go | 51 +++++ pkg/tenancy/tenancy.go | 15 ++ pkg/tenancy/tenancy_test.go | 198 ++++++++++++++++++ 12 files changed, 404 insertions(+), 54 deletions(-) create mode 100644 pkg/store/storepb/testutil/store_series_client.go create mode 100644 pkg/tenancy/tenancy_test.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 944a2ccdbe..f7568f0313 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -55,6 +55,7 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/targets" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/ui" ) @@ -218,6 +219,10 @@ func registerQuery(app *extkingpin.App) { queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Float64List() queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Float64List() + tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).Hidden().String() + defaultTenant := cmd.Flag("query.default-tenant", "Name of the default tenant.").Default(tenancy.DefaultTenant).Hidden().String() + tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Hidden().Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) + var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -337,6 +342,9 @@ func registerQuery(app *extkingpin.App) { *defaultEngine, storeRateLimits, queryMode(*promqlQueryMode), + *tenantHeader, + *defaultTenant, + *tenantCertField, ) }) } @@ -413,6 +421,9 @@ func runQuery( defaultEngine string, storeRateLimits store.SeriesSelectLimits, queryMode queryMode, + tenantHeader string, + defaultTenant string, + tenantCertField string, ) error { if alertQueryURL == "" { lastColon := strings.LastIndex(httpBindAddr, ":") @@ -747,6 +758,9 @@ func runQuery( queryTelemetrySeriesQuantiles, ), reg, + tenantHeader, + defaultTenant, + tenantCertField, ) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e77af7b034..5f9405b4b4 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -876,7 +876,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(tenancy.DefaultTenantHeader).StringVar(&rc.tenantHeader) - cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName) + cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(tenancy.DefaultTenant).StringVar(&rc.defaultTenantID) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 9109cbb54e..5264328845 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -61,6 +61,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -161,6 +162,10 @@ type QueryAPI struct { queryRangeHist prometheus.Histogram seriesStatsAggregator seriesQueryPerformanceMetricsAggregator + + tenantHeader string + defaultTenant string + tenantCertField string } type seriesQueryPerformanceMetricsAggregator interface { @@ -196,6 +201,9 @@ func NewQueryAPI( gate gate.Gate, statsAggregator seriesQueryPerformanceMetricsAggregator, reg *prometheus.Registry, + tenantHeader string, + defaultTenant string, + tenantCertField string, ) *QueryAPI { if statsAggregator == nil { statsAggregator = &store.NoopSeriesStatsAggregator{} @@ -226,6 +234,9 @@ func NewQueryAPI( defaultMetadataTimeRange: defaultMetadataTimeRange, disableCORS: disableCORS, seriesStatsAggregator: statsAggregator, + tenantHeader: tenantHeader, + defaultTenant: defaultTenant, + tenantCertField: tenantCertField, queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_query_range_requested_timespan_duration_seconds", @@ -505,6 +516,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro lookbackDelta = lookbackDeltaFromReq } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + // We are starting promQL tracing span here, because we have no control over promQL code. span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() @@ -665,6 +683,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap lookbackDelta = lookbackDeltaFromReq } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + // Record the query range requested. qapi.queryRangeHist.Observe(end.Sub(start).Seconds()) @@ -770,6 +795,13 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A matcherSets = append(matcherSets, matchers) } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + q, err := qapi.queryableCreate( true, nil, @@ -866,6 +898,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr, func() {} } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "") + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant) + q, err := qapi.queryableCreate( enableDedup, replicaLabels, @@ -876,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr true, nil, query.NoopSeriesStatsReporter, - ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} @@ -926,6 +965,13 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap matcherSets = append(matcherSets, matchers) } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "") + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant) + q, err := qapi.queryableCreate( true, nil, @@ -936,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap true, nil, query.NoopSeriesStatsReporter, - ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 9fcf71c6fc..fec27108a3 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -203,6 +203,8 @@ func TestQueryEndpoints(t *testing.T) { Name: "query_range_hist", }), seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } start := time.Unix(0, 0) @@ -744,6 +746,8 @@ func TestMetadataEndpoints(t *testing.T) { Name: "query_range_hist", }), seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } apiWithLabelLookback := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ @@ -759,6 +763,8 @@ func TestMetadataEndpoints(t *testing.T) { Name: "query_range_hist", }), seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } var tests = []endpointTestCase{ @@ -1229,6 +1235,8 @@ func TestStoresEndpoint(t *testing.T) { }, } }, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } apiWithInvalidEndpoint := &QueryAPI{ endpointStatus: func() []query.EndpointStatus { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index cbc7ec39f5..8f66837432 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -340,6 +341,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) // TODO(bwplotka): Use inprocess gRPC when we want to stream responses. // Currently streaming won't help due to nature of the both PromQL engine which @@ -419,6 +421,7 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) if err != nil { @@ -452,6 +455,7 @@ func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) if err != nil { diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 569404630f..cc89748726 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -64,13 +64,6 @@ const ( labelError = "error" ) -// Allowed fields in client certificates. -const ( - CertificateFieldOrganization = "organization" - CertificateFieldOrganizationalUnit = "organizationalUnit" - CertificateFieldCommonName = "commonName" -) - var ( // errConflict is returned whenever an operation fails due to any conflict-type error. errConflict = errors.New("conflict") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d2bac75f3e..d53825c7f2 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -58,6 +58,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -1229,6 +1230,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie defer s.queryGate.Done() } + tenant, err := tenancy.GetTenantFromGRPCMetadata(srv.Context()) + if err != nil { + level.Warn(s.logger).Log("msg", err) + } + level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant) + matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) @@ -1478,6 +1485,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error()) } + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + if err != nil { + level.Warn(s.logger).Log("msg", err) + } + level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant) + resHints := &hintspb.LabelNamesResponseHints{} var reqBlockMatchers []*labels.Matcher @@ -1666,6 +1679,12 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error()) } + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + if err != nil { + level.Warn(s.logger).Log("msg", err) + } + level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant) + resHints := &hintspb.LabelValuesResponseHints{} g, gctx := errgroup.WithContext(ctx) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 7ff18892d9..0b38ae736e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -21,6 +21,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" @@ -28,6 +29,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -302,10 +304,24 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, } + // We may arrive here either via the promql engine + // or as a result of a grpc call in layered queries + ctx := srv.Context() + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + if err != nil { + level.Debug(s.logger).Log("msg", "using tenant from context instead of metadata") + if ctx.Value(tenancy.TenantKey) != nil { + tenant = ctx.Value(tenancy.TenantKey).(string) + } + } + + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant) + stores := []Client{} for _, st := range s.stores() { // We might be able to skip the store if its meta information indicates it cannot have series matching our query. - if ok, reason := storeMatches(srv.Context(), st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok { + if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok { if s.debugLogging { storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason)) } @@ -328,7 +344,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) } - respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) + respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) if err != nil { level.Error(reqLogger).Log("err", err) @@ -449,6 +465,19 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques storeDebugMsgs []string ) + // We may arrive here either via the promql engine + // or as a result of a grpc call in layered queries + tenant, err := tenancy.GetTenantFromGRPCMetadata(gctx) + if err != nil { + level.Debug(s.logger).Log("msg", "using tenant from context instead of metadata") + if gctx.Value(tenancy.TenantKey) != nil { + tenant = gctx.Value(tenancy.TenantKey).(string) + } + } + + gctx = metadata.AppendToOutgoingContext(gctx, tenancy.DefaultTenantHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) + for _, st := range s.stores() { st := st @@ -515,6 +544,19 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ span opentracing.Span ) + // We may arrive here either via the promql engine + // or as a result of a grpc call in layered queries + tenant, err := tenancy.GetTenantFromGRPCMetadata(gctx) + if err != nil { + level.Debug(s.logger).Log("msg", "using tenant from context instead of metadata") + if gctx.Value(tenancy.TenantKey) != nil { + tenant = gctx.Value(tenancy.TenantKey).(string) + } + } + + gctx = metadata.AppendToOutgoingContext(gctx, tenancy.DefaultTenantHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant) + for _, st := range s.stores() { st := st diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index f2c4aba47a..25f3e84102 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,7 +6,7 @@ package store import ( "context" "fmt" - "io" + "math" "math/rand" "os" @@ -1796,8 +1796,7 @@ func (s *mockedStoreAPI) Info(context.Context, *storepb.InfoRequest, ...grpc.Cal func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { s.LastSeriesReq = req - - return &StoreSeriesClient{injectedErrorIndex: s.injectedErrorIndex, injectedError: s.injectedError, ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError + return &storetestutil.StoreSeriesClient{InjectedErrorIndex: s.injectedErrorIndex, InjectedError: s.injectedError, Ctx: ctx, RespSet: s.RespSeries, RespDur: s.RespDuration, SlowSeriesIndex: s.SlowSeriesIndex}, s.RespError } func (s *mockedStoreAPI) LabelNames(_ context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -1812,45 +1811,6 @@ func (s *mockedStoreAPI) LabelValues(_ context.Context, req *storepb.LabelValues return s.RespLabelValues, s.RespError } -// StoreSeriesClient is test gRPC storeAPI series client. -type StoreSeriesClient struct { - // This field just exist to pseudo-implement the unused methods of the interface. - storepb.Store_SeriesClient - ctx context.Context - i int - respSet []*storepb.SeriesResponse - respDur time.Duration - slowSeriesIndex int - - injectedError error - injectedErrorIndex int -} - -func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { - if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) { - select { - case <-time.After(c.respDur): - case <-c.ctx.Done(): - return nil, c.ctx.Err() - } - } - if c.injectedError != nil && (c.injectedErrorIndex == c.i || c.injectedErrorIndex == 0) { - return nil, c.injectedError - } - - if c.i >= len(c.respSet) { - return nil, io.EOF - } - s := c.respSet[c.i] - c.i++ - - return s, nil -} - -func (c *StoreSeriesClient) Context() context.Context { - return c.ctx -} - // storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse { var s storepb.Series diff --git a/pkg/store/storepb/testutil/store_series_client.go b/pkg/store/storepb/testutil/store_series_client.go new file mode 100644 index 0000000000..3647901500 --- /dev/null +++ b/pkg/store/storepb/testutil/store_series_client.go @@ -0,0 +1,51 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storetestutil + +import ( + "context" + "io" + "time" + + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// StoreSeriesClient is test gRPC storeAPI series client. +type StoreSeriesClient struct { + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesClient + Ctx context.Context + i int + RespSet []*storepb.SeriesResponse + RespDur time.Duration + SlowSeriesIndex int + + InjectedError error + InjectedErrorIndex int +} + +func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { + if c.RespDur != 0 && (c.SlowSeriesIndex == c.i || c.SlowSeriesIndex == 0) { + select { + case <-time.After(c.RespDur): + case <-c.Ctx.Done(): + return nil, c.Ctx.Err() + } + } + if c.InjectedError != nil && (c.InjectedErrorIndex == c.i || c.InjectedErrorIndex == 0) { + return nil, c.InjectedError + } + + if c.i >= len(c.RespSet) { + return nil, io.EOF + } + s := c.RespSet[c.i] + c.i++ + + return s, nil +} + +func (c *StoreSeriesClient) Context() context.Context { + return c.Ctx +} diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index 0b4885855c..46f91eddff 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -4,12 +4,17 @@ package tenancy import ( + "context" "net/http" "path" + "google.golang.org/grpc/metadata" + "github.com/pkg/errors" ) +type contextKey int + const ( // DefaultTenantHeader is the default header used to designate the tenant making a request. DefaultTenantHeader = "THANOS-TENANT" @@ -17,6 +22,8 @@ const ( DefaultTenant = "default-tenant" // DefaultTenantLabel is the default label-name with which the tenant is announced in stored metrics. DefaultTenantLabel = "tenant_id" + // This key is used to pass tenant information using Context. + TenantKey contextKey = 0 ) // Allowed fields in client certificates. @@ -94,3 +101,11 @@ func getTenantFromCertificate(r *http.Request, certTenantField string) (string, return tenant, nil } + +func GetTenantFromGRPCMetadata(ctx context.Context) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok || len(md.Get(DefaultTenantHeader)) == 0 { + return DefaultTenant, errors.Errorf("could not get tenant from grpc metadata, using default: %s", DefaultTenantHeader) + } + return md.Get(DefaultTenantHeader)[0], nil +} diff --git a/pkg/tenancy/tenancy_test.go b/pkg/tenancy/tenancy_test.go new file mode 100644 index 0000000000..5134a731ec --- /dev/null +++ b/pkg/tenancy/tenancy_test.go @@ -0,0 +1,198 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package tenancy_test + +import ( + "context" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" + + "github.com/pkg/errors" + + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" +) + +// mockedStoreAPI is test gRPC store API client. +type mockedStoreAPI struct { + RespSeries []*storepb.SeriesResponse + RespLabelValues *storepb.LabelValuesResponse + RespLabelNames *storepb.LabelNamesResponse + RespError error + RespDuration time.Duration + // Index of series in store to slow response. + SlowSeriesIndex int + + LastSeriesReq *storepb.SeriesRequest + LastLabelValuesReq *storepb.LabelValuesRequest + LastLabelNamesReq *storepb.LabelNamesRequest + + t *testing.T +} + +// storeSeriesServer is test gRPC storeAPI series server. +type storeSeriesServer struct { + storepb.Store_SeriesServer + + ctx context.Context +} + +func (s *storeSeriesServer) Context() context.Context { + return s.ctx +} + +const testTenant = "test-tenant" + +func getAndAssertTenant(ctx context.Context, t *testing.T) { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok || len(md.Get(tenancy.DefaultTenantHeader)) == 0 { + testutil.Ok(t, errors.Errorf("could not get tenant from grpc metadata, using default: %s", tenancy.DefaultTenantHeader)) + } + tenant := md.Get(tenancy.DefaultTenantHeader)[0] + testutil.Assert(t, tenant == testTenant) +} + +func (s *mockedStoreAPI) Info(context.Context, *storepb.InfoRequest, ...grpc.CallOption) (*storepb.InfoResponse, error) { + return nil, status.Error(codes.Unimplemented, "not implemented") +} + +func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { + getAndAssertTenant(ctx, s.t) + + return &storetestutil.StoreSeriesClient{Ctx: ctx, RespSet: s.RespSeries, RespDur: s.RespDuration, SlowSeriesIndex: s.SlowSeriesIndex}, s.RespError +} + +func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + getAndAssertTenant(ctx, s.t) + + return s.RespLabelNames, s.RespError +} + +func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + getAndAssertTenant(ctx, s.t) + + return s.RespLabelValues, s.RespError +} + +func TestTenantFromGRPC(t *testing.T) { + t.Run("tenant-present", func(t *testing.T) { + + ctx := context.Background() + md := metadata.New(map[string]string{tenancy.DefaultTenantHeader: testTenant}) + ctx = metadata.NewIncomingContext(ctx, md) + + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + testutil.Ok(t, err) + testutil.Assert(t, tenant == testTenant) + }) + t.Run("no-tenant", func(t *testing.T) { + + ctx := context.Background() + + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + testutil.NotOk(t, err) + testutil.Assert(t, tenant == tenancy.DefaultTenant) + }) +} + +func TestTenantProxyPassing(t *testing.T) { + // When a querier requests info from a store + // the tenant information is passed from the query api + // to the proxy via context. This test ensures + // proxy store correct places the tenant into the + // outgoing grpc metadata + t.Run("tenant-via-context", func(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctx = context.WithValue(ctx, tenancy.TenantKey, testTenant) + + mockedStore := &mockedStoreAPI{ + RespLabelValues: &storepb.LabelValuesResponse{ + Values: []string{"1", "2"}, + Warnings: []string{"warning"}, + }, + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + t: t, + } + + cls := []store.Client{ + &storetestutil.TestClient{StoreClient: mockedStore}, + } + + q := store.NewProxyStore(nil, + nil, + func() []store.Client { return cls }, + component.Query, + nil, 0*time.Second, store.EagerRetrieval, + ) + // We assert directly in the mocked store apis LabelValues/LabelNames/Series funcs + _, _ = q.LabelValues(ctx, &storepb.LabelValuesRequest{}) + _, _ = q.LabelNames(ctx, &storepb.LabelNamesRequest{}) + + seriesMatchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + } + + _ = q.Series(&storepb.SeriesRequest{Matchers: seriesMatchers}, &storeSeriesServer{ctx: ctx}) + }) + + // In the case of nested queriers, the 2nd querier + // will get the tenant via grpc metadata from the 1st querier. + // This test ensures that the proxy store of the 2nd querier + // correctly places the tenant information in the outgoing + // grpc metadata to be sent to its stores. + t.Run("tenant-via-grpc", func(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + md := metadata.New(map[string]string{tenancy.DefaultTenantHeader: testTenant}) + ctx = metadata.NewIncomingContext(ctx, md) + + mockedStore := &mockedStoreAPI{ + RespLabelValues: &storepb.LabelValuesResponse{ + Values: []string{"1", "2"}, + Warnings: []string{"warning"}, + }, + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + t: t, + } + + cls := []store.Client{ + &storetestutil.TestClient{StoreClient: mockedStore}, + } + + q := store.NewProxyStore(nil, + nil, + func() []store.Client { return cls }, + component.Query, + nil, 0*time.Second, store.EagerRetrieval, + ) + + // We assert directly in the mocked store apis LabelValues/LabelNames/Series funcs + _, _ = q.LabelValues(ctx, &storepb.LabelValuesRequest{}) + _, _ = q.LabelNames(ctx, &storepb.LabelNamesRequest{}) + + seriesMatchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + } + + _ = q.Series(&storepb.SeriesRequest{Matchers: seriesMatchers}, &storeSeriesServer{ctx: ctx}) + }) +}