Tempo Serverless: Queriers Prefer Self #1307

Merged 7 commits on Feb 28, 2022
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
* [ENHANCEMENT] Improve serverless handler error messages [#1305](https://github.com/grafana/tempo/pull/1305) (@joe-elliott)
* [ENHANCEMENT] Added a configuration option `search_prefer_self` to allow the queriers to do some work while also leveraging serverless in search. [#1307](https://github.com/grafana/tempo/pull/1307) (@joe-elliott)
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
* [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)
* [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno)
12 changes: 12 additions & 0 deletions docs/tempo/website/configuration/_index.md
@@ -254,6 +254,18 @@ querier:
# The default value of "" disables this feature.
[search_external_endpoints: <list of strings> | default = <empty list>]

# If search_external_endpoints is set then the querier will primarily act as a proxy for whatever serverless backend
# you have configured. This setting allows the operator to have the querier prefer itself for a configurable
# number of queries. In the default case of 2 the querier will process up to 2 search requests before starting
# to reach out to search_external_endpoints.
[search_prefer_self: <int> | default = 2 ]

# The query frontend turns both trace by id (/api/traces/<id>) and search (/api/search?<params>) requests
# into subqueries that are then pulled and serviced by the queriers.
# This value controls the overall number of simultaneous subqueries that the querier will service at once. It does
# not distinguish between the types of queries.
[max_concurrent_queries: <int> | default = 5]

# config of the worker that connects to the query frontend
frontend_worker:
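
Reviewer sketch of the documented `search_prefer_self` behaviour, written as a pure routing decision. This is a minimal illustration under stated assumptions, not Tempo's code: `routeSearch`, its parameters, and the placeholder endpoint URL are hypothetical names introduced here. It assumes the limit applies to searches currently in flight on the querier, which is what the semaphore in `querier.go` implies.

```go
package main

import "fmt"

// routeSearch is a hypothetical helper (not part of Tempo). It mirrors the
// documented rules: with no external endpoints every search is local;
// otherwise up to preferSelf in-flight searches stay local and the rest
// are proxied to an external endpoint.
func routeSearch(externalEndpoints []string, preferSelf, localInFlight int) string {
	if len(externalEndpoints) == 0 {
		return "local"
	}
	if localInFlight < preferSelf {
		return "local"
	}
	return "external"
}

func main() {
	endpoints := []string{"http://serverless.example"} // placeholder URL, an assumption

	// With the default of 2: in-flight counts 0 and 1 stay local, 2 and 3 are proxied.
	for inFlight := 0; inFlight <= 3; inFlight++ {
		fmt.Printf("in flight=%d -> %s\n", inFlight, routeSearch(endpoints, 2, inFlight))
	}

	fmt.Println(routeSearch(nil, 2, 10))      // local: nothing to proxy to
	fmt.Println(routeSearch(endpoints, 0, 0)) // external: 0 means never prefer self
}
```

Under this reading, setting `search_prefer_self: 0` keeps the pre-PR behaviour of always proxying when external endpoints are configured.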

1 change: 1 addition & 0 deletions go.mod
@@ -80,6 +80,7 @@ require (
go.uber.org/goleak v1.1.12
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
google.golang.org/api v0.61.0
google.golang.org/grpc v1.42.0
2 changes: 2 additions & 0 deletions modules/querier/config.go
@@ -14,6 +14,7 @@ type Config struct {
TraceLookupQueryTimeout time.Duration `yaml:"query_timeout"`
SearchQueryTimeout time.Duration `yaml:"search_query_timeout"`
SearchExternalEndpoints []string `yaml:"search_external_endpoints"`
SearchPreferSelf int `yaml:"search_prefer_self"`
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
MaxConcurrentQueries int `yaml:"max_concurrent_queries"`
Worker worker.Config `yaml:"frontend_worker"`
@@ -25,6 +26,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.SearchQueryTimeout = 30 * time.Second
cfg.ExtraQueryDelay = 0
cfg.MaxConcurrentQueries = 5
cfg.SearchPreferSelf = 2
cfg.Worker = worker.Config{
MatchMaxConcurrency: true,
MaxConcurrentRequests: cfg.MaxConcurrentQueries,
28 changes: 21 additions & 7 deletions modules/querier/querier.go
@@ -25,6 +25,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/user"
"go.uber.org/multierr"
"golang.org/x/sync/semaphore"

ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
@@ -66,6 +67,8 @@ type Querier struct {
store storage.Store
limits *overrides.Overrides

searchPreferSelf *semaphore.Weighted

subservices *services.Manager
subservicesWatcher *services.FailureWatcher
}
@@ -90,8 +93,9 @@ func New(cfg Config, clientCfg ingester_client.Config, ring ring.ReadRing, store
factory,
metricIngesterClients,
log.Logger),
- store: store,
- limits: limits,
store: store,
limits: limits,
searchPreferSelf: semaphore.NewWeighted(int64(cfg.SearchPreferSelf)),
}

q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
@@ -391,13 +395,23 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal

// SearchBlock searches the specified subset of the block for the passed tags.
func (q *Querier) SearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
- // todo: if the querier is not currently doing anything it should prefer handling the request itself to
- // offloading to the external endpoint
- if len(q.cfg.SearchExternalEndpoints) != 0 {
- endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
- return searchExternalEndpoint(ctx, endpoint, req)
// if we have no external configuration always search in the querier
if len(q.cfg.SearchExternalEndpoints) == 0 {
return q.internalSearchBlock(ctx, req)
}

// if we have external configuration but there's an open slot locally then search in the querier
if q.searchPreferSelf.TryAcquire(1) {
defer q.searchPreferSelf.Release(1)
return q.internalSearchBlock(ctx, req)
}

// proxy externally!
endpoint := q.cfg.SearchExternalEndpoints[rand.Intn(len(q.cfg.SearchExternalEndpoints))]
return searchExternalEndpoint(ctx, endpoint, req)
}

func (q *Querier) internalSearchBlock(ctx context.Context, req *tempopb.SearchBlockRequest) (*tempopb.SearchResponse, error) {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, errors.Wrap(err, "error extracting org id in Querier.BackendSearch")
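
Reviewer sketch of the non-blocking acquire pattern that `SearchBlock` relies on. To keep it standard-library only, a buffered channel is swapped in for `semaphore.Weighted`; that substitution, and the names `localSlots`, `tryAcquire`, and `release`, are assumptions made for illustration and are not part of this PR.

```go
package main

import "fmt"

// localSlots is a counting semaphore with a non-blocking acquire.
// Assumption: the buffered channel stands in for semaphore.Weighted;
// its capacity plays the role of search_prefer_self.
type localSlots chan struct{}

// tryAcquire takes a slot if one is free and never blocks.
func (s localSlots) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns a previously acquired slot.
func (s localSlots) release() { <-s }

func main() {
	s := make(localSlots, 2) // like search_prefer_self: 2

	fmt.Println(s.tryAcquire()) // true: first local search
	fmt.Println(s.tryAcquire()) // true: second local search
	fmt.Println(s.tryAcquire()) // false: both slots busy, this request would be proxied

	s.release()                 // one local search finishes
	fmt.Println(s.tryAcquire()) // true: the freed slot is reused

	none := make(localSlots, 0)    // like search_prefer_self: 0
	fmt.Println(none.tryAcquire()) // false: every request would be proxied
}
```

The point of the non-blocking form is that a busy querier never queues a search behind its own local work; it falls through to the external endpoint immediately.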
74 changes: 74 additions & 0 deletions modules/querier/querier_test.go
@@ -0,0 +1,74 @@
package querier

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
"github.com/weaveworks/common/user"
)

func TestQuerierUsesSearchExternalEndpoint(t *testing.T) {
numExternalRequests := atomic.NewInt32(0)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
numExternalRequests.Inc()
}))
defer srv.Close()

tests := []struct {
cfg Config
queriesToExecute int
externalExpected int32
}{
// SearchExternalEndpoints is respected
{
cfg: Config{
SearchExternalEndpoints: []string{srv.URL},
},
queriesToExecute: 3,
externalExpected: 3,
},
// No SearchExternalEndpoints causes the querier to service everything internally
{
cfg: Config{},
queriesToExecute: 3,
externalExpected: 0,
},
// SearchPreferSelf is respected. this test won't pass b/c SearchBlock fails instantly and so
// all 3 queries are executed locally and nothing is proxied to the external endpoint.
// we'd have to mock the storage.Store interface to get this to pass. it's a big interface.
// {
// cfg: Config{
// SearchExternalEndpoints: []string{srv.URL},
// SearchPreferSelf: 2,
// },
// queriesToExecute: 3,
// externalExpected: 1,
// },
}

ctx := user.InjectOrgID(context.Background(), "blerg")

for _, tc := range tests {
numExternalRequests.Store(0)

q, err := New(tc.cfg, client.Config{}, nil, nil, nil)
require.NoError(t, err)

for i := 0; i < tc.queriesToExecute; i++ {
// ignore error purposefully here. all queries will error, but we don't care
// numExternalRequests will tell us what we need to know
_, _ = q.SearchBlock(ctx, &tempopb.SearchBlockRequest{})
}

require.Equal(t, tc.externalExpected, numExternalRequests.Load())
}
}
2 changes: 1 addition & 1 deletion modules/querier/worker/worker.go
@@ -26,7 +26,7 @@ type Config struct {

Parallelism int `yaml:"parallelism"`
MatchMaxConcurrency bool `yaml:"match_max_concurrent"`
- MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to PromQL Engine.
MaxConcurrentRequests int `yaml:"-"`

QuerierID string `yaml:"id"`

3 changes: 3 additions & 0 deletions vendor/golang.org/x/sync/AUTHORS
3 changes: 3 additions & 0 deletions vendor/golang.org/x/sync/CONTRIBUTORS
27 changes: 27 additions & 0 deletions vendor/golang.org/x/sync/LICENSE
22 changes: 22 additions & 0 deletions vendor/golang.org/x/sync/PATENTS
136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/semaphore/semaphore.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions vendor/modules.txt
@@ -1146,6 +1146,9 @@ golang.org/x/oauth2/google/internal/externalaccount
golang.org/x/oauth2/internal
golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
## explicit
golang.org/x/sync/semaphore
# golang.org/x/sys v0.0.0-20211124211545-fe61309f8881
## explicit; go 1.17
golang.org/x/sys/cpu