Skip to content

Commit

Permalink
Merge pull request #7286 from fpetkovski/instant-query-warns
Browse files Browse the repository at this point in the history
Propagate warnings from instant queries
  • Loading branch information
fpetkovski authored Apr 18, 2024
2 parents f7ba140 + fe0931d commit 6582c81
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b).
- [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: Fix block deduplication
- [#6706](https://github.com/thanos-io/thanos/pull/6706) Store: Series responses should always be sorted
- [#7286](https://github.com/thanos-io/thanos/pull/7286) Query: Propagate instant query warnings in distributed execution mode.

### Added

Expand Down
13 changes: 9 additions & 4 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,9 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
return &promql.Result{Err: err}
}
var (
result = make(promql.Vector, 0)
builder = labels.NewScratchBuilder(8)
result = make(promql.Vector, 0)
warnings annotations.Annotations
builder = labels.NewScratchBuilder(8)
)
for {
msg, err := qry.Recv()
Expand All @@ -269,7 +270,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}

if warn := msg.GetWarnings(); warn != "" {
return &promql.Result{Err: errors.New(warn)}
warnings.Add(errors.New(warn))
continue
}

ts := msg.GetTimeseries()
Expand All @@ -287,7 +289,10 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}
}

return &promql.Result{Value: result}
return &promql.Result{
Value: result,
Warnings: warnings,
}
}

request := &querypb.QueryRangeRequest{
Expand Down
45 changes: 37 additions & 8 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func TestRemoteEngine_Warnings(t *testing.T) {
client := NewClient(&queryWarnClient{}, "", nil)
client := NewClient(&warnClient{}, "", nil)
engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{
Timeout: 1 * time.Second,
})
Expand All @@ -41,11 +41,23 @@ func TestRemoteEngine_Warnings(t *testing.T) {
Start: time.Now(),
End: time.Now().Add(2 * time.Hour),
}, logicalplan.PlanOptions{})
qry, err := engine.NewRangeQuery(context.Background(), nil, plan.Root(), start, end, step)
testutil.Ok(t, err)
res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))

t.Run("instant_query", func(t *testing.T) {
qry, err := engine.NewInstantQuery(context.Background(), nil, plan.Root(), start)
testutil.Ok(t, err)
res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))
})

t.Run("range_query", func(t *testing.T) {
qry, err := engine.NewRangeQuery(context.Background(), nil, plan.Root(), start, end, step)
testutil.Ok(t, err)
res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
testutil.Equals(t, 1, len(res.Warnings))
})

}

func TestRemoteEngine_LabelSets(t *testing.T) {
Expand Down Expand Up @@ -198,11 +210,15 @@ func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet {
}
}

type queryWarnClient struct {
type warnClient struct {
querypb.QueryClient
}

func (m queryWarnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) {
func (m warnClient) Query(ctx context.Context, in *querypb.QueryRequest, opts ...grpc.CallOption) (querypb.Query_QueryClient, error) {
return &queryWarnClient{}, nil
}

func (m warnClient) QueryRange(ctx context.Context, in *querypb.QueryRangeRequest, opts ...grpc.CallOption) (querypb.Query_QueryRangeClient, error) {
return &queryRangeWarnClient{}, nil
}

Expand All @@ -218,3 +234,16 @@ func (m *queryRangeWarnClient) Recv() (*querypb.QueryRangeResponse, error) {
m.warnSent = true
return querypb.NewQueryRangeWarningsResponse(errors.New("warning")), nil
}

type queryWarnClient struct {
querypb.Query_QueryClient
warnSent bool
}

func (m *queryWarnClient) Recv() (*querypb.QueryResponse, error) {
if m.warnSent {
return nil, io.EOF
}
m.warnSent = true
return querypb.NewQueryWarningsResponse(errors.New("warning")), nil
}

0 comments on commit 6582c81

Please sign in to comment.