Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: Add store.response-timeout #928

Merged
merged 6 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
enablePartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified.").
Default("true").Bool()

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true)
if err != nil {
Expand Down Expand Up @@ -139,6 +141,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*webPrefixHeaderName,
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
peer,
selectorLset,
Expand Down Expand Up @@ -254,6 +257,7 @@ func runQuery(
webPrefixHeaderName string,
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
peer cluster.Peer,
selectorLset labels.Labels,
Expand Down Expand Up @@ -304,7 +308,7 @@ func runQuery(
},
dialOpts,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
5 changes: 5 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,10 @@ Flags:
if no max_source_resolution param is specified.
--query.partial-response Enable partial response for queries if no
partial_response param is specified.
--store.response-timeout=0ms
If a Store doesn't send any data in this
specified duration then a Store will be ignored
and partial data will be returned if it's
enabled. 0 disables timeout.

```
4 changes: 4 additions & 0 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func (s *storeRef) String() string {
return fmt.Sprintf("Addr: %s Labels: %v Mint: %d Maxt: %d", s.addr, s.Labels(), mint, maxt)
}

func (s *storeRef) Addr() string {
return s.addr
}

func (s *storeRef) close() {
runutil.CloseWithLogOnErr(s.logger, s.cc, fmt.Sprintf("store %v connection close", s.addr))
}
Expand Down
99 changes: 79 additions & 20 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -32,6 +33,8 @@ type Client interface {
TimeRange() (mint int64, maxt int64)

String() string
// Addr returns address of a Client.
Addr() string
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
Expand All @@ -40,6 +43,8 @@ type ProxyStore struct {
stores func() []Client
component component.StoreAPI
selectorLabels labels.Labels

responseTimeout time.Duration
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
Expand All @@ -49,15 +54,18 @@ func NewProxyStore(
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
responseTimeout time.Duration,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
}

s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
}
return s
}
Expand Down Expand Up @@ -147,7 +155,11 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

sc, err := st.Series(gctx, r)
// This is used to cancel this stream when one operations takes too long.
seriesCtx, closeSeries := context.WithCancel(gctx)
defer closeSeries()

sc, err := st.Series(seriesCtx, r)
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
Expand All @@ -162,12 +174,13 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
continue
}

// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled))
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))

if len(seriesSet) == 0 {
// This is indicates that configured StoreAPIs are not the ones end user expects
err := errors.New("No store matched for this query")
Expand Down Expand Up @@ -196,7 +209,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return err
}
return nil

}

type warnSender interface {
Expand All @@ -206,6 +218,9 @@ type warnSender interface {
// streamSeriesSet iterates over incoming stream of series.
// All errors are sent out of band via warning channel.
type streamSeriesSet struct {
ctx context.Context
logger log.Logger

stream storepb.Store_SeriesClient
warnCh warnSender

Expand All @@ -215,30 +230,44 @@ type streamSeriesSet struct {
errMtx sync.Mutex
err error

name string
name string
partialResponse bool

responseTimeout time.Duration
closeSeries context.CancelFunc
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
closeSeries context.CancelFunc,
wg *sync.WaitGroup,
stream storepb.Store_SeriesClient,
warnCh warnSender,
name string,
partialResponse bool,
responseTimeout time.Duration,
) *streamSeriesSet {
s := &streamSeriesSet{
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
ctx: ctx,
logger: logger,
closeSeries: closeSeries,
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, 10),
name: name,
partialResponse: partialResponse,
responseTimeout: responseTimeout,
}

wg.Add(1)
go func() {
defer wg.Done()
defer close(s.recvCh)

for {
r, err := s.stream.Recv()

if err == io.EOF {
return
}
Expand All @@ -248,14 +277,15 @@ func startStreamSeriesSet(
}

if err != nil {
wrapErr := errors.Wrapf(err, "receive series from %s", s.name)
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series")))
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr))
return
}

s.errMtx.Lock()
defer s.errMtx.Unlock()
s.err = err
s.err = wrapErr
s.errMtx.Unlock()
return
}

Expand All @@ -269,10 +299,39 @@ func startStreamSeriesSet(
return s
}

// Next blocks until new message is received or stream is closed.
// Next blocks until new message is received or stream is closed or operation is timed out.
func (s *streamSeriesSet) Next() (ok bool) {
s.currSeries, ok = <-s.recvCh
return ok
ctx := s.ctx
timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name)

if s.responseTimeout != 0 {
timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)

timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout)
defer done()
ctx = timeoutCtx
}

select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// closeSeries to shutdown a goroutine in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrap(ctx.Err(), timeoutMsg)
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return false
}
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
Expand Down
Loading