Skip to content

Commit

Permalink
Support lookback_delta on query frontend (#5854)
Browse files Browse the repository at this point in the history
* support lookback_delta on query frontend and add it as part of the cache key

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Nov 3, 2022
1 parent 50b4156 commit 6e95c3e
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.
- [#5854](https://github.com/thanos-io/thanos/pull/5854) Query Frontend: Handles `lookback_delta` param in query frontend.

### Added

Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Re
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s:%d", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey, tr.LookbackDelta)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
Expand Down
21 changes: 16 additions & 5 deletions pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 60 * seconds,
},
expected: "fe::up:60000:0:2:-",
expected: "fe::up:60000:0:2:-:0",
},
{
name: "10s step",
Expand All @@ -48,7 +48,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
},
expected: "fe::up:10000:0:2:-",
expected: "fe::up:10000:0:2:-:0",
},
{
name: "1m downsampling resolution",
Expand All @@ -58,7 +58,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
},
expected: "fe::up:10000:0:2:-",
expected: "fe::up:10000:0:2:-:0",
},
{
name: "5m downsampling resolution, different cache key",
Expand All @@ -68,7 +68,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
},
expected: "fe::up:10000:0:1:-",
expected: "fe::up:10000:0:1:-:0",
},
{
name: "1h downsampling resolution, different cache key",
Expand All @@ -78,7 +78,18 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: hour,
},
expected: "fe::up:10000:0:0:-",
expected: "fe::up:10000:0:0:-:0",
},
{
name: "1h downsampling resolution with lookback delta",
req: &ThanosQueryRangeRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: hour,
LookbackDelta: 1000,
},
expected: "fe::up:10000:0:0:-:1000",
},
{
name: "label names, no matcher",
Expand Down
9 changes: 9 additions & 0 deletions pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (c queryInstantCodec) DecodeRequest(_ context.Context, r *http.Request, for
return nil, err
}

result.LookbackDelta, err = parseLookbackDelta(r.Form, queryv1.LookbackDeltaParam)
if err != nil {
return nil, err
}

result.Query = r.FormValue("query")
result.Path = r.URL.Path

Expand Down Expand Up @@ -161,6 +166,10 @@ func (c queryInstantCodec) EncodeRequest(ctx context.Context, r queryrange.Reque
params[queryv1.ShardInfoParam] = []string{data}
}

if thanosReq.LookbackDelta > 0 {
params[queryv1.LookbackDeltaParam] = []string{encodeDurationMillis(thanosReq.LookbackDelta)}
}

req, err := http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
Expand Down
11 changes: 11 additions & 0 deletions pkg/queryfrontend/queryinstant_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func TestQueryInstantCodec_DecodeRequest(t *testing.T) {
},
},
},
{
name: "lookback_delta",
url: "/api/v1/query?lookback_delta=1000",
partialResponse: false,
expectedRequest: &ThanosQueryInstantRequest{
Path: "/api/v1/query",
Dedup: true,
LookbackDelta: 1000000,
StoreMatchers: [][]*labels.Matcher{},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, tc.url, nil)
Expand Down
22 changes: 18 additions & 4 deletions pkg/queryfrontend/queryrange_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, forwa
return nil, err
}

result.LookbackDelta, err = parseLookbackDelta(r.Form, queryv1.LookbackDeltaParam)
if err != nil {
return nil, err
}

result.Query = r.FormValue("query")
result.Path = r.URL.Path

Expand Down Expand Up @@ -179,6 +184,10 @@ func (c queryRangeCodec) EncodeRequest(ctx context.Context, r queryrange.Request
params[queryv1.ShardInfoParam] = []string{data}
}

if thanosReq.LookbackDelta > 0 {
params[queryv1.LookbackDeltaParam] = []string{encodeDurationMillis(thanosReq.LookbackDelta)}
}

req, err := http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
Expand Down Expand Up @@ -260,13 +269,18 @@ func parseMatchersParam(ss url.Values, matcherParam string) ([][]*labels.Matcher
return matchers, nil
}

func parseShardInfo(ss url.Values, key string) (*storepb.ShardInfo, error) {
func parseLookbackDelta(ss url.Values, key string) (int64, error) {
data, ok := ss[key]
if !ok {
return nil, nil
if !ok || len(data) == 0 {
return 0, nil
}

if len(data) == 0 {
return parseDurationMillis(data[0])
}

func parseShardInfo(ss url.Values, key string) (*storepb.ShardInfo, error) {
data, ok := ss[key]
if !ok || len(data) == 0 {
return nil, nil
}

Expand Down
29 changes: 29 additions & 0 deletions pkg/queryfrontend/queryrange_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ func TestQueryRangeCodec_DecodeRequest(t *testing.T) {
},
},
},
{
name: "lookback_delta",
url: `/api/v1/query_range?start=123&end=456&step=1&lookback_delta=1000`,
partialResponse: false,
expectedRequest: &ThanosQueryRangeRequest{
Path: "/api/v1/query_range",
Start: 123000,
End: 456000,
Step: 1000,
Dedup: true,
LookbackDelta: 1000000,
StoreMatchers: [][]*labels.Matcher{},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, tc.url, nil)
Expand Down Expand Up @@ -269,6 +283,21 @@ func TestQueryRangeCodec_EncodeRequest(t *testing.T) {
r.FormValue(queryv1.MaxSourceResolutionParam) == "3600"
},
},
{
name: "Lookback delta",
req: &ThanosQueryRangeRequest{
Start: 123000,
End: 456000,
Step: 1000,
LookbackDelta: 1000,
},
checkFunc: func(r *http.Request) bool {
return r.FormValue("start") == "123" &&
r.FormValue("end") == "456" &&
r.FormValue("step") == "1" &&
r.FormValue(queryv1.LookbackDeltaParam) == "1"
},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
Expand Down
2 changes: 2 additions & 0 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ThanosQueryRangeRequest struct {
Headers []*RequestHeader
Stats string
ShardInfo *storepb.ShardInfo
LookbackDelta int64
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand Down Expand Up @@ -152,6 +153,7 @@ type ThanosQueryInstantRequest struct {
Headers []*RequestHeader
Stats string
ShardInfo *storepb.ShardInfo
LookbackDelta int64 // in milliseconds.
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand Down

0 comments on commit 6e95c3e

Please sign in to comment.