diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 944a2ccdbe..f7568f0313 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -55,6 +55,7 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/targets" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/ui" ) @@ -218,6 +219,10 @@ func registerQuery(app *extkingpin.App) { queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Float64List() queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Float64List() + tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).Hidden().String() + defaultTenant := cmd.Flag("query.default-tenant", "Name of the default tenant.").Default(tenancy.DefaultTenant).Hidden().String() + tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Hidden().Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) + var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -337,6 +342,9 @@ func registerQuery(app *extkingpin.App) { *defaultEngine, storeRateLimits, queryMode(*promqlQueryMode), + *tenantHeader, + *defaultTenant, + *tenantCertField, ) }) } @@ -413,6 +421,9 @@ func runQuery( defaultEngine string, storeRateLimits store.SeriesSelectLimits, queryMode queryMode, + tenantHeader string, + defaultTenant string, + tenantCertField string, ) error { if alertQueryURL == "" { lastColon := strings.LastIndex(httpBindAddr, ":") @@ -747,6 +758,9 @@ func runQuery( queryTelemetrySeriesQuantiles, ), reg, + tenantHeader, + defaultTenant, + tenantCertField, ) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e77af7b034..5f9405b4b4 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -876,7 +876,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(tenancy.DefaultTenantHeader).StringVar(&rc.tenantHeader) - cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName) + cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(tenancy.DefaultTenant).StringVar(&rc.defaultTenantID) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 9109cbb54e..5264328845 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -61,6 +61,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -161,6 +162,10 @@ type QueryAPI struct { queryRangeHist prometheus.Histogram seriesStatsAggregator seriesQueryPerformanceMetricsAggregator + + tenantHeader string + defaultTenant string + tenantCertField string } type seriesQueryPerformanceMetricsAggregator interface { @@ -196,6 +201,9 @@ func NewQueryAPI( gate gate.Gate, statsAggregator seriesQueryPerformanceMetricsAggregator, reg *prometheus.Registry, + tenantHeader string, + defaultTenant string, + tenantCertField string, ) *QueryAPI { if statsAggregator == nil { statsAggregator = &store.NoopSeriesStatsAggregator{} @@ -226,6 +234,9 @@ func NewQueryAPI( defaultMetadataTimeRange: defaultMetadataTimeRange, disableCORS: disableCORS, seriesStatsAggregator: statsAggregator, + tenantHeader: tenantHeader, + defaultTenant: defaultTenant, + tenantCertField: tenantCertField, queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "thanos_query_range_requested_timespan_duration_seconds", @@ -505,6 +516,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro lookbackDelta = lookbackDeltaFromReq } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + // We are starting promQL tracing span here, because we have no control over promQL code. span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() @@ -665,6 +683,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap lookbackDelta = lookbackDeltaFromReq } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + // Record the query range requested. qapi.queryRangeHist.Observe(end.Sub(start).Seconds()) @@ -770,6 +795,13 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A matcherSets = append(matcherSets, matchers) } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField) + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) + q, err := qapi.queryableCreate( true, nil, @@ -866,6 +898,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr, func() {} } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "") + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant) + q, err := qapi.queryableCreate( enableDedup, replicaLabels, @@ -876,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr true, nil, query.NoopSeriesStatsReporter, - ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} @@ -926,6 +965,13 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap matcherSets = append(matcherSets, matchers) } + tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "") + if err != nil { + apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} + return nil, nil, apiErr, func() {} + } + ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant) + q, err := qapi.queryableCreate( true, nil, @@ -936,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap true, nil, query.NoopSeriesStatsReporter, - ).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 9fcf71c6fc..fec27108a3 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -203,6 +203,8 @@ func TestQueryEndpoints(t *testing.T) { Name: "query_range_hist", }), seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } start := time.Unix(0, 0) @@ -744,6 +746,8 @@ func TestMetadataEndpoints(t *testing.T) { Name: "query_range_hist", }), seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } apiWithLabelLookback := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ @@ -759,6 +763,8 @@ func TestMetadataEndpoints(t *testing.T) { Name: "query_range_hist", }), seriesStatsAggregator: &store.NoopSeriesStatsAggregator{}, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } var tests = []endpointTestCase{ @@ -1229,6 +1235,8 @@ func TestStoresEndpoint(t *testing.T) { }, } }, + tenantHeader: "thanos-tenant", + defaultTenant: "default-tenant", } apiWithInvalidEndpoint := &QueryAPI{ endpointStatus: func() []query.EndpointStatus { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index cbc7ec39f5..8f66837432 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -340,6 +341,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) // TODO(bwplotka): Use inprocess gRPC when we want to stream responses. // Currently streaming won't help due to nature of the both PromQL engine which @@ -419,6 +421,7 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) if err != nil { @@ -452,6 +455,7 @@ func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) + ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) if err != nil { diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 569404630f..cc89748726 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -64,13 +64,6 @@ const ( labelError = "error" ) -// Allowed fields in client certificates. -const ( - CertificateFieldOrganization = "organization" - CertificateFieldOrganizationalUnit = "organizationalUnit" - CertificateFieldCommonName = "commonName" -) - var ( // errConflict is returned whenever an operation fails due to any conflict-type error. errConflict = errors.New("conflict") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d2bac75f3e..d53825c7f2 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -58,6 +58,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -1229,6 +1230,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie defer s.queryGate.Done() } + tenant, err := tenancy.GetTenantFromGRPCMetadata(srv.Context()) + if err != nil { + level.Warn(s.logger).Log("msg", err) + } + level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant) + matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) @@ -1478,6 +1485,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error()) } + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + if err != nil { + level.Warn(s.logger).Log("msg", err) + } + level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant) + resHints := &hintspb.LabelNamesResponseHints{} var reqBlockMatchers []*labels.Matcher @@ -1666,6 +1679,12 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error()) } + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + if err != nil { + level.Warn(s.logger).Log("msg", err) + } + level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant) + resHints := &hintspb.LabelValuesResponseHints{} g, gctx := errgroup.WithContext(ctx) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 7ff18892d9..0b38ae736e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -21,6 +21,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" @@ -28,6 +29,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" + "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -302,10 +304,24 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, } + // We may arrive here either via the promql engine + // or as a result of a grpc call in layered queries + ctx := srv.Context() + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + if err != nil { + level.Debug(s.logger).Log("msg", "using tenant from context instead of metadata") + if ctx.Value(tenancy.TenantKey) != nil { + tenant = ctx.Value(tenancy.TenantKey).(string) + } + } + + ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant) + stores := []Client{} for _, st := range s.stores() { // We might be able to skip the store if its meta information indicates it cannot have series matching our query. - if ok, reason := storeMatches(srv.Context(), st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok { + if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok { if s.debugLogging { storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason)) } @@ -328,7 +344,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) } - respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) + respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) if err != nil { level.Error(reqLogger).Log("err", err) @@ -449,6 +465,19 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques storeDebugMsgs []string ) + // We may arrive here either via the promql engine + // or as a result of a grpc call in layered queries + tenant, err := tenancy.GetTenantFromGRPCMetadata(gctx) + if err != nil { + level.Debug(s.logger).Log("msg", "using tenant from context instead of metadata") + if gctx.Value(tenancy.TenantKey) != nil { + tenant = gctx.Value(tenancy.TenantKey).(string) + } + } + + gctx = metadata.AppendToOutgoingContext(gctx, tenancy.DefaultTenantHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant) + for _, st := range s.stores() { st := st @@ -515,6 +544,19 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ span opentracing.Span ) + // We may arrive here either via the promql engine + // or as a result of a grpc call in layered queries + tenant, err := tenancy.GetTenantFromGRPCMetadata(gctx) + if err != nil { + level.Debug(s.logger).Log("msg", "using tenant from context instead of metadata") + if gctx.Value(tenancy.TenantKey) != nil { + tenant = gctx.Value(tenancy.TenantKey).(string) + } + } + + gctx = metadata.AppendToOutgoingContext(gctx, tenancy.DefaultTenantHeader, tenant) + level.Debug(s.logger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant) + for _, st := range s.stores() { st := st diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index f2c4aba47a..25f3e84102 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,7 +6,7 @@ package store import ( "context" "fmt" - "io" + "math" "math/rand" "os" @@ -1796,8 +1796,7 @@ func (s *mockedStoreAPI) Info(context.Context, *storepb.InfoRequest, ...grpc.Cal func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { s.LastSeriesReq = req - - return &StoreSeriesClient{injectedErrorIndex: s.injectedErrorIndex, injectedError: s.injectedError, ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError + return &storetestutil.StoreSeriesClient{InjectedErrorIndex: s.injectedErrorIndex, InjectedError: s.injectedError, Ctx: ctx, RespSet: s.RespSeries, RespDur: s.RespDuration, SlowSeriesIndex: s.SlowSeriesIndex}, s.RespError } func (s *mockedStoreAPI) LabelNames(_ context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -1812,45 +1811,6 @@ func (s *mockedStoreAPI) LabelValues(_ context.Context, req *storepb.LabelValues return s.RespLabelValues, s.RespError } -// StoreSeriesClient is test gRPC storeAPI series client. -type StoreSeriesClient struct { - // This field just exist to pseudo-implement the unused methods of the interface. - storepb.Store_SeriesClient - ctx context.Context - i int - respSet []*storepb.SeriesResponse - respDur time.Duration - slowSeriesIndex int - - injectedError error - injectedErrorIndex int -} - -func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { - if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) { - select { - case <-time.After(c.respDur): - case <-c.ctx.Done(): - return nil, c.ctx.Err() - } - } - if c.injectedError != nil && (c.injectedErrorIndex == c.i || c.injectedErrorIndex == 0) { - return nil, c.injectedError - } - - if c.i >= len(c.respSet) { - return nil, io.EOF - } - s := c.respSet[c.i] - c.i++ - - return s, nil -} - -func (c *StoreSeriesClient) Context() context.Context { - return c.ctx -} - // storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse { var s storepb.Series diff --git a/pkg/store/storepb/testutil/store_series_client.go b/pkg/store/storepb/testutil/store_series_client.go new file mode 100644 index 0000000000..3647901500 --- /dev/null +++ b/pkg/store/storepb/testutil/store_series_client.go @@ -0,0 +1,51 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storetestutil + +import ( + "context" + "io" + "time" + + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// StoreSeriesClient is test gRPC storeAPI series client. +type StoreSeriesClient struct { + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesClient + Ctx context.Context + i int + RespSet []*storepb.SeriesResponse + RespDur time.Duration + SlowSeriesIndex int + + InjectedError error + InjectedErrorIndex int +} + +func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { + if c.RespDur != 0 && (c.SlowSeriesIndex == c.i || c.SlowSeriesIndex == 0) { + select { + case <-time.After(c.RespDur): + case <-c.Ctx.Done(): + return nil, c.Ctx.Err() + } + } + if c.InjectedError != nil && (c.InjectedErrorIndex == c.i || c.InjectedErrorIndex == 0) { + return nil, c.InjectedError + } + + if c.i >= len(c.RespSet) { + return nil, io.EOF + } + s := c.RespSet[c.i] + c.i++ + + return s, nil +} + +func (c *StoreSeriesClient) Context() context.Context { + return c.Ctx +} diff --git a/pkg/tenancy/tenancy.go b/pkg/tenancy/tenancy.go index 0b4885855c..46f91eddff 100644 --- a/pkg/tenancy/tenancy.go +++ b/pkg/tenancy/tenancy.go @@ -4,12 +4,17 @@ package tenancy import ( + "context" "net/http" "path" + "google.golang.org/grpc/metadata" + "github.com/pkg/errors" ) +type contextKey int + const ( // DefaultTenantHeader is the default header used to designate the tenant making a request. DefaultTenantHeader = "THANOS-TENANT" @@ -17,6 +22,8 @@ const ( DefaultTenant = "default-tenant" // DefaultTenantLabel is the default label-name with which the tenant is announced in stored metrics. DefaultTenantLabel = "tenant_id" + // This key is used to pass tenant information using Context. + TenantKey contextKey = 0 ) // Allowed fields in client certificates. @@ -94,3 +101,11 @@ func getTenantFromCertificate(r *http.Request, certTenantField string) (string, return tenant, nil } + +func GetTenantFromGRPCMetadata(ctx context.Context) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok || len(md.Get(DefaultTenantHeader)) == 0 { + return DefaultTenant, errors.Errorf("could not get tenant from grpc metadata, using default: %s", DefaultTenantHeader) + } + return md.Get(DefaultTenantHeader)[0], nil +} diff --git a/pkg/tenancy/tenancy_test.go b/pkg/tenancy/tenancy_test.go new file mode 100644 index 0000000000..5134a731ec --- /dev/null +++ b/pkg/tenancy/tenancy_test.go @@ -0,0 +1,198 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package tenancy_test + +import ( + "context" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/tenancy" + + "github.com/pkg/errors" + + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" +) + +// mockedStoreAPI is test gRPC store API client. +type mockedStoreAPI struct { + RespSeries []*storepb.SeriesResponse + RespLabelValues *storepb.LabelValuesResponse + RespLabelNames *storepb.LabelNamesResponse + RespError error + RespDuration time.Duration + // Index of series in store to slow response. + SlowSeriesIndex int + + LastSeriesReq *storepb.SeriesRequest + LastLabelValuesReq *storepb.LabelValuesRequest + LastLabelNamesReq *storepb.LabelNamesRequest + + t *testing.T +} + +// storeSeriesServer is test gRPC storeAPI series server. +type storeSeriesServer struct { + storepb.Store_SeriesServer + + ctx context.Context +} + +func (s *storeSeriesServer) Context() context.Context { + return s.ctx +} + +const testTenant = "test-tenant" + +func getAndAssertTenant(ctx context.Context, t *testing.T) { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok || len(md.Get(tenancy.DefaultTenantHeader)) == 0 { + testutil.Ok(t, errors.Errorf("could not get tenant from grpc metadata, using default: %s", tenancy.DefaultTenantHeader)) + } + tenant := md.Get(tenancy.DefaultTenantHeader)[0] + testutil.Assert(t, tenant == testTenant) +} + +func (s *mockedStoreAPI) Info(context.Context, *storepb.InfoRequest, ...grpc.CallOption) (*storepb.InfoResponse, error) { + return nil, status.Error(codes.Unimplemented, "not implemented") +} + +func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { + getAndAssertTenant(ctx, s.t) + + return &storetestutil.StoreSeriesClient{Ctx: ctx, RespSet: s.RespSeries, RespDur: s.RespDuration, SlowSeriesIndex: s.SlowSeriesIndex}, s.RespError +} + +func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { + getAndAssertTenant(ctx, s.t) + + return s.RespLabelNames, s.RespError +} + +func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) { + getAndAssertTenant(ctx, s.t) + + return s.RespLabelValues, s.RespError +} + +func TestTenantFromGRPC(t *testing.T) { + t.Run("tenant-present", func(t *testing.T) { + + ctx := context.Background() + md := metadata.New(map[string]string{tenancy.DefaultTenantHeader: testTenant}) + ctx = metadata.NewIncomingContext(ctx, md) + + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + testutil.Ok(t, err) + testutil.Assert(t, tenant == testTenant) + }) + t.Run("no-tenant", func(t *testing.T) { + + ctx := context.Background() + + tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx) + testutil.NotOk(t, err) + testutil.Assert(t, tenant == tenancy.DefaultTenant) + }) +} + +func TestTenantProxyPassing(t *testing.T) { + // When a querier requests info from a store + // the tenant information is passed from the query api + // to the proxy via context. This test ensures + // proxy store correct places the tenant into the + // outgoing grpc metadata + t.Run("tenant-via-context", func(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctx = context.WithValue(ctx, tenancy.TenantKey, testTenant) + + mockedStore := &mockedStoreAPI{ + RespLabelValues: &storepb.LabelValuesResponse{ + Values: []string{"1", "2"}, + Warnings: []string{"warning"}, + }, + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + t: t, + } + + cls := []store.Client{ + &storetestutil.TestClient{StoreClient: mockedStore}, + } + + q := store.NewProxyStore(nil, + nil, + func() []store.Client { return cls }, + component.Query, + nil, 0*time.Second, store.EagerRetrieval, + ) + // We assert directly in the mocked store apis LabelValues/LabelNames/Series funcs + _, _ = q.LabelValues(ctx, &storepb.LabelValuesRequest{}) + _, _ = q.LabelNames(ctx, &storepb.LabelNamesRequest{}) + + seriesMatchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + } + + _ = q.Series(&storepb.SeriesRequest{Matchers: seriesMatchers}, &storeSeriesServer{ctx: ctx}) + }) + + // In the case of nested queriers, the 2nd querier + // will get the tenant via grpc metadata from the 1st querier. + // This test ensures that the proxy store of the 2nd querier + // correctly places the tenant information in the outgoing + // grpc metadata to be sent to its stores. + t.Run("tenant-via-grpc", func(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + md := metadata.New(map[string]string{tenancy.DefaultTenantHeader: testTenant}) + ctx = metadata.NewIncomingContext(ctx, md) + + mockedStore := &mockedStoreAPI{ + RespLabelValues: &storepb.LabelValuesResponse{ + Values: []string{"1", "2"}, + Warnings: []string{"warning"}, + }, + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + t: t, + } + + cls := []store.Client{ + &storetestutil.TestClient{StoreClient: mockedStore}, + } + + q := store.NewProxyStore(nil, + nil, + func() []store.Client { return cls }, + component.Query, + nil, 0*time.Second, store.EagerRetrieval, + ) + + // We assert directly in the mocked store apis LabelValues/LabelNames/Series funcs + _, _ = q.LabelValues(ctx, &storepb.LabelValuesRequest{}) + _, _ = q.LabelNames(ctx, &storepb.LabelNamesRequest{}) + + seriesMatchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + } + + _ = q.Series(&storepb.SeriesRequest{Matchers: seriesMatchers}, &storeSeriesServer{ctx: ctx}) + }) +}