Skip to content

Commit

Permalink
Frontend Search Caching (#3225)
Browse files Browse the repository at this point in the history
* add query-frontend cache

Signed-off-by: Joe Elliott <[email protected]>

* docs and changelog

Signed-off-by: Joe Elliott <[email protected]>

* removed load and added test

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Dec 13, 2023
1 parent f7fcfac commit dd0992a
Show file tree
Hide file tree
Showing 15 changed files with 651 additions and 118 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)
* [ENHANCEMENT] Improve TraceQL regex performance in certain queries. [#3139](https://github.com/grafana/tempo/pull/3139) (@joe-elliott)
* [ENHANCEMENT] Improve TraceQL performance in complex queries. [#3113](https://github.com/grafana/tempo/pull/3113) (@joe-elliott)
* [ENHANCEMENT] Added a `frontend-search` cache role for job search caching. [#3225](https://github.com/grafana/tempo/pull/3225) (@joe-elliott)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
* [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno)
* [ENHANCEMENT] Update opentelemetry-collector-contrib dependency to the latest version, v0.89.0 [#3148](https://github.com/grafana/tempo/pull/3148) (@gebn)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.frontend = v1

// create query frontend
queryFrontend, err := frontend.New(t.cfg.Frontend, cortexTripper, t.Overrides, t.store, t.cfg.HTTPAPIPrefix, log.Logger, prometheus.DefaultRegisterer)
queryFrontend, err := frontend.New(t.cfg.Frontend, cortexTripper, t.Overrides, t.store, t.cacheProvider, t.cfg.HTTPAPIPrefix, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ cache:
# parquet-footer - Parquet footer values. Useful for search and trace by id lookup.
# parquet-column-idx - Parquet column index values. Useful for search and trace by id lookup.
# parquet-offset-idx - Parquet offset index values. Useful for search and trace by id lookup.
# frontend-search - Frontend search job results.
- roles:
- <role1>
Expand Down
1 change: 1 addition & 0 deletions modules/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func allRoles() map[cache.Role]struct{} {
cache.RoleParquetColumnIdx,
cache.RoleParquetOffsetIdx,
cache.RoleTraceIDIdx,
cache.RoleFrontendSearch,
}

roles := map[cache.Role]struct{}{}
Expand Down
69 changes: 69 additions & 0 deletions modules/frontend/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package frontend

import (
"bytes"
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/cache"
)

type frontendCache struct {
c cache.Cache
}

func newFrontendCache(cacheProvider cache.Provider, role cache.Role, logger log.Logger) *frontendCache {
var c cache.Cache
if cacheProvider != nil {
c = cacheProvider.CacheFor(role)
}

level.Info(logger).Log("msg", "init frontend cache", "role", role, "enabled", c != nil)

return &frontendCache{
c: c,
}
}

// store stores the response body in the cache. the caller assumes the responsibility of closing the response body
func (c *frontendCache) store(ctx context.Context, key string, buffer []byte) {
if c.c == nil {
return
}

if key == "" {
return
}

if len(buffer) == 0 {
return
}

c.c.Store(ctx, []string{key}, [][]byte{buffer})
}

// fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
func (c *frontendCache) fetch(key string, pb proto.Message) bool {
if c.c == nil {
return false
}

if len(key) == 0 {
return false
}

_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
if len(bufs) != 1 {
return false
}

err := (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(bufs[0]), pb)
if err != nil {
return false
}

return true
}
60 changes: 60 additions & 0 deletions modules/frontend/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package frontend

import (
"bytes"
"context"
"io"
"net/http/httptest"
"testing"

"github.com/go-kit/log"
"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestNilProvider(t *testing.T) {
testKey := "key"

c := newFrontendCache(nil, cache.RoleBloom, log.NewNopLogger())
require.NotNil(t, c)

rec := httptest.NewRecorder()

bodyBuffer, err := io.ReadAll(rec.Result().Body)
require.NoError(t, err)

c.store(context.Background(), testKey, bodyBuffer)
found := c.fetch(testKey, &tempopb.SearchResponse{})

require.False(t, found)
}

func TestCacheCaches(t *testing.T) {
expected := &tempopb.SearchTagsResponse{
TagNames: []string{"foo", "bar"},
}

// marshal mesage to bytes
buf := bytes.NewBuffer([]byte{})
err := (&jsonpb.Marshaler{}).Marshal(buf, expected)
require.NoError(t, err)

testKey := "key"
testData := buf.Bytes()

p := test.NewMockProvider()
c := newFrontendCache(p, cache.RoleBloom, log.NewNopLogger())
require.NotNil(t, c)

// create response
c.store(context.Background(), testKey, testData)

actual := &tempopb.SearchTagsResponse{}
found := c.fetch(testKey, actual)

require.True(t, found)
require.Equal(t, expected, actual)
}
21 changes: 14 additions & 7 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
)
Expand All @@ -27,12 +28,13 @@ type streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.Streami

type QueryFrontend struct {
TraceByIDHandler, SearchHandler, SearchTagsHandler, SpanMetricsSummaryHandler, SearchWSHandler http.Handler
cacheProvider cache.Provider
streamingSearch streamingSearchHandler
logger log.Logger
}

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempodb.Reader, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempodb.Reader, cacheProvider cache.Provider, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.TraceByID.QueryShards < minQueryShards || cfg.TraceByID.QueryShards > maxQueryShards {
Expand All @@ -53,9 +55,12 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo

retryWare := newRetryWare(cfg.MaxRetries, registerer)

// tracebyid middleware
// cache
searchCache := newFrontendCache(cacheProvider, cache.RoleFrontendSearch, logger)

// middleware
traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, o, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, reader, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, reader, searchCache, logger), retryWare)
searchTagsMiddleware := MergeMiddlewares(newSearchTagsMiddleware(), retryWare)

spanMetricsMiddleware := MergeMiddlewares(newSpanMetricsMiddleware(), retryWare)
Expand All @@ -70,8 +75,9 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
SearchHandler: newHandler(search, searchSLOPostHook(cfg.Search.SLO), searchSLOPreHook, logger),
SearchTagsHandler: newHandler(searchTags, nil, nil, logger),
SpanMetricsSummaryHandler: newHandler(metrics, nil, nil, logger),
SearchWSHandler: newSearchStreamingWSHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
streamingSearch: newSearchStreamingGRPCHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
SearchWSHandler: newSearchStreamingWSHandler(cfg, o, retryWare.Wrap(next), reader, searchCache, apiPrefix, logger),
cacheProvider: cacheProvider,
streamingSearch: newSearchStreamingGRPCHandler(cfg, o, retryWare.Wrap(next), reader, searchCache, apiPrefix, logger),
logger: logger,
}, nil
}
Expand Down Expand Up @@ -170,9 +176,10 @@ func newTraceByIDMiddleware(cfg Config, o overrides.Interface, logger log.Logger
}

// newSearchMiddleware creates a new frontend middleware to handle search and search tags requests.
func newSearchMiddleware(cfg Config, o overrides.Interface, reader tempodb.Reader, logger log.Logger) Middleware {
func newSearchMiddleware(cfg Config, o overrides.Interface, reader tempodb.Reader, c *frontendCache, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
searchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, newSearchProgress, logger))
ss := newSearchSharder(reader, o, cfg.Search.Sharder, newSearchProgress, c, logger)
searchRT := NewRoundTripper(next, ss)

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// backend search queries require sharding, so we pass through a special roundtripper
Expand Down
12 changes: 6 additions & 6 deletions modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
},
SLO: testSLOcfg,
},
}, next, nil, nil, "", log.NewNopLogger(), nil)
}, next, nil, nil, nil, "", log.NewNopLogger(), nil)
require.NoError(t, err)

