Skip to content

Commit

Permalink
query: Add store.response-timeout (#928)
Browse files Browse the repository at this point in the history
* Add store.receive-timeout

* Apply suggestions from code review

Co-Authored-By: povilasv <[email protected]>

* Apply suggestions from code review

Co-Authored-By: povilasv <[email protected]>

* Fixes after review

* Update pkg/store/proxy.go

Co-Authored-By: povilasv <[email protected]>

* Fixes after review
  • Loading branch information
povilasv authored and bwplotka committed Mar 26, 2019
1 parent 5b74df2 commit 441a769
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 23 deletions.
6 changes: 5 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
enablePartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified.").
Default("true").Bool()

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true)
if err != nil {
Expand Down Expand Up @@ -139,6 +141,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*webPrefixHeaderName,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
peer,
selectorLset,
Expand Down Expand Up @@ -254,6 +257,7 @@ func runQuery(
webPrefixHeaderName string,
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
peer cluster.Peer,
selectorLset labels.Labels,
Expand Down Expand Up @@ -307,7 +311,7 @@ func runQuery(
},
dialOpts,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
5 changes: 5 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,10 @@ Flags:
if no max_source_resolution param is specified.
--query.partial-response Enable partial response for queries if no
partial_response param is specified.
--store.response-timeout=0ms
If a Store doesn't send any data in this
specified duration then a Store will be ignored
and partial data will be returned if it's
enabled. 0 disables timeout.
```
4 changes: 4 additions & 0 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func (s *storeRef) String() string {
return fmt.Sprintf("Addr: %s Labels: %v Mint: %d Maxt: %d", s.addr, s.Labels(), mint, maxt)
}

// Addr returns the address stored on this store reference.
func (s *storeRef) Addr() string {
	return s.addr
}

// close hands the store's connection (s.cc) to runutil.CloseWithLogOnErr,
// labelling the operation with the store address.
func (s *storeRef) close() {
	closeMsg := fmt.Sprintf("store %v connection close", s.addr)
	runutil.CloseWithLogOnErr(s.logger, s.cc, closeMsg)
}
Expand Down
99 changes: 79 additions & 20 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -32,6 +33,8 @@ type Client interface {
TimeRange() (mint int64, maxt int64)

String() string
// Addr returns the address of a Client.
Addr() string
}

// ProxyStore implements the store API that proxies requests to all given underlying stores.
Expand All @@ -40,6 +43,8 @@ type ProxyStore struct {
stores func() []Client
component component.StoreAPI
selectorLabels labels.Labels

responseTimeout time.Duration
}

// NewProxyStore returns a new ProxyStore that uses the given clients implementing storeAPI to fan in all series to the client.
Expand All @@ -49,15 +54,18 @@ func NewProxyStore(
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
responseTimeout time.Duration,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
}

s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
}
return s
}
Expand Down Expand Up @@ -147,7 +155,11 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

sc, err := st.Series(gctx, r)
// This is used to cancel this stream when one operation takes too long.
seriesCtx, closeSeries := context.WithCancel(gctx)
defer closeSeries()

sc, err := st.Series(seriesCtx, r)
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
Expand All @@ -162,12 +174,13 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
continue
}

// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled))
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))

if len(seriesSet) == 0 {
// This indicates that the configured StoreAPIs are not the ones the end user expects.
err := errors.New("No store matched for this query")
Expand Down Expand Up @@ -196,7 +209,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return err
}
return nil

}

type warnSender interface {
Expand All @@ -206,6 +218,9 @@ type warnSender interface {
// streamSeriesSet iterates over incoming stream of series.
// All errors are sent out of band via warning channel.
type streamSeriesSet struct {
ctx context.Context
logger log.Logger

stream storepb.Store_SeriesClient
warnCh warnSender

Expand All @@ -215,30 +230,44 @@ type streamSeriesSet struct {
errMtx sync.Mutex
err error

name string
name string
partialResponse bool

responseTimeout time.Duration
closeSeries context.CancelFunc
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
closeSeries context.CancelFunc,
wg *sync.WaitGroup,
stream storepb.Store_SeriesClient,
warnCh warnSender,
name string,
partialResponse bool,
responseTimeout time.Duration,
) *streamSeriesSet {
s := &streamSeriesSet{
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
ctx: ctx,
logger: logger,
closeSeries: closeSeries,
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
partialResponse: partialResponse,
responseTimeout: responseTimeout,
}

wg.Add(1)
go func() {
defer wg.Done()
defer close(s.recvCh)

for {
r, err := s.stream.Recv()

if err == io.EOF {
return
}
Expand All @@ -248,14 +277,15 @@ func startStreamSeriesSet(
}

if err != nil {
wrapErr := errors.Wrapf(err, "receive series from %s", s.name)
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series")))
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr))
return
}

s.errMtx.Lock()
defer s.errMtx.Unlock()
s.err = err
s.err = wrapErr
s.errMtx.Unlock()
return
}

Expand All @@ -269,10 +299,39 @@ func startStreamSeriesSet(
return s
}

// Next blocks until new message is received or stream is closed.
// Next blocks until a new message is received, the stream is closed, or the operation times out.
func (s *streamSeriesSet) Next() (ok bool) {
s.currSeries, ok = <-s.recvCh
return ok
ctx := s.ctx
timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name)

if s.responseTimeout != 0 {
timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)

timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout)
defer done()
ctx = timeoutCtx
}

select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// Call closeSeries to shut down the goroutine started in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrap(ctx.Err(), timeoutMsg)
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return false
}
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
Expand Down
Loading

0 comments on commit 441a769

Please sign in to comment.