store: Expose bucket index operation duration histogram (#2725)
* Expose bucket fetch operation duration histograms

Signed-off-by: Kemal Akkoyun <[email protected]>

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <[email protected]>
Signed-off-by: Kemal Akkoyun <[email protected]>

* Merge histograms

Signed-off-by: Kemal Akkoyun <[email protected]>

* Use another approach to track index operations

Signed-off-by: Kemal Akkoyun <[email protected]>

* Split lookup duration histograms

Signed-off-by: Kemal Akkoyun <[email protected]>

* Use timer API

Signed-off-by: Kemal Akkoyun <[email protected]>

* Rename and clarify description

Signed-off-by: Kemal Akkoyun <[email protected]>

Co-authored-by: Marco Pracucci <[email protected]>
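
The mechanism behind the "Use timer API" step above is prometheus.NewTimer: register a histogram once, start a timer at the top of the tracked operation, and observe the elapsed time when the function returns. Below is a minimal, self-contained sketch of that pattern; the metric name and the fetch body are invented for illustration and are not taken from the Thanos code.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// fetchDuration stands in for the histograms this commit registers in
// newBucketStoreMetrics; the name here is hypothetical.
var fetchDuration = promauto.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_fetch_duration_seconds",
	Help:    "Time it takes to fetch data from a bucket.",
	Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
})

func fetch() {
	// Start the timer; the deferred call observes the duration on every
	// return path, including early returns on error.
	timer := prometheus.NewTimer(fetchDuration)
	defer timer.ObserveDuration()

	time.Sleep(10 * time.Millisecond) // stand-in for the real bucket/cache work
}

func main() {
	fetch()
}

Because the observation happens in a defer, later refactoring of the function body cannot silently drop the measurement — which is why the diff below places the timer at the very top of fetchPostings and PreloadSeries.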
kakkoyun and pracucci authored Aug 10, 2020
1 parent 9b578af commit dec6c09
Showing 4 changed files with 56 additions and 25 deletions.
pkg/cacheutil/memcached_client.go (1 addition, 1 deletion)
@@ -540,7 +540,7 @@ func (c *memcachedClient) resolveAddrs() error {
 
 	// If some of the dns resolution fails, log the error.
 	if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil {
-		level.Error(c.logger).Log("msg", "failed to resolve addresses for storeAPIs", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
+		level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err)
 	}
 	// Fail in case no server address is resolved.
 	servers := c.dnsProvider.Addresses()
pkg/objstore/objstore.go (1 addition, 0 deletions)
@@ -258,6 +258,7 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
 			ConstLabels: prometheus.Labels{"bucket": name},
 			Buckets:     []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
 		}, []string{"operation"}),
+
 		lastSuccessfulUploadTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "thanos_objstore_bucket_last_successful_upload_time",
 			Help: "Second timestamp of the last successful upload to the bucket.",
pkg/store/bucket.go (34 additions, 7 deletions)
@@ -111,6 +111,9 @@ type bucketStoreMetrics struct {
 	cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
 	cachedPostingsOriginalSizeBytes      prometheus.Counter
 	cachedPostingsCompressedSizeBytes    prometheus.Counter
+
+	seriesFetchDuration   prometheus.Histogram
+	postingsFetchDuration prometheus.Histogram
 }
 
 func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -221,6 +224,18 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 		Help: "Compressed size of postings stored into cache.",
 	})
 
+	m.seriesFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+		Name:    "thanos_bucket_store_cached_series_fetch_duration_seconds",
+		Help:    "Time it takes to fetch series from a bucket to respond a query. It also includes the time it takes to cache fetch and store operations.",
+		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
+	})
+
+	m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+		Name:    "thanos_bucket_store_cached_postings_fetch_duration_seconds",
+		Help:    "Time it takes to fetch postings from a bucket to respond a query. It also includes the time it takes to cache fetch and store operations.",
+		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
+	})
+
 	return &m
 }

@@ -473,7 +488,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 	lset := labels.FromMap(meta.Thanos.Labels)
 	h := lset.Hash()
 
-	indexHeaderReader, err := indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
+	indexHeaderReader, err := indexheader.NewBinaryReader(
+		ctx,
+		s.logger,
+		s.bkt,
+		s.dir,
+		meta.ULID,
+		s.postingOffsetsInMemSampling,
+	)
 	if err != nil {
 		return errors.Wrap(err, "create index header reader")
 	}
@@ -486,14 +508,14 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 	b, err := newBucketBlock(
 		ctx,
 		log.With(s.logger, "block", meta.ULID),
+		s.metrics,
 		meta,
 		s.bkt,
 		dir,
 		s.indexCache,
 		s.chunkPool,
 		indexHeaderReader,
 		s.partitioner,
-		s.metrics.seriesRefetches,
 		s.enablePostingsCompression,
 	)
 	if err != nil {
@@ -1264,6 +1286,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
 // state for the block on local disk.
 type bucketBlock struct {
 	logger            log.Logger
+	metrics           *bucketStoreMetrics
 	bkt               objstore.BucketReader
 	meta              *metadata.Meta
 	dir               string
@@ -1278,8 +1301,6 @@ type bucketBlock struct {
 
 	partitioner partitioner
 
-	seriesRefetches prometheus.Counter
-
 	enablePostingsCompression bool
 
 	// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
@@ -1290,26 +1311,26 @@
 func newBucketBlock(
 	ctx context.Context,
 	logger log.Logger,
+	metrics *bucketStoreMetrics,
 	meta *metadata.Meta,
 	bkt objstore.BucketReader,
 	dir string,
 	indexCache storecache.IndexCache,
 	chunkPool pool.BytesPool,
 	indexHeadReader indexheader.Reader,
 	p partitioner,
-	seriesRefetches prometheus.Counter,
 	enablePostingsCompression bool,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
 		logger:                    logger,
+		metrics:                   metrics,
 		bkt:                       bkt,
 		indexCache:                indexCache,
 		chunkPool:                 chunkPool,
 		dir:                       dir,
 		partitioner:               p,
 		meta:                      meta,
 		indexHeaderReader:         indexHeadReader,
-		seriesRefetches:           seriesRefetches,
 		enablePostingsCompression: enablePostingsCompression,
 	}

@@ -1616,6 +1637,9 @@ type postingPtr struct {
 // It returns one postings for each key, in the same order.
 // If postings for given key is not fetched, entry at given index will be nil.
 func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
+	timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
+	defer timer.ObserveDuration()
+
 	var ptrs []postingPtr
 
 	output := make([]index.Postings, len(keys))
@@ -1833,6 +1857,9 @@ func (it *bigEndianPostings) length() int {
 }
 
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
+	timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
+	defer timer.ObserveDuration()
+
 	// Load series from cache, overwriting the list of ids to preload
 	// with the missing ones.
 	fromCache, ids := r.block.indexCache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1883,7 +1910,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
 	}
 
 	// Inefficient, but should be rare.
-	r.block.seriesRefetches.Inc()
+	r.block.metrics.seriesRefetches.Inc()
 	level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)
 
 	// Fetch plus to get the size of next one if exists.
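
Note the shape of the refactor above: instead of threading the single seriesRefetches counter through newBucketBlock, the whole *bucketStoreMetrics bundle is passed once and stored on bucketBlock, so adding the two fetch-duration histograms required no further signature changes. Here is a hedged sketch of the same idea with invented names (metrics, block, and the example_* metric names are illustrative, not the Thanos identifiers):

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// metrics bundles all instrumentation a component needs.
type metrics struct {
	refetches     prometheus.Counter
	fetchDuration prometheus.Histogram
}

func newMetrics(reg prometheus.Registerer) *metrics {
	return &metrics{
		refetches: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "example_refetches_total",
			Help: "How many times data had to be refetched.",
		}),
		fetchDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name: "example_fetch_duration_seconds",
			Help: "Time it takes to fetch data.",
		}),
	}
}

// block receives the bundle once; registering a new metric never
// changes its constructor signature again.
type block struct {
	metrics *metrics
}

func main() {
	b := &block{metrics: newMetrics(prometheus.NewRegistry())}
	b.metrics.refetches.Inc() // mirrors r.block.metrics.seriesRefetches.Inc() above
}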
pkg/store/bucket_test.go (20 additions, 17 deletions)
@@ -30,15 +30,15 @@ import (
 	"github.com/leanovate/gopter/gen"
 	"github.com/leanovate/gopter/prop"
 	"github.com/oklog/ulid"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/encoding"
 
+	"go.uber.org/atomic"
+
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/indexheader"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -52,7 +52,6 @@ import (
 	storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
 	"github.com/thanos-io/thanos/pkg/testutil"
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
-	"go.uber.org/atomic"
 )

var emptyRelabelConfig = make([]*relabel.Config, 0)
@@ -209,7 +208,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
 		},
 	}
 
-	b, err := newBucketBlock(context.Background(), log.NewNopLogger(), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, true)
+	b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, true)
 	testutil.Ok(t, err)
 
 	cases := []struct {
@@ -921,10 +920,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
 				ULID: ulid.MustNew(1, nil),
 			},
 		},
-		bkt:             bkt,
-		seriesRefetches: s.seriesRefetches,
-		logger:          log.NewNopLogger(),
-		indexCache:      noopCache{},
+		bkt:        bkt,
+		logger:     log.NewNopLogger(),
+		metrics:    s,
+		indexCache: noopCache{},
 	}
 
 	buf := encoding.Encbuf{}
@@ -1130,6 +1129,7 @@ func benchmarkExpandedPostings(
 		t.Run(c.name, func(t testutil.TB) {
 			b := &bucketBlock{
 				logger:            log.NewNopLogger(),
+				metrics:           newBucketStoreMetrics(nil),
 				indexHeaderReader: r,
 				indexCache:        noopCache{},
 				bkt:               bkt,
@@ -1228,15 +1228,16 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
 		testutil.Ok(t, err)
 		testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))
 
+		m := newBucketStoreMetrics(nil)
 		b := &bucketBlock{
-			indexCache:      noopCache{},
-			logger:          logger,
-			bkt:             bkt,
-			meta:            meta,
-			partitioner:     gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
-			chunkObjs:       []string{filepath.Join(id.String(), "chunks", "000001")},
-			chunkPool:       chunkPool,
-			seriesRefetches: promauto.With(nil).NewCounter(prometheus.CounterOpts{}),
+			indexCache:  noopCache{},
+			logger:      logger,
+			metrics:     m,
+			bkt:         bkt,
+			meta:        meta,
+			partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+			chunkObjs:   []string{filepath.Join(id.String(), "chunks", "000001")},
+			chunkPool:   chunkPool,
 		}
 		blocks = append(blocks, b)
 	}
@@ -1289,7 +1290,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
 
 			for _, b := range blocks {
 				// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
-				testutil.Equals(t, 0.0, promtest.ToFloat64(b.seriesRefetches))
+				testutil.Equals(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
 			}
 		}
 	}
@@ -1393,6 +1394,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	b1 = &bucketBlock{
 		indexCache:  indexCache,
 		logger:      logger,
+		metrics:     newBucketStoreMetrics(nil),
 		bkt:         bkt,
 		meta:        meta,
 		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
@@ -1431,6 +1433,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	b2 = &bucketBlock{
 		indexCache:  indexCache,
 		logger:      logger,
+		metrics:     newBucketStoreMetrics(nil),
 		bkt:         bkt,
 		meta:        meta,
 		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
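
With the counter now living on the metrics bundle, the benchmark assertion above becomes promtest.ToFloat64(b.metrics.seriesRefetches). ToFloat64 only works for single-value collectors such as counters and gauges and panics on histograms; for collectors like the new fetch-duration histograms, client_golang's testutil offers CollectAndCount. A small standalone sketch with illustrative metric names (example_* is invented, not from Thanos):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	promtest "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	refetches := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "example_refetches_total",
		Help: "How many times data had to be refetched.",
	})
	fetchDuration := promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
		Name: "example_fetch_duration_seconds",
		Help: "Time it takes to fetch data.",
	})

	fetchDuration.Observe(0.05)

	// Single-value metrics (counters, gauges) can be read directly.
	fmt.Println(promtest.ToFloat64(refetches)) // 0, nothing was refetched

	// Histograms are verified by collecting them.
	fmt.Println(promtest.CollectAndCount(fetchDuration, "example_fetch_duration_seconds")) // 1
}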