req := httptest.NewRequest("GET", "/", nil)
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)")

assert.Nil(t, f)
Expand All @@ -86,7 +86,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -102,7 +102,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
assert.Nil(t, f)

Expand All @@ -118,7 +118,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
assert.Nil(t, f)

Expand All @@ -136,7 +136,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)
}
10 changes: 7 additions & 3 deletions modules/frontend/search_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ func (p *diffSearchProgress) finalResult() *shardedSearchResults {
}

// newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, apiPrefix string, logger log.Logger) streamingSearchHandler {
func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, searchCache *frontendCache, apiPrefix string, logger log.Logger) streamingSearchHandler {
searcher := streamingSearcher{
logger: logger,
downstream: downstream,
reader: reader,
postSLOHook: searchSLOPostHook(cfg.Search.SLO),
o: o,
searchCache: searchCache,
cfg: &cfg,
}

Expand All @@ -129,13 +130,14 @@ func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream
}
}

func newSearchStreamingWSHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, apiPrefix string, logger log.Logger) http.Handler {
func newSearchStreamingWSHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, searchCache *frontendCache, apiPrefix string, logger log.Logger) http.Handler {
searcher := streamingSearcher{
logger: logger,
downstream: downstream,
reader: reader,
postSLOHook: searchSLOPostHook(cfg.Search.SLO),
o: o,
searchCache: searchCache,
cfg: &cfg,
}

Expand Down Expand Up @@ -231,6 +233,7 @@ type streamingSearcher struct {
reader tempodb.Reader
postSLOHook handlerPostHook
o overrides.Interface
searchCache *frontendCache
cfg *Config
}

Expand All @@ -254,7 +257,8 @@ func (s *streamingSearcher) handle(r *http.Request, forwardResults func(*tempopb
return p
}
// build roundtripper
rt := NewRoundTripper(s.downstream, newSearchSharder(s.reader, s.o, s.cfg.Search.Sharder, fn, s.logger))
ss := newSearchSharder(s.reader, s.o, s.cfg.Search.Sharder, fn, s.searchCache, s.logger)
rt := NewRoundTripper(s.downstream, ss)

type roundTripResult struct {
resp *http.Response
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/search_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func testHandler(t *testing.T, next http.RoundTripper) streamingSearchHandler {
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000000"),
},
},
}, "", log.NewNopLogger())
}, &frontendCache{}, "", log.NewNopLogger())

return handler
}
Expand Down
Loading

0 comments on commit dd0992a

Please sign in to comment.