diff --git a/CHANGELOG.md b/CHANGELOG.md
index 35bd5ac679e..41a58dbc808 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@
 * [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
 * [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
 * [ENHANCEMENT] Improve serverless handler error messages [#1305](https://github.com/grafana/tempo/pull/1305) (@joe-elliott)
+* [ENHANCEMENT] Added a configuration option `search_prefer_self` to allow the queriers to do some work while also leveraging serverless in search. [#1307](https://github.com/grafana/tempo/pull/1307) (@joe-elliott)
 * [ENHANCEMENT] Make trace combination/compaction more efficient [#1291](https://github.com/grafana/tempo/pull/1291) (@mdisibio)
 * [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
 * [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)
diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md
index c58468d67e3..8d7ee608744 100644
--- a/docs/tempo/website/configuration/_index.md
+++ b/docs/tempo/website/configuration/_index.md
@@ -254,6 +254,19 @@ querier:
     # The default value of "" disables this feature.
     [search_external_endpoints: <list of strings> | default = <empty list>]
 
+    # If search_external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend
+    # you have configured. This setting allows the operator to have the querier prefer itself for a configurable
+    # number of subqueries. In the default case of 2, the querier will process up to 2 search subqueries before starting
+    # to reach out to search_external_endpoints.
+    # Setting this to 0 will disable this feature and the querier will proxy all search subqueries to search_external_endpoints.
+    [search_prefer_self: <int> | default = 2]
+
+    # The query frontend turns both trace by id (/api/traces/) and search (/api/search?) requests
+    # into subqueries that are then pulled and serviced by the queriers.
+    # This value controls the overall number of simultaneous subqueries that the querier will service at once. It does
+    # not distinguish between the types of queries.
+    [max_concurrent_queries: <int> | default = 5]
+
 
     # config of the worker that connects to the query frontend
     frontend_worker:
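As a rough illustration of how these documented settings surface in code, the fields added to `modules/querier/config.go` below can be set directly. This is only a sketch: the endpoint URL is hypothetical and the values simply restate the documented defaults.

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/modules/querier"
)

func main() {
	// Hypothetical values for illustration; only the field names and defaults come from this change.
	cfg := querier.Config{
		SearchExternalEndpoints: []string{"https://example.com/tempo-serverless"}, // hypothetical endpoint
		SearchPreferSelf:        2, // handle up to 2 search subqueries locally before proxying
		MaxConcurrentQueries:    5, // total simultaneous subqueries serviced by one querier
	}

	fmt.Printf("prefer self for %d of %d concurrent subqueries\n", cfg.SearchPreferSelf, cfg.MaxConcurrentQueries)
}
```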
diff --git a/go.mod b/go.mod
index bc1802bd929..b858a7b1895 100644
--- a/go.mod
+++ b/go.mod
@@ -80,6 +80,7 @@ require (
 	go.uber.org/goleak v1.1.12
 	go.uber.org/multierr v1.7.0
 	go.uber.org/zap v1.19.1
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
 	google.golang.org/api v0.61.0
 	google.golang.org/grpc v1.42.0
diff --git a/modules/querier/config.go b/modules/querier/config.go
index e400fc6dd68..66250503078 100644
--- a/modules/querier/config.go
+++ b/modules/querier/config.go
@@ -14,6 +14,7 @@ type Config struct {
 	TraceLookupQueryTimeout time.Duration `yaml:"query_timeout"`
 	SearchQueryTimeout      time.Duration `yaml:"search_query_timeout"`
 	SearchExternalEndpoints []string      `yaml:"search_external_endpoints"`
+	SearchPreferSelf        int           `yaml:"search_prefer_self"`
 	ExtraQueryDelay         time.Duration `yaml:"extra_query_delay,omitempty"`
 	MaxConcurrentQueries    int           `yaml:"max_concurrent_queries"`
 	Worker                  worker.Config `yaml:"frontend_worker"`
@@ -25,6 +26,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
 	cfg.SearchQueryTimeout = 30 * time.Second
 	cfg.ExtraQueryDelay = 0
 	cfg.MaxConcurrentQueries = 5
+	cfg.SearchPreferSelf = 2
 	cfg.Worker = worker.Config{
 		MatchMaxConcurrency:   true,
 		MaxConcurrentRequests: cfg.MaxConcurrentQueries,
diff --git a/modules/querier/querier.go b/modules/querier/querier.go
index b3194d9c33f..ebfa60dfdb9 100644
--- a/modules/querier/querier.go
+++ b/modules/querier/querier.go
@@ -25,6 +25,7 @@ import (
 	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
 	"github.com/weaveworks/common/user"
 	"go.uber.org/multierr"
+	"golang.org/x/sync/semaphore"
 
 	ingester_client "github.com/grafana/tempo/modules/ingester/client"
 	"github.com/grafana/tempo/modules/overrides"
@@ -66,6 +67,8 @@ type Querier struct {
 	store  storage.Store
 	limits *overrides.Overrides
 
+	searchPreferSelf *semaphore.Weighted
+
 	subservices        *services.Manager
 	subservicesWatcher *services.FailureWatcher
 }
@@ -90,8 +93,9 @@ func New(cfg Config, clientCfg ingester_client.Config, ring ring.ReadRing, store
 			factory,
 			metricIngesterClients,
 			log.Logger),
-		store:  store,
-		limits: limits,
+		store:            store,
+		limits:           limits,
+		searchPreferSelf: semaphore.NewWeighted(int64(cfg.SearchPreferSelf)),
 	}
 
 	q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
@@ -395,13 +399,23 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
 
 // SearchBlock searches the specified subset of the block for the passed tags.
 func (q *Querier) SearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
-	// todo: if the querier is not currently doing anything it should prefer handling the request itself to
-	// offloading to the external endpoint
-	if len(q.cfg.SearchExternalEndpoints) != 0 {
-		endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
-		return searchExternalEndpoint(ctx, endpoint, req)
+	// if we have no external configuration always search in the querier
+	if len(q.cfg.SearchExternalEndpoints) == 0 {
+		return q.internalSearchBlock(ctx, req)
+	}
+
+	// if we have external configuration but there's an open slot locally then search in the querier
+	if q.searchPreferSelf.TryAcquire(1) {
+		defer q.searchPreferSelf.Release(1)
+		return q.internalSearchBlock(ctx, req)
 	}
 
+	// proxy externally!
+	endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
+	return searchExternalEndpoint(ctx, endpoint, req)
+}
+
+func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
 	tenantID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, errors.Wrap(err, "error extracting org id in Querier.BackendSearch")
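The new `SearchBlock` flow reads as: always search locally when no external endpoints are configured; otherwise search locally only while a prefer-self slot is free, and proxy the rest. A minimal, self-contained sketch of that gating pattern using the same `golang.org/x/sync/semaphore` package (the handler and counts here are invented for illustration, not Tempo code):

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// handleSubquery is a stand-in for SearchBlock's decision: do the work locally
// while a prefer-self slot is free, otherwise hand it to an external endpoint.
func handleSubquery(preferSelf *semaphore.Weighted, id int) string {
	if preferSelf.TryAcquire(1) {
		defer preferSelf.Release(1)
		time.Sleep(50 * time.Millisecond) // simulate a local block search
		return fmt.Sprintf("subquery %d: searched locally", id)
	}
	return fmt.Sprintf("subquery %d: proxied externally", id)
}

func main() {
	// Weight 2 mirrors the default search_prefer_self: at most two subqueries
	// are handled locally at any one time.
	preferSelf := semaphore.NewWeighted(2)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Println(handleSubquery(preferSelf, id))
		}(i)
	}
	wg.Wait()
}
```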
diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go
new file mode 100644
index 00000000000..edaae7aad0a
--- /dev/null
+++ b/modules/querier/querier_test.go
@@ -0,0 +1,74 @@
+package querier
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/grafana/tempo/modules/ingester/client"
+	"github.com/grafana/tempo/pkg/tempopb"
+	"github.com/stretchr/testify/require"
+	"github.com/uber-go/atomic"
+	"github.com/weaveworks/common/user"
+)
+
+func TestQuerierUsesSearchExternalEndpoint(t *testing.T) {
+	numExternalRequests := atomic.NewInt32(0)
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		time.Sleep(100 * time.Millisecond)
+		numExternalRequests.Inc()
+	}))
+	defer srv.Close()
+
+	tests := []struct {
+		cfg              Config
+		queriesToExecute int
+		externalExpected int32
+	}{
+		// SearchExternalEndpoints is respected
+		{
+			cfg: Config{
+				SearchExternalEndpoints: []string{srv.URL},
+			},
+			queriesToExecute: 3,
+			externalExpected: 3,
+		},
+		// No SearchExternalEndpoints causes the querier to service everything internally
+		{
+			cfg:              Config{},
+			queriesToExecute: 3,
+			externalExpected: 0,
+		},
+		// SearchPreferSelf is respected. this test won't pass b/c SearchBlock fails instantly and so
+		// all 3 queries are executed locally and nothing is proxied to the external endpoint.
+		// we'd have to mock the storage.Store interface to get this to pass. it's a big interface.
+		// {
+		// 	cfg: Config{
+		// 		SearchExternalEndpoints: []string{srv.URL},
+		// 		SearchPreferSelf:        2,
+		// 	},
+		// 	queriesToExecute: 3,
+		// 	externalExpected: 1,
+		// },
+	}
+
+	ctx := user.InjectOrgID(context.Background(), "blerg")
+
+	for _, tc := range tests {
+		numExternalRequests.Store(0)
+
+		q, err := New(tc.cfg, client.Config{}, nil, nil, nil)
+		require.NoError(t, err)
+
+		for i := 0; i < tc.queriesToExecute; i++ {
+			// ignore error purposefully here. all queries will error, but we don't care
+			// numExternalRequests will tell us what we need to know
+			_, _ = q.SearchBlock(ctx, &tempopb.SearchBlockRequest{})
+		}
+
+		require.Equal(t, tc.externalExpected, numExternalRequests.Load())
+	}
+}
diff --git a/modules/querier/worker/worker.go b/modules/querier/worker/worker.go
index 7f55ae302fa..e1279e9bd61 100644
--- a/modules/querier/worker/worker.go
+++ b/modules/querier/worker/worker.go
@@ -26,7 +26,7 @@ type Config struct {
 
 	Parallelism           int  `yaml:"parallelism"`
 	MatchMaxConcurrency   bool `yaml:"match_max_concurrent"`
-	MaxConcurrentRequests int  `yaml:"-"` // Must be same as passed to PromQL Engine.
+	MaxConcurrentRequests int  `yaml:"-"`
 
 	QuerierID string `yaml:"id"`
 
diff --git a/vendor/golang.org/x/sync/AUTHORS b/vendor/golang.org/x/sync/AUTHORS
new file mode 100644
index 00000000000..15167cd746c
--- /dev/null
+++ b/vendor/golang.org/x/sync/AUTHORS
@@ -0,0 +1,3 @@
+# This source code refers to The Go Authors for copyright purposes.
+# The master list of authors is in the main Go distribution,
+# visible at http://tip.golang.org/AUTHORS.
diff --git a/vendor/golang.org/x/sync/CONTRIBUTORS b/vendor/golang.org/x/sync/CONTRIBUTORS
new file mode 100644
index 00000000000..1c4577e9680
--- /dev/null
+++ b/vendor/golang.org/x/sync/CONTRIBUTORS
@@ -0,0 +1,3 @@
+# This source code was written by the Go contributors.
+# The master list of contributors is in the main Go distribution,
+# visible at http://tip.golang.org/CONTRIBUTORS.
diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE
new file mode 100644
index 00000000000..6a66aea5eaf
--- /dev/null
+++ b/vendor/golang.org/x/sync/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS
new file mode 100644
index 00000000000..733099041f8
--- /dev/null
+++ b/vendor/golang.org/x/sync/PATENTS
@@ -0,0 +1,22 @@
+Additional IP Rights Grant (Patents)
+
+"This implementation" means the copyrightable works distributed by
+Google as part of the Go project.
+
+Google hereby grants to You a perpetual, worldwide, non-exclusive,
+no-charge, royalty-free, irrevocable (except as stated in this section)
+patent license to make, have made, use, offer to sell, sell, import,
+transfer and otherwise run, modify and propagate the contents of this
+implementation of Go, where such license applies only to those patent
+claims, both currently owned or controlled by Google and acquired in
+the future, licensable by Google that are necessarily infringed by this
+implementation of Go.  This grant does not include claims that would be
+infringed only as a consequence of further modification of this
+implementation.  If you or your agent or exclusive licensee institute or
+order or agree to the institution of patent litigation against any
+entity (including a cross-claim or counterclaim in a lawsuit) alleging
+that this implementation of Go or any code incorporated within this
+implementation of Go constitutes direct or contributory patent
+infringement, or inducement of patent infringement, then any patent
+rights granted to you under this License for this implementation of Go
+shall terminate as of the date such litigation is filed.
diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go
new file mode 100644
index 00000000000..30f632c577b
--- /dev/null
+++ b/vendor/golang.org/x/sync/semaphore/semaphore.go
@@ -0,0 +1,136 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package semaphore provides a weighted semaphore implementation.
+package semaphore // import "golang.org/x/sync/semaphore"
+
+import (
+	"container/list"
+	"context"
+	"sync"
+)
+
+type waiter struct {
+	n     int64
+	ready chan<- struct{} // Closed when semaphore acquired.
+}
+
+// NewWeighted creates a new weighted semaphore with the given
+// maximum combined weight for concurrent access.
+func NewWeighted(n int64) *Weighted {
+	w := &Weighted{size: n}
+	return w
+}
+
+// Weighted provides a way to bound concurrent access to a resource.
+// The callers can request access with a given weight.
+type Weighted struct {
+	size    int64
+	cur     int64
+	mu      sync.Mutex
+	waiters list.List
+}
+
+// Acquire acquires the semaphore with a weight of n, blocking until resources
+// are available or ctx is done. On success, returns nil. On failure, returns
+// ctx.Err() and leaves the semaphore unchanged.
+//
+// If ctx is already done, Acquire may still succeed without blocking.
+func (s *Weighted) Acquire(ctx context.Context, n int64) error {
+	s.mu.Lock()
+	if s.size-s.cur >= n && s.waiters.Len() == 0 {
+		s.cur += n
+		s.mu.Unlock()
+		return nil
+	}
+
+	if n > s.size {
+		// Don't make other Acquire calls block on one that's doomed to fail.
+		s.mu.Unlock()
+		<-ctx.Done()
+		return ctx.Err()
+	}
+
+	ready := make(chan struct{})
+	w := waiter{n: n, ready: ready}
+	elem := s.waiters.PushBack(w)
+	s.mu.Unlock()
+
+	select {
+	case <-ctx.Done():
+		err := ctx.Err()
+		s.mu.Lock()
+		select {
+		case <-ready:
+			// Acquired the semaphore after we were canceled. Rather than trying to
+			// fix up the queue, just pretend we didn't notice the cancelation.
+			err = nil
+		default:
+			isFront := s.waiters.Front() == elem
+			s.waiters.Remove(elem)
+			// If we're at the front and there're extra tokens left, notify other waiters.
+			if isFront && s.size > s.cur {
+				s.notifyWaiters()
+			}
+		}
+		s.mu.Unlock()
+		return err
+
+	case <-ready:
+		return nil
+	}
+}
+
+// TryAcquire acquires the semaphore with a weight of n without blocking.
+// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
+func (s *Weighted) TryAcquire(n int64) bool {
+	s.mu.Lock()
+	success := s.size-s.cur >= n && s.waiters.Len() == 0
+	if success {
+		s.cur += n
+	}
+	s.mu.Unlock()
+	return success
+}
+
+// Release releases the semaphore with a weight of n.
+func (s *Weighted) Release(n int64) {
+	s.mu.Lock()
+	s.cur -= n
+	if s.cur < 0 {
+		s.mu.Unlock()
+		panic("semaphore: released more than held")
+	}
+	s.notifyWaiters()
+	s.mu.Unlock()
+}
+
+func (s *Weighted) notifyWaiters() {
+	for {
+		next := s.waiters.Front()
+		if next == nil {
+			break // No more waiters blocked.
+		}
+
+		w := next.Value.(waiter)
+		if s.size-s.cur < w.n {
+			// Not enough tokens for the next waiter. We could keep going (to try to
+			// find a waiter with a smaller request), but under load that could cause
+			// starvation for large requests; instead, we leave all remaining waiters
+			// blocked.
+			//
+			// Consider a semaphore used as a read-write lock, with N tokens, N
+			// readers, and one writer. Each reader can Acquire(1) to obtain a read
+			// lock. The writer can Acquire(N) to obtain a write lock, excluding all
+			// of the readers. If we allow the readers to jump ahead in the queue,
+			// the writer will starve — there is always one token available for every
+			// reader.
+			break
+		}
+
+		s.cur += w.n
+		s.waiters.Remove(next)
+		close(w.ready)
+	}
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5c39f2669b7..0637ddd6add 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1146,6 +1146,9 @@ golang.org/x/oauth2/google/internal/externalaccount
 golang.org/x/oauth2/internal
 golang.org/x/oauth2/jws
 golang.org/x/oauth2/jwt
+# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+## explicit
+golang.org/x/sync/semaphore
 # golang.org/x/sys v0.0.0-20211124211545-fe61309f8881
 ## explicit; go 1.17
 golang.org/x/sys/cpu
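The querier change uses the vendored package only through `NewWeighted`, `TryAcquire` and `Release`, but the package also supports context-aware blocking acquisition. A small illustrative sketch of that API, assuming a made-up worker-style loop that is not part of Tempo:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(3) // allow at most 3 units of work in flight

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	for i := 0; i < 10; i++ {
		// Unlike TryAcquire, Acquire blocks until a slot frees up or ctx expires.
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Println("gave up waiting:", err)
			break
		}
		go func(id int) {
			defer sem.Release(1)
			time.Sleep(100 * time.Millisecond) // simulate work
			fmt.Println("finished", id)
		}(i)
	}

	// Acquiring the full weight waits for all outstanding work to complete.
	_ = sem.Acquire(context.Background(), 3)
	fmt.Println("all work drained")
}
```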