Skip to content

Commit

Permalink
query: cleanup store statuses as they come and go
Browse files Browse the repository at this point in the history
Signed-off-by: Adrien Fillon <[email protected]>
  • Loading branch information
adrien-f committed Apr 8, 2019
1 parent 4fd0adc commit 4ad70e0
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added
- [#811](https://github.com/improbable-eng/thanos/pull/811) Remote write receiver
- [#798](https://github.com/improbable-eng/thanos/pull/798) Ability to limit the maximum concurrent about of Series() calls in Thanos Store and the maximum amount of samples.
- [#910](https://github.com/improbable-eng/thanos/pull/910) Query's stores UI page is now sorted by type and old DNS or File SD stores are removed after 5 minutes (configurable via the new `--store.unhealthy-timeout=5m` flag).

New options:

Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
dnsSDInterval := modelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions.").
Default("30s"))

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified. ").
unhealthyStoreTimeout := modelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m"))

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified.").
Default("false").Bool()

enablePartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified.").
Expand Down Expand Up @@ -150,6 +152,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*enablePartialResponse,
fileSD,
time.Duration(*dnsSDInterval),
time.Duration(*unhealthyStoreTimeout),
)
}
}
Expand Down Expand Up @@ -266,6 +269,7 @@ func runQuery(
enablePartialResponse bool,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
unhealthyStoreTimeout time.Duration,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -310,6 +314,7 @@ func runQuery(
return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ Flags:
is used as a resync fallback.
--store.sd-dns-interval=30s
Interval between DNS resolutions.
--store.unhealthy-timeout=5m
Timeout before an unhealthy store is cleaned
from the store UI page.
--query.auto-downsampling Enable automatic adjustment (step / 5) to what
source of data should be used in store gateways
if no max_source_resolution param is specified.
Expand Down
71 changes: 48 additions & 23 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ type StoreSet struct {
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration

mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelStores map[string]int
storeStatuses map[string]*StoreStatus
mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
}

type storeSetNodeCollector struct {
Expand Down Expand Up @@ -118,6 +119,7 @@ func NewStoreSet(
reg *prometheus.Registry,
storeSpecs func() []StoreSpec,
dialOpts []grpc.DialOption,
unhealthyStoreTimeout time.Duration,
) *StoreSet {
storeNodeConnections := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_nodes_grpc_connections",
Expand All @@ -135,14 +137,15 @@ func NewStoreSet(
}

ss := &StoreSet{
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelStores: map[string]int{},
stores: make(map[string]*storeRef),
storeStatuses: make(map[string]*StoreStatus),
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelStores: map[string]int{},
stores: make(map[string]*storeRef),
storeStatuses: make(map[string]*StoreStatus),
unhealthyStoreTimeout: unhealthyStoreTimeout,
}

storeNodeCollector := &storeSetNodeCollector{externalLabelOccurrences: ss.externalLabelOccurrences}
Expand Down Expand Up @@ -255,6 +258,7 @@ func (s *StoreSet) Update(ctx context.Context) {
}
s.externalLabelStores = externalLabelStores
s.storeNodeConnections.Set(float64(len(s.stores)))
s.cleanUpStoreStatuses()
}

func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
Expand Down Expand Up @@ -345,16 +349,23 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
s.storesStatusesMtx.Lock()
defer s.storesStatusesMtx.Unlock()

now := time.Now()
s.storeStatuses[store.addr] = &StoreStatus{
Name: store.addr,
LastError: err,
LastCheck: now,
Labels: store.labels,
StoreType: store.storeType,
MinTime: store.minTime,
MaxTime: store.maxTime,
status := StoreStatus{Name: store.addr}
prev, ok := s.storeStatuses[store.addr]
if ok {
status = *prev
}

status.LastError = err
status.LastCheck = time.Now()

if err == nil {
status.Labels = store.labels
status.StoreType = store.storeType
status.MinTime = store.minTime
status.MaxTime = store.maxTime
}

s.storeStatuses[store.addr] = &status
}

func (s *StoreSet) GetStoreStatus() []StoreStatus {
Expand Down Expand Up @@ -401,3 +412,17 @@ func (s *StoreSet) Close() {
st.close()
}
}

func (s *StoreSet) cleanUpStoreStatuses() {
s.storesStatusesMtx.Lock()
defer s.storesStatusesMtx.Unlock()

now := time.Now()
for addr, status := range s.storeStatuses {
if _, ok := s.stores[addr]; !ok {
if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout {
delete(s.storeStatuses, addr)
}
}
}
}
8 changes: 4 additions & 4 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestStoreSet_AllAvailable_ThenDown(t *testing.T) {

// Testing if duplicates can cause weird results.
initialStoreAddr = append(initialStoreAddr, initialStoreAddr[0])
storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts)
storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
storeSet.gRPCInfoCallTimeout = 2 * time.Second
defer storeSet.Close()

Expand Down Expand Up @@ -185,7 +185,7 @@ func TestStoreSet_StaticStores_OneAvailable(t *testing.T) {
initialStoreAddr := st.StoreAddresses()
st.CloseOne(initialStoreAddr[0])

storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts)
storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
storeSet.gRPCInfoCallTimeout = 2 * time.Second
defer storeSet.Close()

Expand Down Expand Up @@ -215,7 +215,7 @@ func TestStoreSet_StaticStores_NoneAvailable(t *testing.T) {
st.CloseOne(initialStoreAddr[0])
st.CloseOne(initialStoreAddr[1])

storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts)
storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
storeSet.gRPCInfoCallTimeout = 2 * time.Second

// Should not matter how many of these we run.
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestStoreSet_AllAvailable_BlockExtLsetDuplicates(t *testing.T) {

initialStoreAddr := st.StoreAddresses()

storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts)
storeSet := NewStoreSet(nil, nil, specsFromAddrFunc(initialStoreAddr), testGRPCOpts, time.Minute)
storeSet.gRPCInfoCallTimeout = 2 * time.Second
defer storeSet.Close()

Expand Down
Loading

0 comments on commit 4ad70e0

Please sign in to comment.