Skip to content

Commit

Permalink
Use streaming responses
Browse files Browse the repository at this point in the history
Signed-off-by: fpetkovski <[email protected]>
  • Loading branch information
fpetkovski committed Mar 25, 2022
1 parent f785519 commit dfb993b
Show file tree
Hide file tree
Showing 7 changed files with 621 additions and 176 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func runQuery(
info.WithTargetsInfoFunc(),
)

grpcAPI := apiv1.NewGrpcAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
Expand Down
26 changes: 19 additions & 7 deletions docs/proposals-done/202203-grpc-query-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ We want to be able to distinguish between gRPC Store APIs and other Queriers in
This is useful for a few reasons:

* When Queriers register disjoint Store targets, they should be able to deduplicate series and then execute the query without concern for duplicate data from other Queriers. This new API would enable users to effectively partition by Querier, and avoid shipping raw series back from each disjoint Querier to the root Querier.
* If Queriers register conjoint Store targets, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogeneous Queriers.
* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP.
* If Queriers register Store targets with overlapping series, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogeneous Queriers.
* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP.
* When only one StoreAPI connected to Thanos Query completely covers the requested range of the original user's query, it is more efficient to execute the query directly in the store instead of sending raw samples to the Querier. This scenario is not unlikely given query-frontend's sharding capabilities.

### Pitfalls of the current solution

Expand All @@ -37,9 +38,9 @@ Thanos Query currently allows for `query` and `query_range` operations through H
We propose defining the following gRPC API:
```protobuf
service Query {
rpc Query(QueryRequest) returns (QueryResponse);
rpc Query(QueryRequest) returns (stream QueryResponse);
rpc QueryRange(QueryRangeRequest) returns (QueryRangeResponse);
rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse);
}
```

Expand All @@ -64,7 +65,13 @@ message QueryRequest {
}
message QueryResponse {
repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
message QueryRangeRequest {
Expand All @@ -88,9 +95,14 @@ message QueryRangeRequest {
}
message QueryRangeResponse {
repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
}
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;
/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
```

The `Query` Service will be implemented by the gRPC server which is started via the `thanos query` command.
Expand Down
69 changes: 39 additions & 30 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
"google.golang.org/grpc"
)

type GrpcAPI struct {
type GRPCAPI struct {
now func() time.Time
queryableCreate query.QueryableCreator
queryEngine func(int64) *promql.Engine
defaultMaxResolutionSeconds time.Duration
}

func NewGrpcAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GrpcAPI {
return &GrpcAPI{
func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
return &GRPCAPI{
now: now,
queryableCreate: creator,
queryEngine: queryEngine,
Expand All @@ -37,7 +37,8 @@ func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) {
}
}

func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest) (*querypb.QueryResponse, error) {
func (grpcAPI *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error {
ctx := context.Background()
var ts time.Time
if request.TimeSeconds == 0 {
ts = grpcAPI.now()
Expand All @@ -59,7 +60,7 @@ func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return nil, err
return err
}

qe := grpcAPI.queryEngine(request.MaxResolutionSeconds)
Expand All @@ -74,36 +75,41 @@ func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
return nil, err
return err
}

result := qry.Exec(ctx)
if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings)); err != nil {
return nil
}

switch vector := result.Value.(type) {
case promql.Scalar:
return &querypb.QueryResponse{
Timeseries: []prompb.TimeSeries{{
Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}},
}},
}, nil
case promql.Vector:
response := &querypb.QueryResponse{
Timeseries: make([]prompb.TimeSeries, 0, len(vector)),
series := &prompb.TimeSeries{
Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}},
}

if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
case promql.Vector:
for _, sample := range vector {
response.Timeseries = append(response.Timeseries, prompb.TimeSeries{
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
})
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
}

return response, nil
return nil
}

return nil, nil
return nil
}

func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRangeRequest) (*querypb.QueryRangeResponse, error) {
func (grpcAPI *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error {
ctx := context.Background()
if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds))
Expand All @@ -117,7 +123,7 @@ func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRa

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return nil, err
return err
}

qe := grpcAPI.queryEngine(request.MaxResolutionSeconds)
Expand All @@ -137,25 +143,28 @@ func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRa

qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval)
if err != nil {
return nil, err
return err
}

result := qry.Exec(ctx)
if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings)); err != nil {
return err
}

switch matrix := result.Value.(type) {
case promql.Matrix:
response := &querypb.QueryRangeResponse{
Timeseries: make([]prompb.TimeSeries, len(matrix)),
}

for i, series := range matrix {
response.Timeseries[i] = prompb.TimeSeries{
for _, series := range matrix {
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
}
}

return response, nil
return nil
}

return nil, nil
return nil
}
Loading

0 comments on commit dfb993b

Please sign in to comment.