Simplify store gateway metrics #123

Merged: 1 commit, Aug 11, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -32,6 +32,8 @@
* [ENHANCEMENT] Querier can now use the `LabelNames` call with matchers, when matchers are provided in the `/labels` API call, instead of the more expensive `MetricsForLabelMatchers` call used before. This can be enabled via the `-querier.query-label-names-with-matchers-enabled` flag once the ingesters are updated to this version. In the future this is expected to become the default behavior. #3
* [ENHANCEMENT] Ingester: added option `-ingester.readiness-check-ring-health` to disable the ring health check in the readiness endpoint. #48
* [ENHANCEMENT] Added option `-distributor.excluded-zones` to exclude ingesters running in specific zones both on write and read path. #51
+* [ENHANCEMENT] Store-gateway: added `cortex_bucket_store_sent_chunk_size_bytes` metric, tracking the size of chunks sent from store-gateway to querier. #123
+* [ENHANCEMENT] Store-gateway: reduced CPU and memory utilization due to exported metrics aggregation for instances with a large number of tenants. #123
* [BUGFIX] Upgrade Prometheus. TSDB now waits for pending readers before truncating Head block, fixing the `chunk not found` error and preventing wrong query results. #16
* [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #28
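
The second #123 entry above refers to replacing per-tenant metric registries, which had to be gathered and merged on every scrape, with a single shared metrics instance. The following is a minimal sketch of that pattern, with hypothetical names (`sharedMetrics`, `newSharedMetrics`) and a single counter standing in for the full metric set:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// sharedMetrics stands in for the real BucketStoreMetrics: registered once,
// then handed to every per-tenant store instead of one registry per tenant.
type sharedMetrics struct {
	blockLoads prometheus.Counter
}

func newSharedMetrics(reg prometheus.Registerer) *sharedMetrics {
	return &sharedMetrics{
		blockLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_bucket_store_block_loads_total",
			Help: "Total number of remote block loading attempts.",
		}),
	}
}

func main() {
	reg := prometheus.NewRegistry()
	m := newSharedMetrics(reg) // created once for all tenants

	for i := 0; i < 3; i++ {
		m.blockLoads.Inc() // every tenant's store updates the same series
	}

	// A single Gather walks one registry; no per-tenant merge pass is needed.
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Printf("exported %d metric families\n", len(families))
}
```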

192 changes: 21 additions & 171 deletions pkg/storegateway/bucket.go
@@ -28,7 +28,6 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@@ -89,174 +88,16 @@ const (
	labelDecode = "decode"
)

-type bucketStoreMetrics struct {

[Review comment from the PR author: Moved to pkg/storegateway/bucket_store_metrics.go]

-	blocksLoaded prometheus.Gauge
-	blockLoads prometheus.Counter
-	blockLoadFailures prometheus.Counter
-	blockDrops prometheus.Counter
-	blockDropFailures prometheus.Counter
-	seriesDataTouched *prometheus.SummaryVec
-	seriesDataFetched *prometheus.SummaryVec
-	seriesDataSizeTouched *prometheus.SummaryVec
-	seriesDataSizeFetched *prometheus.SummaryVec
-	seriesBlocksQueried prometheus.Summary
-	seriesGetAllDuration prometheus.Histogram
-	seriesMergeDuration prometheus.Histogram
-	resultSeriesCount prometheus.Summary
-	chunkSizeBytes prometheus.Histogram
-	queriesDropped *prometheus.CounterVec
-	seriesRefetches prometheus.Counter
-
-	cachedPostingsCompressions *prometheus.CounterVec
-	cachedPostingsCompressionErrors *prometheus.CounterVec
-	cachedPostingsCompressionTimeSeconds *prometheus.CounterVec
-	cachedPostingsOriginalSizeBytes prometheus.Counter
-	cachedPostingsCompressedSizeBytes prometheus.Counter
-
-	seriesHashCacheRequests prometheus.Counter
-	seriesHashCacheHits prometheus.Counter
-
-	seriesFetchDuration prometheus.Histogram
-	postingsFetchDuration prometheus.Histogram
-}

-func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
-	var m bucketStoreMetrics
-
-	m.blockLoads = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_block_loads_total",
-		Help: "Total number of remote block loading attempts.",
-	})
-	m.blockLoadFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_block_load_failures_total",
-		Help: "Total number of failed remote block loading attempts.",
-	})
-	m.blockDrops = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_block_drops_total",
-		Help: "Total number of local blocks that were dropped.",
-	})
-	m.blockDropFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_block_drop_failures_total",
-		Help: "Total number of local blocks that failed to be dropped.",
-	})
-	m.blocksLoaded = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
-		Name: "thanos_bucket_store_blocks_loaded",
-		Help: "Number of currently loaded blocks.",
-	})
-
-	m.seriesDataTouched = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
-		Name: "thanos_bucket_store_series_data_touched",
-		Help: "How many items of a data type in a block were touched for a single series request.",
-	}, []string{"data_type"})
-	m.seriesDataFetched = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
-		Name: "thanos_bucket_store_series_data_fetched",
-		Help: "How many items of a data type in a block were fetched for a single series request.",
-	}, []string{"data_type"})
-
-	m.seriesDataSizeTouched = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
-		Name: "thanos_bucket_store_series_data_size_touched_bytes",
-		Help: "Size of all items of a data type in a block were touched for a single series request.",
-	}, []string{"data_type"})
-	m.seriesDataSizeFetched = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
-		Name: "thanos_bucket_store_series_data_size_fetched_bytes",
-		Help: "Size of all items of a data type in a block were fetched for a single series request.",
-	}, []string{"data_type"})
-
-	m.seriesBlocksQueried = promauto.With(reg).NewSummary(prometheus.SummaryOpts{
-		Name: "thanos_bucket_store_series_blocks_queried",
-		Help: "Number of blocks in a bucket store that were touched to satisfy a query.",
-	})
-	m.seriesGetAllDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-		Name: "thanos_bucket_store_series_get_all_duration_seconds",
-		Help: "Time it takes until all per-block prepares and loads for a query are finished.",
-		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
-	})
-	m.seriesMergeDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-		Name: "thanos_bucket_store_series_merge_duration_seconds",
-		Help: "Time it takes to merge sub-results from all queried blocks into a single result.",
-		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
-	})
-	m.resultSeriesCount = promauto.With(reg).NewSummary(prometheus.SummaryOpts{
-		Name: "thanos_bucket_store_series_result_series",
-		Help: "Number of series observed in the final result of a query.",
-	})
-
-	m.chunkSizeBytes = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-		Name: "thanos_bucket_store_sent_chunk_size_bytes",
-		Help: "Size in bytes of the chunks for the single series, which is adequate to the gRPC message size sent to querier.",
-		Buckets: []float64{
-			32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024,
-		},
-	})
-
-	m.queriesDropped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_queries_dropped_total",
-		Help: "Number of queries that were dropped due to the limit.",
-	}, []string{"reason"})
-	m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_series_refetches_total",
-		Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
-	})
-
-	m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_cached_postings_compressions_total",
-		Help: "Number of postings compressions before storing to index cache.",
-	}, []string{"op"})
-	m.cachedPostingsCompressions.WithLabelValues(labelEncode)
-	m.cachedPostingsCompressions.WithLabelValues(labelDecode)
-
-	m.cachedPostingsCompressionErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_cached_postings_compression_errors_total",
-		Help: "Number of postings compression errors.",
-	}, []string{"op"})
-	m.cachedPostingsCompressionErrors.WithLabelValues(labelEncode)
-	m.cachedPostingsCompressionErrors.WithLabelValues(labelDecode)
-
-	m.cachedPostingsCompressionTimeSeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_cached_postings_compression_time_seconds_total",
-		Help: "Time spent compressing postings before storing them into postings cache.",
-	}, []string{"op"})
-	m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode)
-	m.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode)
-
-	m.cachedPostingsOriginalSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_cached_postings_original_size_bytes_total",
-		Help: "Original size of postings stored into cache.",
-	})
-	m.cachedPostingsCompressedSizeBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_cached_postings_compressed_size_bytes_total",
-		Help: "Compressed size of postings stored into cache.",
-	})
-
-	m.seriesFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-		Name: "thanos_bucket_store_cached_series_fetch_duration_seconds",
-		Help: "The time it takes to fetch series to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.",
-		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
-	})
-
-	m.postingsFetchDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-		Name: "thanos_bucket_store_cached_postings_fetch_duration_seconds",
-		Help: "The time it takes to fetch postings to respond to a request sent to a store gateway. It includes both the time to fetch it from the cache and from storage in case of cache misses.",
-		Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
-	})
-
-	m.seriesHashCacheRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_series_hash_cache_requests_total",
-		Help: "Total number of fetch attempts to the in-memory series hash cache.",
-	})
-	m.seriesHashCacheHits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_bucket_store_series_hash_cache_hits_total",
-		Help: "Total number of fetch hits to the in-memory series hash cache.",
-	})
-
-	return &m
-}

// FilterConfig is a configuration, which Store uses for filtering metrics based on time.
type FilterConfig struct {
	MinTime, MaxTime model.TimeOrDurationValue
}

+type BucketStoreStats struct {
+	// BlocksLoaded is the number of blocks currently loaded in the bucket store.
+	BlocksLoaded int
+}

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
//
@@ -266,7 +107,7 @@ type FilterConfig struct {
type BucketStore struct {
	logger log.Logger
	reg prometheus.Registerer // TODO(metalmatze) remove and add via BucketStoreOption
-	metrics *bucketStoreMetrics
+	metrics *BucketStoreMetrics
	bkt objstore.InstrumentedBucketReader
	fetcher block.MetadataFetcher
	dir string
@@ -386,6 +227,7 @@ func NewBucketStore(
	lazyIndexReaderEnabled bool,
	lazyIndexReaderIdleTimeout time.Duration,
	seriesHashCache *SeriesHashCache,
+	metrics *BucketStoreMetrics,
	options ...BucketStoreOption,
) (*BucketStore, error) {
	s := &BucketStore{
@@ -406,6 +248,7 @@
		postingOffsetsInMemSampling: postingOffsetsInMemSampling,
		enableSeriesResponseHints: enableSeriesResponseHints,
		seriesHashCache: seriesHashCache,
+		metrics: metrics,
	}

for _, option := range options {
@@ -414,7 +257,6 @@

	// Depend on the options
	s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg))
-	s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too

	if err := os.MkdirAll(dir, 0750); err != nil {
		return nil, errors.Wrap(err, "create dir")
@@ -436,6 +278,17 @@ func (s *BucketStore) Close() (err error) {
	return err
}

+// Stats returns statistics about the BucketStore instance.
+func (s *BucketStore) Stats() BucketStoreStats {
+	stats := BucketStoreStats{}
+
+	s.mtx.RLock()
+	stats.BlocksLoaded = len(s.blocks)
+	s.mtx.RUnlock()
+
+	return stats
+}

// SyncBlocks synchronizes the store's state with the blocks in the bucket.
// It will reuse disk space as persistent cache based on s.dir param.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
@@ -619,8 +472,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
	}
	s.blocks[b.meta.ULID] = b

-	s.metrics.blocksLoaded.Inc()
-
	return nil
}

@@ -638,7 +489,6 @@ func (s *BucketStore) removeBlock(id ulid.ULID) error {
		return nil
	}

-	s.metrics.blocksLoaded.Dec()
	if err := b.Close(); err != nil {
		return errors.Wrap(err, "close block")
	}
@@ -1623,7 +1473,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
// state for the block on local disk.
type bucketBlock struct {
	logger log.Logger
-	metrics *bucketStoreMetrics
+	metrics *BucketStoreMetrics
	bkt objstore.BucketReader
	meta *metadata.Meta
	dir string
@@ -1647,7 +1497,7 @@ type bucketBlock struct {
func newBucketBlock(
	ctx context.Context,
	logger log.Logger,
-	metrics *bucketStoreMetrics,
+	metrics *BucketStoreMetrics,
	meta *metadata.Meta,
	bkt objstore.BucketReader,
	dir string,
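The review comment in the removed block notes that the metrics code moved to `pkg/storegateway/bucket_store_metrics.go`, a file not included in this diff. Based on the exported `BucketStoreMetrics` type and the `NewBucketStoreMetrics(nil)` call visible in the test change below, the relocated constructor plausibly looks like the following sketch; the exact field set is an assumption, not confirmed by the diff:

```go
package storegateway

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// BucketStoreMetrics is assumed to mirror the removed bucketStoreMetrics
// struct, exported and built once so that all tenants can share it. The
// blocksLoaded gauge is assumed dropped in favor of the new Stats() method.
type BucketStoreMetrics struct {
	blockLoads prometheus.Counter
	// ... remaining fields as in the removed struct above ...
}

// NewBucketStoreMetrics registers the metrics with reg; promauto skips
// registration entirely when reg is nil, as in the test below.
func NewBucketStoreMetrics(reg prometheus.Registerer) *BucketStoreMetrics {
	return &BucketStoreMetrics{
		blockLoads: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "thanos_bucket_store_block_loads_total",
			Help: "Total number of remote block loading attempts.",
		}),
		// ... remaining metrics registered as before ...
	}
}
```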
1 change: 1 addition & 0 deletions pkg/storegateway/bucket_e2e_test.go
@@ -195,6 +195,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
		true,
		time.Minute,
		NewSeriesHashCache(1024*1024),
+		NewBucketStoreMetrics(nil),
		WithLogger(s.logger),
		WithIndexCache(s.cache),
		WithFilterConfig(filterConf),
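With the gauge updates removed from `addBlock` and `removeBlock`, the loaded-block count is exposed through the new `Stats()` method instead. Below is a hedged sketch of how a caller could re-export that value at scrape time with `prometheus.NewGaugeFunc`; the helper name and wiring are hypothetical, since the PR's actual cross-tenant aggregation is outside the shown diff:

```go
package storegateway

import "github.com/prometheus/client_golang/prometheus"

// registerBlocksLoaded is a hypothetical helper: instead of the store
// mutating a gauge on every block load/drop, the owner polls Stats() at
// scrape time and reports the current count.
func registerBlocksLoaded(reg prometheus.Registerer, store *BucketStore) {
	reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "thanos_bucket_store_blocks_loaded",
		Help: "Number of currently loaded blocks.",
	}, func() float64 {
		return float64(store.Stats().BlocksLoaded)
	}))
}
```

One design consequence worth noting: a polled `GaugeFunc` reads the count under the store's own read lock only when scraped, so the store no longer needs to keep a metrics reference on every load/drop path.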