Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add store.read-timeout parameter to avoid partial response failure when one of stores timed out #895

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmark/cmd/thanosbench/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func createPrometheus(opts *opts, name string, bucket string) *appsv1.StatefulSe
"--storage.tsdb.max-block-duration=2h",
"--storage.tsdb.retention=2h",
"--query.timeout=10m",
"--store.read-timeout=10m",
},
VolumeMounts: []v1.VolumeMount{
{
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

storeReadTimeout := modelDuration(cmd.Flag("store.read-timeout", "Maximum time to read a response from a store. If a request to one of the stores has timed out and store.read-timeout < query.timeout then a partial response will be returned. If store.read-timeout >= query.timeout and one of the stores has timed out then the client will get no data, and the timeout error will be returned.").
Default("2m"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true)
if err != nil {
Expand Down Expand Up @@ -142,6 +145,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
time.Duration(*storeReadTimeout),
*replicaLabel,
peer,
selectorLset,
Expand Down Expand Up @@ -258,6 +262,7 @@ func runQuery(
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
storeReadTimeout time.Duration,
replicaLabel string,
peer cluster.Peer,
selectorLset labels.Labels,
Expand Down Expand Up @@ -311,7 +316,7 @@ func runQuery(
},
dialOpts,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout, storeReadTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
8 changes: 8 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,5 +275,13 @@ Flags:
specified duration then a Store will be ignored
and partial data will be returned if it's
enabled. 0 disables timeout.
--store.read-timeout=2m Maximum time to read a response from a store.
If a request to one of the stores has timed out
and store.read-timeout < query.timeout then a
partial response will be returned. If
store.read-timeout >= query.timeout and one of
the stores has timed out then the client will
get no data, and the timeout error will be
returned.

```
1 change: 0 additions & 1 deletion pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math"
"math/rand"
"testing"

"time"

"github.com/fortytw2/leaktest"
Expand Down
58 changes: 46 additions & 12 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Client interface {
TimeRange() (mint int64, maxt int64)

String() string

// Addr returns address of a Client.
Addr() string
}
Expand All @@ -44,7 +45,11 @@ type ProxyStore struct {
component component.StoreAPI
selectorLabels labels.Labels

// responseTimeout is a timeout for any gRPC operation during a series query.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing trailing period in comment, same below

responseTimeout time.Duration

// readTimeout is a timeout for the entire store request.
readTimeout time.Duration
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
Expand All @@ -55,6 +60,7 @@ func NewProxyStore(
component component.StoreAPI,
selectorLabels labels.Labels,
responseTimeout time.Duration,
readTimeout time.Duration,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -66,6 +72,7 @@ func NewProxyStore(
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
readTimeout: readTimeout,
}
return s
}
Expand Down Expand Up @@ -119,6 +126,8 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb
return &ctxRespSender{ctx: ctx, ch: respCh}, respCh, func() { close(respCh) }
}

// send writes the response to the sender channel,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use full sentence comments. They are for humans (:

// or just returns if the sender context is already done.
func (s ctxRespSender) send(r *storepb.SeriesResponse) {
select {
case <-s.ctx.Done():
Expand Down Expand Up @@ -147,6 +156,9 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)

storeReadCtx, storeReadCancelFunc := s.contextWithReadTimeout(gctx)
defer storeReadCancelFunc()

g.Go(func() error {
var (
seriesSet []storepb.SeriesSet
Expand Down Expand Up @@ -178,7 +190,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

// This is used to cancel this stream when one operation takes too long.
seriesCtx, closeSeries := context.WithCancel(gctx)
seriesCtx, closeSeries := context.WithCancel(storeReadCtx)
defer closeSeries()

sc, err := st.Series(seriesCtx, r)
Expand Down Expand Up @@ -295,19 +307,12 @@ func startStreamSeriesSet(
}

if ctx.Err() != nil {
s.writeWarningOrErrorResponse(partialResponse, errors.Wrapf(ctx.Err(), "receive series from %s", s.name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice 👍

return
}

if err != nil {
wrapErr := errors.Wrapf(err, "receive series from %s", s.name)
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr))
return
}

s.errMtx.Lock()
s.err = wrapErr
s.errMtx.Unlock()
s.writeWarningOrErrorResponse(partialResponse, errors.Wrapf(err, "receive series from %s", s.name))
return
}

Expand Down Expand Up @@ -356,6 +361,24 @@ func (s *streamSeriesSet) Next() (ok bool) {
}
}

// writeWarningOrErrorResponse sends a warning if partial response is enabled, or sets the error otherwise.
func (s *streamSeriesSet) writeWarningOrErrorResponse(partialResponse bool, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make this return nothing since we aren't using the return values anywhere and we aren't even setting them 😄

if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return
}

s.setError(err)
return
}

// setError sets the error while holding errMtx, so it is safe for concurrent use.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

full sentence comments

func (s *streamSeriesSet) setError(err error) {
	// Hold errMtx for the whole assignment; the deferred unlock releases it
	// when the method returns.
	s.errMtx.Lock()
	defer s.errMtx.Unlock()

	s.err = err
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
if s.currSeries == nil {
return nil, nil
Expand Down Expand Up @@ -411,11 +434,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
g, gctx = errgroup.WithContext(ctx)
)

storeReadCtx, storeReadCancelFunc := s.contextWithReadTimeout(gctx)
defer storeReadCancelFunc()

for _, st := range s.stores() {
store := st
g.Go(func() error {
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{
Label: r.Label,
resp, err := store.LabelValues(storeReadCtx, &storepb.LabelValuesRequest{
Label: r.Label,
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
Expand Down Expand Up @@ -448,3 +474,11 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
Warnings: warnings,
}, nil
}

// contextWithReadTimeout derives the context used for store requests from ctx.
// With a positive readTimeout the returned context expires after that duration
// and the returned CancelFunc releases its timer. Otherwise ctx is returned
// unchanged together with a no-op CancelFunc, so callers can always defer the
// cancel call unconditionally.
func (s *ProxyStore) contextWithReadTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
	if s.readTimeout <= 0 {
		// Timeout disabled: nothing to cancel.
		return ctx, func() {}
	}

	return context.WithTimeout(ctx, s.readTimeout)
}
96 changes: 88 additions & 8 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestProxyStore_Info(t *testing.T) {
q := NewProxyStore(nil,
func() []Client { return nil },
component.Query,
nil, 0*time.Second,
nil, 0*time.Second, 0*time.Second,
)

resp, err := q.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -410,7 +410,7 @@ func TestProxyStore_Series(t *testing.T) {
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
0*time.Second,
0*time.Second, 0*time.Second,
)

s := newStoreSeriesServer(context.Background())
Expand Down Expand Up @@ -452,15 +452,15 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
expectedWarningsLen int
}{
{
title: "partial response disabled one thanos query is slow to respond",
title: "partial response disabled one thanos query is slow to respond, but responding",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 10 * time.Second,
RespDuration: 2100 * time.Millisecond,
},
labels: []storepb.Label{{Name: "ext", Value: "1"}},
minTime: 1,
Expand All @@ -484,7 +484,40 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseDisabled: true,
},
expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"),
expectedErr: errors.New("test: failed to receive any data in 2s from test: context deadline exceeded"),
},
{
title: "partial response disabled one thanos query is slow to respond and doesn't return any data until store.read-timeout occurs",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 7 * time.Second,
},
labels: []storepb.Label{{Name: "ext", Value: "1"}},
minTime: 1,
maxTime: 300,
},
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
},
labels: []storepb.Label{{Name: "ext", Value: "1"}},
minTime: 1,
maxTime: 300,
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseDisabled: true,
},
expectedErr: errors.New("test: failed to receive any data in 2s from test: context deadline exceeded"),
},
{
title: "partial response enabled one thanos query is slow to respond",
Expand All @@ -506,7 +539,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 10 * time.Second,
RespDuration: 7 * time.Second,
},
labels: []storepb.Label{{Name: "ext", Value: "1"}},
minTime: 1,
Expand All @@ -524,15 +557,16 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
samples: []sample{{1, 1}, {2, 2}, {3, 3}},
},
},
expectedWarningsLen: 2,
expectedWarningsLen: 3,
},
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
4*time.Second,
2*time.Second,
5*time.Second,
)

s := newStoreSeriesServer(context.Background())
Expand Down Expand Up @@ -575,6 +609,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
component.Query,
nil,
0*time.Second,
0*time.Second,
)

ctx := context.Background()
Expand Down Expand Up @@ -634,6 +669,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
component.Query,
tlabels.FromStrings("fed", "a"),
0*time.Second,
0*time.Second,
)

ctx := context.Background()
Expand Down Expand Up @@ -672,6 +708,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
component.Query,
nil,
0*time.Second,
0*time.Second,
)

ctx := context.Background()
Expand Down Expand Up @@ -794,6 +831,49 @@ func TestStoreMatches(t *testing.T) {
}
}

// TestProxyStore_StoreReadTimeout checks that a Series call bounded by
// store.read-timeout finishes before the longer query timeout expires.
// The test is opt-in: it is skipped unless the
// THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS environment variable is non-empty.
func TestProxyStore_StoreReadTimeout(t *testing.T) {
	enable := os.Getenv("THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS")
	if enable == "" {
		t.Skip("enable THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS to run store-read-timeout tests")
	}

	queryTimeout := 5 * time.Millisecond
	storeReadTimeout := 1 * time.Millisecond

	q := NewProxyStore(
		nil,
		func() []Client {
			return []Client{
				&testClient{StoreClient: &mockedStoreAPI{}, minTime: 1, maxTime: 300},
			}
		},
		component.Query,
		nil,
		0*time.Second,
		storeReadTimeout,
	)

	// Keep the cancel func and always call it so the timeout's timer is
	// released as soon as the test returns.
	queryCtx, cancel := context.WithTimeout(context.Background(), queryTimeout)
	defer cancel()
	s := newStoreSeriesServer(queryCtx)

	// Buffered with capacity 1 so the goroutine can always deliver its result
	// and exit, even when the query-timeout branch below wins the select and
	// nobody receives from the channel anymore.
	storeQueryDone := make(chan error, 1)

	go func() {
		storeQueryDone <- q.Series(&storepb.SeriesRequest{Matchers: nil}, s)
	}()

	select {
	case err := <-storeQueryDone:
		// Store query finished first, as expected. The result is asserted here,
		// on the test goroutine, rather than inside the spawned goroutine.
		testutil.Ok(t, err)
	case <-queryCtx.Done():
		// Something went wrong: the global query context was expected to close last.
		testutil.Ok(t, errors.New("query finished first, but expected store query will finish first (readTimeout is smaller that query timeout)"))
	}
}

// storeSeriesServer is test gRPC storeAPI series server.
type storeSeriesServer struct {
// This field just exists to pseudo-implement the unused methods of the interface.
Expand Down