-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
query: add store.read-timeout parameter to avoid partial response failure when one of stores timed out #895
Changes from all commits
dfadec5
3a5701e
fc4460e
81d7d3d
5a36f3f
5906c73
0e8adab
0bdcf03
86a44da
07b419b
8356b80
0224bf3
7f75399
8ffee1b
9be840e
4de0dc0
afe009b
e5f2494
a1ef762
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,9 +61,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string | |
webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() | ||
webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String() | ||
|
||
queryTimeout := modelDuration(cmd.Flag("query.timeout", "Maximum time to process query by query node."). | ||
Default("2m")) | ||
|
||
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node."). | ||
Default("20").Int() | ||
|
||
|
@@ -97,6 +94,12 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string | |
enablePartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified."). | ||
Default("true").Bool() | ||
|
||
promqlTimeout := modelDuration(cmd.Flag("promql.timeout", "Maximum time to execute PromQL in query node."). | ||
Default("2m")) | ||
|
||
queryTimeout := modelDuration(cmd.Flag("query.timeout", "Maximum time to process request by query node. If a request to one of the stores has timed out and query.timeout < promql.timeout then a partial response will be returned. If query.timeout >= promql.timeout then only timeout error will be returned."). | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is quite confusing; I am having a hard time understanding what things we time out on here |
||
Default("2m")) | ||
|
||
defaultEvaluationInterval := modelDuration(cmd.Flag("query.default-evaluation-interval", "Set default evaluation interval for sub queries.").Default("1m")) | ||
|
||
storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms")) | ||
|
@@ -150,8 +153,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string | |
*webExternalPrefix, | ||
*webPrefixHeaderName, | ||
*maxConcurrentQueries, | ||
time.Duration(*queryTimeout), | ||
time.Duration(*promqlTimeout), | ||
time.Duration(*storeResponseTimeout), | ||
time.Duration(*queryTimeout), | ||
*replicaLabel, | ||
peer, | ||
selectorLset, | ||
|
@@ -268,8 +272,9 @@ func runQuery( | |
webExternalPrefix string, | ||
webPrefixHeaderName string, | ||
maxConcurrentQueries int, | ||
queryTimeout time.Duration, | ||
promqlTimeout time.Duration, | ||
storeResponseTimeout time.Duration, | ||
queryTimeout time.Duration, | ||
replicaLabel string, | ||
peer cluster.Peer, | ||
selectorLset labels.Labels, | ||
|
@@ -327,7 +332,7 @@ func runQuery( | |
dialOpts, | ||
unhealthyStoreTimeout, | ||
) | ||
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) | ||
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout, queryTimeout) | ||
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) | ||
engine = promql.NewEngine( | ||
promql.EngineOpts{ | ||
|
@@ -336,7 +341,7 @@ func runQuery( | |
MaxConcurrent: maxConcurrentQueries, | ||
// TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703 | ||
MaxSamples: math.MaxInt32, | ||
Timeout: queryTimeout, | ||
Timeout: promqlTimeout, | ||
}, | ||
) | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ import ( | |
"math" | ||
"math/rand" | ||
"testing" | ||
|
||
"time" | ||
|
||
"github.com/fortytw2/leaktest" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ type Client interface { | |
TimeRange() (mint int64, maxt int64) | ||
|
||
String() string | ||
|
||
// Addr returns address of a Client. | ||
Addr() string | ||
} | ||
|
@@ -44,7 +45,11 @@ type ProxyStore struct { | |
component component.StoreAPI | ||
selectorLabels labels.Labels | ||
|
||
// responseTimeout is a timeout for any GRPC operation during series query | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing trailing period in comment, same below |
||
responseTimeout time.Duration | ||
|
||
// queryTimeout is a timeout for entire request | ||
queryTimeout time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's kill it IMO |
||
} | ||
|
||
// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. | ||
|
@@ -55,6 +60,7 @@ func NewProxyStore( | |
component component.StoreAPI, | ||
selectorLabels labels.Labels, | ||
responseTimeout time.Duration, | ||
queryTimeout time.Duration, | ||
) *ProxyStore { | ||
if logger == nil { | ||
logger = log.NewNopLogger() | ||
|
@@ -66,6 +72,7 @@ func NewProxyStore( | |
component: component, | ||
selectorLabels: selectorLabels, | ||
responseTimeout: responseTimeout, | ||
queryTimeout: queryTimeout, | ||
} | ||
return s | ||
} | ||
|
@@ -119,6 +126,8 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb | |
return &ctxRespSender{ctx: ctx, ch: respCh}, respCh, func() { close(respCh) } | ||
} | ||
|
||
// send writes response to sender channel | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use full sentence comments. They are for humans (: |
||
// or just returns if sender context was timed out | ||
func (s ctxRespSender) send(r *storepb.SeriesResponse) { | ||
select { | ||
case <-s.ctx.Done(): | ||
|
@@ -147,6 +156,9 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe | |
respSender, respRecv, closeFn = newRespCh(gctx, 10) | ||
) | ||
|
||
storeCtx, cancel := s.contextWithTimeout(gctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok, what's the difference between this timeout and the overall gRPC request timeout or gRPC server timeout? I believe Querier can control the same thing by just specifying a timeout here: https://github.com/improbable-eng/thanos/blob/1cd9ddd14999d6b074f34a4328e03f7ac3b7c26a/pkg/query/querier.go#L183 I would remove this from Effect is the same, if not better, as you missed passing this context to There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Imagine that we have two stores: the first is very slow, the second is fast. Partial time-out is enabled. I think it doesn't matter where we have this timeout; more importantly, at the moment it doesn't work perfectly because of reading from stores in mergedSet: reading from stores works serially there, and the first slow store still blocks reading from the second. We need some improvements there... |
||
defer cancel() | ||
|
||
g.Go(func() error { | ||
var ( | ||
seriesSet []storepb.SeriesSet | ||
|
@@ -177,8 +189,8 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe | |
} | ||
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) | ||
|
||
// This is used to cancel this stream when one operations takes too long. | ||
seriesCtx, closeSeries := context.WithCancel(gctx) | ||
// This is used to cancel this stream when one operation takes too long. | ||
seriesCtx, closeSeries := context.WithCancel(storeCtx) | ||
defer closeSeries() | ||
|
||
sc, err := st.Series(seriesCtx, r) | ||
|
@@ -295,19 +307,12 @@ func startStreamSeriesSet( | |
} | ||
|
||
if ctx.Err() != nil { | ||
s.writeWarningOrErrorResponse(partialResponse, errors.Wrapf(ctx.Err(), "receive series from %s", s.name)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is nice 👍 |
||
return | ||
} | ||
|
||
if err != nil { | ||
wrapErr := errors.Wrapf(err, "receive series from %s", s.name) | ||
if partialResponse { | ||
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr)) | ||
return | ||
} | ||
|
||
s.errMtx.Lock() | ||
s.err = wrapErr | ||
s.errMtx.Unlock() | ||
s.writeWarningOrErrorResponse(partialResponse, errors.Wrapf(err, "receive series from %s", s.name)) | ||
return | ||
} | ||
|
||
|
@@ -356,6 +361,24 @@ func (s *streamSeriesSet) Next() (ok bool) { | |
} | ||
} | ||
|
||
// writeWarningOrErrorResponse sends warning if partial response enabled or sets error otherwise | ||
func (s *streamSeriesSet) writeWarningOrErrorResponse(partialResponse bool, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can make this return nothing since we aren't using the return values anywhere and we aren't even setting them 😄 |
||
if partialResponse { | ||
s.warnCh.send(storepb.NewWarnSeriesResponse(err)) | ||
return | ||
} | ||
|
||
s.setError(err) | ||
return | ||
} | ||
|
||
// setError sets error (thread-safe) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. full sentence comments |
||
func (s *streamSeriesSet) setError(err error) { | ||
s.errMtx.Lock() | ||
s.err = err | ||
s.errMtx.Unlock() | ||
} | ||
|
||
func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) { | ||
if s.currSeries == nil { | ||
return nil, nil | ||
|
@@ -411,11 +434,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ | |
g, gctx = errgroup.WithContext(ctx) | ||
) | ||
|
||
storeCtx, cancel := s.contextWithTimeout(gctx) | ||
defer cancel() | ||
|
||
for _, st := range s.stores() { | ||
store := st | ||
g.Go(func() error { | ||
resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ | ||
Label: r.Label, | ||
resp, err := store.LabelValues(storeCtx, &storepb.LabelValuesRequest{ | ||
Label: r.Label, | ||
PartialResponseDisabled: r.PartialResponseDisabled, | ||
}) | ||
if err != nil { | ||
|
@@ -448,3 +474,11 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ | |
Warnings: warnings, | ||
}, nil | ||
} | ||
|
||
func (s *ProxyStore) contextWithTimeout(ctx context.Context) (context.Context, context.CancelFunc) { | ||
if s.queryTimeout > 0 { | ||
return context.WithTimeout(ctx, s.queryTimeout) | ||
} | ||
|
||
return ctx, func() {} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave this as it was, reasons:
- `query` sounds like it is related to QueryAPI; that's why this name. It always included the `promql` one. That's why I would leave it like it was — the fact that query also queries something else underneath is hidden. Also, it matches the Prometheus `query` flags as well. And yet another thing: compatibility. Even though we are not `1.0`, this API change is a major hit to compatibility, as suddenly query timeout means something opposite to what it was ):
- `promql` sounds like only PromQL; however, we know it involves the proxy StoreAPI `Series()` invocation. Moreover, not only one, but sometimes more than one! PromQL can run multiple `Select`s per single Query.

I really like this idea, but IMO it should be:
- `query.timeout` (as it was)
- `store.timeout` for the proxy.go client timeout.

Alternatively, if we want to be more explicit, maybe `query.select-timeout`? That will then also include the deduplication process.

There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.

agreed