Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Forward tenant information via StoreAPI #6530

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -218,6 +219,10 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Float64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Float64List()

tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).Hidden().String()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should have a tenancy config file instead of a bunch of command line parameters? There are a bunch of tenancy features that people would like to have so it would be easier to extend a config file in the future

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we discussed this during proposal review as well. I believe we decided to follow same Receive convention? Feel free to correct me if I'm wrong cc: @douglascamata

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link to the specific discussion (I think): #6320 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that for this flag specifically, a configuration file won't be necessary.

For all the per-tenant configuration that it enables (i.e. different query limits, concurrency, etc) should be managed through a file, like we do in Thanos Receive for --receive.limits-config and --receive.tenant-header. This keeps us consistent.

Copy link
Contributor

@douglascamata douglascamata Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential to enable configuration of all the CLI args in a file. But those CLI args that also should have files as content complicate this path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm all in for a migration of all tenant configuration to a single file across all components. This would have to be well planned, especially thinking about backwards compatibility. 🤔

defaultTenant := cmd.Flag("query.default-tenant", "Name of the default tenant.").Default(tenancy.DefaultTenant).Hidden().String()
tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Hidden().Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName)

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

Expand Down Expand Up @@ -337,6 +342,9 @@ func registerQuery(app *extkingpin.App) {
*defaultEngine,
storeRateLimits,
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
*tenantCertField,
)
})
}
Expand Down Expand Up @@ -413,6 +421,9 @@ func runQuery(
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
tenantCertField string,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -747,6 +758,9 @@ func runQuery(
queryTelemetrySeriesQuantiles,
),
reg,
tenantHeader,
defaultTenant,
tenantCertField,
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(tenancy.DefaultTenantHeader).StringVar(&rc.tenantHeader)

cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName)
cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName)

cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(tenancy.DefaultTenant).StringVar(&rc.defaultTenantID)

Expand Down
50 changes: 48 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -161,6 +162,10 @@ type QueryAPI struct {
queryRangeHist prometheus.Histogram

seriesStatsAggregator seriesQueryPerformanceMetricsAggregator

tenantHeader string
defaultTenant string
tenantCertField string
}

type seriesQueryPerformanceMetricsAggregator interface {
Expand Down Expand Up @@ -196,6 +201,9 @@ func NewQueryAPI(
gate gate.Gate,
statsAggregator seriesQueryPerformanceMetricsAggregator,
reg *prometheus.Registry,
tenantHeader string,
defaultTenant string,
tenantCertField string,
) *QueryAPI {
if statsAggregator == nil {
statsAggregator = &store.NoopSeriesStatsAggregator{}
Expand Down Expand Up @@ -226,6 +234,9 @@ func NewQueryAPI(
defaultMetadataTimeRange: defaultMetadataTimeRange,
disableCORS: disableCORS,
seriesStatsAggregator: statsAggregator,
tenantHeader: tenantHeader,
defaultTenant: defaultTenant,
tenantCertField: tenantCertField,

queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_query_range_requested_timespan_duration_seconds",
Expand Down Expand Up @@ -505,6 +516,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
lookbackDelta = lookbackDeltaFromReq
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we shouldn't return error here, and instead assume best-effort and just fallback to default-tenant?
This way we don't break existing setups.

ditto for rest of the places this is implemented in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe this should break any existing setups (unless someone is already sending tenant-headers which are currently just being ignored). In case the tenant header is empty/non-existing we'll use the default one.

However, if we get a tenant which is invalid (i.e a malicious one per #5969 ), or if one specifically expects a tenant to be found via the certificate, but none is, then we do error out. At least in the case of a malicious tenant being sent, I think that is sensible?

I'm not sure in the case of the certificate, I suspect that one wouldn't have a cert without the configured field, though I am not sure. I re-used the receive implementation here, so the behavior here should be the same.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I forgot how GetTenantFromHTTP worked, it's fine then.

return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()
Expand Down Expand Up @@ -665,6 +683,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
lookbackDelta = lookbackDeltaFromReq
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

// Record the query range requested.
qapi.queryRangeHist.Observe(end.Sub(start).Seconds())

Expand Down Expand Up @@ -770,6 +795,13 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
true,
nil,
Expand Down Expand Up @@ -866,6 +898,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr, func() {}
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "")
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
enableDedup,
replicaLabels,
Expand All @@ -876,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
Expand Down Expand Up @@ -926,6 +965,13 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "")
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
true,
nil,
Expand All @@ -936,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ func TestQueryEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}

start := time.Unix(0, 0)
Expand Down Expand Up @@ -744,6 +746,8 @@ func TestMetadataEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}
apiWithLabelLookback := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Expand All @@ -759,6 +763,8 @@ func TestMetadataEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}

var tests = []endpointTestCase{
Expand Down Expand Up @@ -1229,6 +1235,8 @@ func TestStoresEndpoint(t *testing.T) {
},
}
},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}
apiWithInvalidEndpoint := &QueryAPI{
endpointStatus: func() []query.EndpointStatus {
Expand Down
4 changes: 4 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -340,6 +341,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

// TODO(bwplotka): Use inprocess gRPC when we want to stream responses.
// Currently streaming won't help due to nature of the both PromQL engine which
Expand Down Expand Up @@ -419,6 +421,7 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
if err != nil {
Expand Down Expand Up @@ -452,6 +455,7 @@ func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ const (
labelError = "error"
)

// Allowed fields in client certificates.
const (
CertificateFieldOrganization = "organization"
CertificateFieldOrganizationalUnit = "organizationalUnit"
CertificateFieldCommonName = "commonName"
)

var (
// errConflict is returned whenever an operation fails due to any conflict-type error.
errConflict = errors.New("conflict")
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -1229,6 +1230,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer s.queryGate.Done()
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(srv.Context())
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the tenant value actually used in store gateway? I see it is only used in the logger rn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right for now. Splitting up the full implementation in order to not end up with a very large PR. There's an action plan at the end of the proposal. This PR corresponds to step one of the query component.

level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These debug and warning logs seem too verbose, no? Can we remove those logs in next pr?
It is expected to not have tenant header propagated for some usecase and no need to log it


matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -1478,6 +1485,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant)

resHints := &hintspb.LabelNamesResponseHints{}

var reqBlockMatchers []*labels.Matcher
Expand Down Expand Up @@ -1666,6 +1679,12 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", err)
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}
level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant)

resHints := &hintspb.LabelValuesResponseHints{}

g, gctx := errgroup.WithContext(ctx)
Expand Down
Loading
Loading