Skip to content

Commit

Permalink
Tempo Serverless: Queriers Prefer Self (#1307)
Browse files Browse the repository at this point in the history
* Added max_concurrent_queries to doc

Signed-off-by: Joe Elliott <[email protected]>

* added support for local search self

Signed-off-by: Joe Elliott <[email protected]>

* add functionality and test

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* clarifications

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Feb 28, 2022
1 parent 4b26046 commit 642f15f
Show file tree
Hide file tree
Showing 13 changed files with 307 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
* [ENHANCEMENT] Improve serverless handler error messages [#1305](https://github.com/grafana/tempo/pull/1305) (@joe-elliott)
* [ENHANCEMENT] Added a configuration option `search_prefer_self` to allow the queriers to do some work while also leveraging serverless in search. [#1307](https://github.com/grafana/tempo/pull/1307) (@joe-elliott)
* [ENHANCEMENT] Make trace combination/compaction more efficient [#1291](https://github.com/grafana/tempo/pull/1291) (@mdisibio)
* [ENHANCEMENT] Add Content-Type headers to query-frontend paths [#1306](https://github.com/grafana/tempo/pull/1306) (@wperron)
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
Expand Down
13 changes: 13 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,19 @@ querier:
# The default value of an empty list disables this feature.
[search_external_endpoints: <list of strings> | default = <empty list>]

# If search_external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend
# you have configured. This setting allows the operator to have the querier prefer itself for a configurable
# number of subqueries. In the default case of 2 the querier will process up to 2 search subqueries itself at any one time
# before proxying further search subqueries to search_external_endpoints.
# Setting this to 0 will disable this feature and the querier will proxy all search subqueries to search_external_endpoints.
[search_prefer_self: <int> | default = 2]

# The query frontend turns both trace by id (/api/traces/<id>) and search (/api/search?<params>) requests
# into subqueries that are then pulled and serviced by the queriers.
# This value controls the total number of subqueries that the querier will service concurrently. It does
# not distinguish between the types of queries.
[max_concurrent_queries: <int> | default = 5]

# config of the worker that connects to the query frontend
frontend_worker:

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
go.uber.org/goleak v1.1.12
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
google.golang.org/api v0.61.0
google.golang.org/grpc v1.42.0
Expand Down
2 changes: 2 additions & 0 deletions modules/querier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
TraceLookupQueryTimeout time.Duration `yaml:"query_timeout"`
SearchQueryTimeout time.Duration `yaml:"search_query_timeout"`
SearchExternalEndpoints []string `yaml:"search_external_endpoints"`
SearchPreferSelf int `yaml:"search_prefer_self"`
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
MaxConcurrentQueries int `yaml:"max_concurrent_queries"`
Worker worker.Config `yaml:"frontend_worker"`
Expand All @@ -25,6 +26,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.SearchQueryTimeout = 30 * time.Second
cfg.ExtraQueryDelay = 0
cfg.MaxConcurrentQueries = 5
cfg.SearchPreferSelf = 2
cfg.Worker = worker.Config{
MatchMaxConcurrency: true,
MaxConcurrentRequests: cfg.MaxConcurrentQueries,
Expand Down
28 changes: 21 additions & 7 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"
"go.uber.org/multierr"
"golang.org/x/sync/semaphore"

ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
Expand Down Expand Up @@ -66,6 +67,8 @@ type Querier struct {
store storage.Store
limits *overrides.Overrides

searchPreferSelf *semaphore.Weighted

subservices *services.Manager
subservicesWatcher *services.FailureWatcher
}
Expand All @@ -90,8 +93,9 @@ func New(cfg Config, clientCfg ingester_client.Config, ring ring.ReadRing, store
factory,
metricIngesterClients,
log.Logger),
store: store,
limits: limits,
store: store,
limits: limits,
searchPreferSelf: semaphore.NewWeighted(int64(cfg.SearchPreferSelf)),
}

q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
Expand Down Expand Up @@ -395,13 +399,23 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal

// SearchBlock searches the specified subset of the block for the passed tags.
func (q *Querier) SearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
// todo: if the querier is not currently doing anything it should prefer handling the request itself to
// offloading to the external endpoint
if len(q.cfg.SearchExternalEndpoints) != 0 {
endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
return searchExternalEndpoint(ctx, endpoint, req)
// if we have no external configuration always search in the querier
if len(q.cfg.SearchExternalEndpoints) == 0 {
return q.internalSearchBlock(ctx, req)
}

// if we have external configuration but there's an open slot locally then search in the querier
if q.searchPreferSelf.TryAcquire(1) {
defer q.searchPreferSelf.Release(1)
return q.internalSearchBlock(ctx, req)
}

// proxy externally!
endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
return searchExternalEndpoint(ctx, endpoint, req)
}

func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, errors.Wrap(err, "error extracting org id in Querier.BackendSearch")
Expand Down
74 changes: 74 additions & 0 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package querier

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
"github.com/weaveworks/common/user"
)

// TestQuerierUsesSearchExternalEndpoint counts the requests that reach a stub external endpoint
// to verify which SearchBlock calls the querier proxies instead of servicing itself.
func TestQuerierUsesSearchExternalEndpoint(t *testing.T) {
	externalHits := atomic.NewInt32(0)

	// stub external endpoint: pause for 100ms, then record that a request arrived
	countRequest := func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(100 * time.Millisecond)
		externalHits.Inc()
	}
	srv := httptest.NewServer(http.HandlerFunc(countRequest))
	defer srv.Close()

	type scenario struct {
		cfg          Config
		numQueries   int
		wantExternal int32
	}

	scenarios := []scenario{
		// SearchExternalEndpoints is respected: every query is proxied
		{
			cfg:          Config{SearchExternalEndpoints: []string{srv.URL}},
			numQueries:   3,
			wantExternal: 3,
		},
		// without SearchExternalEndpoints the querier services everything internally
		{
			cfg:          Config{},
			numQueries:   3,
			wantExternal: 0,
		},
		// SearchPreferSelf is respected. this case is disabled b/c SearchBlock fails instantly, so
		// all 3 queries are executed locally and nothing is proxied to the external endpoint.
		// making it pass requires mocking the storage.Store interface, which is a big interface.
		// {
		// 	cfg: Config{
		// 		SearchExternalEndpoints: []string{srv.URL},
		// 		SearchPreferSelf:        2,
		// 	},
		// 	numQueries:   3,
		// 	wantExternal: 1,
		// },
	}

	ctx := user.InjectOrgID(context.Background(), "blerg")

	for _, sc := range scenarios {
		// every scenario starts counting from zero
		externalHits.Store(0)

		q, err := New(sc.cfg, client.Config{}, nil, nil, nil)
		require.NoError(t, err)

		for remaining := sc.numQueries; remaining > 0; remaining-- {
			// the error is deliberately dropped: all queries error, and the hit counter
			// alone tells us whether the request was proxied
			_, _ = q.SearchBlock(ctx, &tempopb.SearchBlockRequest{})
		}

		require.Equal(t, sc.wantExternal, externalHits.Load())
	}
}
2 changes: 1 addition & 1 deletion modules/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {

Parallelism int `yaml:"parallelism"`
MatchMaxConcurrency bool `yaml:"match_max_concurrent"`
MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to PromQL Engine.
MaxConcurrentRequests int `yaml:"-"`

QuerierID string `yaml:"id"`

Expand Down
3 changes: 3 additions & 0 deletions vendor/golang.org/x/sync/AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/golang.org/x/sync/CONTRIBUTORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions vendor/golang.org/x/sync/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/golang.org/x/sync/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/semaphore/semaphore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,9 @@ golang.org/x/oauth2/google/internal/externalaccount
golang.org/x/oauth2/internal
golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
## explicit
golang.org/x/sync/semaphore
# golang.org/x/sys v0.0.0-20211124211545-fe61309f8881
## explicit; go 1.17
golang.org/x/sys/cpu
Expand Down

0 comments on commit 642f15f

Please sign in to comment.