From c2ce628970fdfb24f8cfc5d16bda90a26e38d8ef Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 25 Sep 2024 17:31:19 +0200 Subject: [PATCH] kafka replay speed: ingestion metrics (#9346) * kafka replay speed: ingestion metrics Signed-off-by: Dimitar Dimitrov * Separate batch processing time by batch contents Signed-off-by: Dimitar Dimitrov * Also set time on metadata Signed-off-by: Dimitar Dimitrov * Add tenant to metrics Signed-off-by: Dimitar Dimitrov * Add metrics for errors Signed-off-by: Dimitar Dimitrov * Rename batching queue metrics Signed-off-by: Dimitar Dimitrov * Pairing to address code review Co-Authored-By: Dimitar Dimitrov * Move the metrics into their own file Co-Authored-By: Dimitar Dimitrov * go mod tidy Signed-off-by: gotjosh --------- Signed-off-by: Dimitar Dimitrov Signed-off-by: gotjosh Co-authored-by: gotjosh --- go.sum | 1 - .../mimir-mixin/dashboards/writes.libsonnet | 8 +- pkg/storage/ingest/pusher.go | 127 +++++++++--------- pkg/storage/ingest/pusher_metrics.go | 98 ++++++++++++++ pkg/storage/ingest/pusher_test.go | 28 ++-- 5 files changed, 183 insertions(+), 79 deletions(-) create mode 100644 pkg/storage/ingest/pusher_metrics.go diff --git a/go.sum b/go.sum index 46727c1f7bf..75d4a44e95a 100644 --- a/go.sum +++ b/go.sum @@ -1021,7 +1021,6 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= -github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= github.com/go-jose/go-jose/v4 v4.0.1 h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U= diff --git a/operations/mimir-mixin/dashboards/writes.libsonnet b/operations/mimir-mixin/dashboards/writes.libsonnet index d345ef368fe..dc03b5931a1 100644 --- a/operations/mimir-mixin/dashboards/writes.libsonnet +++ b/operations/mimir-mixin/dashboards/writes.libsonnet @@ -274,10 +274,10 @@ local filename = 'mimir-writes.json'; ) + $.queryPanel( [ - 'histogram_avg(sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], - 'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], - 'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], - 'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], ], [ 'avg', diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index 81db96a006a..ec5ac38fd26 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -15,8 +15,6 @@ import ( "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/mimir/pkg/mimirpb" @@ -53,44 +51,6 @@ type pusherConsumer struct { pusher Pusher } -type pusherConsumerMetrics struct { - numTimeSeriesPerFlush prometheus.Histogram - processingTimeSeconds prometheus.Observer - clientErrRequests prometheus.Counter - serverErrRequests prometheus.Counter - totalRequests prometheus.Counter -} - -// newPusherConsumerMetrics creates a new pusherConsumerMetrics instance. -func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics { - errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_records_failed_total", - Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", - }, []string{"cause"}) - - return &pusherConsumerMetrics{ - numTimeSeriesPerFlush: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_ingester_pusher_num_timeseries_per_shard_flush", - Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than ingestion-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.", - NativeHistogramBucketFactor: 1.1, - }), - processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_ingest_storage_reader_processing_time_seconds", - Help: "Time taken to process a single record (write request).", - NativeHistogramBucketFactor: 1.1, - NativeHistogramMaxBucketNumber: 100, - NativeHistogramMinResetDuration: 1 * time.Hour, - Buckets: prometheus.DefBuckets, - }), - clientErrRequests: errRequestsCounter.WithLabelValues("client"), - serverErrRequests: errRequestsCounter.WithLabelValues("server"), - totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_records_total", - Help: "Number of attempted records (write requests).", - }), - } -} - // newPusherConsumer creates a new pusherConsumer instance. func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer { return &pusherConsumer{ @@ -105,6 +65,10 @@ func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsu // Consume implements the recordConsumer interface. // It'll use a separate goroutine to unmarshal the next record while we push the current record to storage. func (c pusherConsumer) Consume(ctx context.Context, records []record) error { + defer func(processingStart time.Time) { + c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds()) + }(time.Now()) + type parsedRecord struct { *mimirpb.WriteRequest // ctx holds the tracing baggage for this record/request. @@ -190,24 +154,20 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error { func (c pusherConsumer) newStorageWriter() PusherCloser { if c.kafkaConfig.IngestionConcurrency == 0 { - return newSequentialStoragePusher(c.metrics, c.pusher) + return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher) } - return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger) + return newParallelStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger) } func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error { spanLog, ctx := spanlogger.NewWithLogger(ctx, c.logger, "pusherConsumer.pushToStorage") defer spanLog.Finish() - processingStart := time.Now() - // Note that the implementation of the Pusher expects the tenantID to be in the context. ctx = user.InjectOrgID(ctx, tenantID) err := writer.PushToStorage(ctx, req) - // TODO dimitarvdimitrov processing time is flawed because it's only counting enqueuing time, not processing time. - c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds()) c.metrics.totalRequests.Inc() isServerErr := c.handlePushErr(ctx, tenantID, err, spanLog) @@ -259,13 +219,13 @@ func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bo // sequentialStoragePusher receives mimirpb.WriteRequest which are then pushed to the storage one by one. type sequentialStoragePusher struct { - metrics *pusherConsumerMetrics + metrics *storagePusherMetrics pusher Pusher } // newSequentialStoragePusher creates a new sequentialStoragePusher instance. -func newSequentialStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher) sequentialStoragePusher { +func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher) sequentialStoragePusher { return sequentialStoragePusher{ metrics: metrics, pusher: pusher, @@ -274,8 +234,11 @@ func newSequentialStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher) s // PushToStorage implements the PusherCloser interface. func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error { - // TODO: What about time?? - ssp.metrics.numTimeSeriesPerFlush.Observe(float64(len(wr.Timeseries))) + ssp.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries))) + defer func(now time.Time) { + ssp.metrics.processingTime.WithLabelValues(requestContents(wr)).Observe(time.Since(now).Seconds()) + }(time.Now()) + return ssp.pusher.PushToStorage(ctx, wr) } @@ -287,7 +250,7 @@ func (ssp sequentialStoragePusher) Close() []error { // parallelStoragePusher receives WriteRequest which are then pushed to the storage in parallel. // The parallelism is two-tiered which means that we first parallelize by tenantID and then by series. type parallelStoragePusher struct { - metrics *pusherConsumerMetrics + metrics *storagePusherMetrics logger log.Logger // pushers is map["$tenant|$source"]*parallelStorageShards @@ -298,7 +261,7 @@ type parallelStoragePusher struct { } // newParallelStoragePusher creates a new parallelStoragePusher instance. -func newParallelStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher { +func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher { return ¶llelStoragePusher{ logger: log.With(logger, "component", "parallel-storage-pusher"), pushers: make(map[string]*parallelStorageShards), @@ -340,7 +303,7 @@ func (c parallelStoragePusher) shardsFor(userID string, requestSource mimirpb.Wr } // Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes. hashLabels := labels.Labels.Hash - p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels) + p := newParallelStorageShards(c.metrics, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels) c.pushers[userID+"|"+requestSource.String()] = p return p } @@ -350,7 +313,7 @@ type labelsHashFunc func(labels.Labels) uint64 // parallelStorageShards is a collection of shards that are used to parallelize the writes to the storage by series. // Each series is hashed to a shard that contains its own batchingQueue. type parallelStorageShards struct { - numTimeSeriesPerFlush prometheus.Histogram + metrics *storagePusherMetrics pusher Pusher hashLabels labelsHashFunc @@ -364,20 +327,22 @@ type parallelStorageShards struct { } type flushableWriteRequest struct { + // startedAt is the time when the first item was added to this request (timeseries or metadata). + startedAt time.Time *mimirpb.WriteRequest context.Context } // newParallelStorageShards creates a new parallelStorageShards instance. -func newParallelStorageShards(numTimeSeriesPerFlush prometheus.Histogram, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards { +func newParallelStorageShards(metrics *storagePusherMetrics, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards { p := ¶llelStorageShards{ - numShards: numShards, - pusher: pusher, - hashLabels: hashLabels, - capacity: capacity, - numTimeSeriesPerFlush: numTimeSeriesPerFlush, - batchSize: batchSize, - wg: &sync.WaitGroup{}, + numShards: numShards, + pusher: pusher, + hashLabels: hashLabels, + capacity: capacity, + metrics: metrics, + batchSize: batchSize, + wg: &sync.WaitGroup{}, } p.start() @@ -398,7 +363,6 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request * mimirpb.FromLabelAdaptersOverwriteLabels(&builder, ts.Labels, &nonCopiedLabels) shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards) - // TODO: Add metrics to measure how long are items sitting in the queue before they are flushed. if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil { // TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error. // We'll do that in the next PR as otherwise it's too many changes right now. @@ -450,7 +414,7 @@ func (p *parallelStorageShards) start() { p.wg.Add(p.numShards) for i := range shards { - shards[i] = newBatchingQueue(p.capacity, p.batchSize) + shards[i] = newBatchingQueue(p.capacity, p.batchSize, p.metrics.batchingQueueMetrics) go p.run(shards[i]) } @@ -463,17 +427,38 @@ func (p *parallelStorageShards) run(queue *batchingQueue) { defer queue.Done() for wr := range queue.Channel() { - p.numTimeSeriesPerFlush.Observe(float64(len(wr.WriteRequest.Timeseries))) + p.metrics.batchAge.Observe(time.Since(wr.startedAt).Seconds()) + p.metrics.timeSeriesPerFlush.Observe(float64(len(wr.WriteRequest.Timeseries))) + processingStart := time.Now() + err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest) + + p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds()) if err != nil { queue.ErrorChannel() <- err } } } +func requestContents(request *mimirpb.WriteRequest) string { + switch { + case len(request.Timeseries) > 0 && len(request.Metadata) > 0: + return "timeseries_and_metadata" + case len(request.Timeseries) > 0: + return "timeseries" + case len(request.Metadata) > 0: + return "metadata" + default: + // This would be a bug, but at least we'd know. + return "empty" + } +} + // batchingQueue is a queue that batches the incoming time series according to the batch size. // Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method. type batchingQueue struct { + metrics *batchingQueueMetrics + ch chan flushableWriteRequest errCh chan error done chan struct{} @@ -483,8 +468,9 @@ type batchingQueue struct { } // newBatchingQueue creates a new batchingQueue instance. -func newBatchingQueue(capacity int, batchSize int) *batchingQueue { +func newBatchingQueue(capacity int, batchSize int, metrics *batchingQueueMetrics) *batchingQueue { return &batchingQueue{ + metrics: metrics, ch: make(chan flushableWriteRequest, capacity), errCh: make(chan error, capacity+1), // We check errs before pushing to the channel, so we need to have a buffer of at least capacity+1 so that the consumer can push all of its errors and not rely on the producer to unblock it. done: make(chan struct{}), @@ -496,6 +482,9 @@ func newBatchingQueue(capacity int, batchSize int) *batchingQueue { // AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel(). // If an error occurs while pushing the batch, it returns the error and ensures the batch is pushed. func (q *batchingQueue) AddToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, ts mimirpb.PreallocTimeseries) error { + if q.currentBatch.startedAt.IsZero() { + q.currentBatch.startedAt = time.Now() + } q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts) q.currentBatch.Context = ctx q.currentBatch.Source = source @@ -505,6 +494,9 @@ func (q *batchingQueue) AddToBatch(ctx context.Context, source mimirpb.WriteRequ // AddMetadataToBatch adds metadata to the current batch. func (q *batchingQueue) AddMetadataToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, metadata *mimirpb.MetricMetadata) error { + if q.currentBatch.startedAt.IsZero() { + q.currentBatch.startedAt = time.Now() + } q.currentBatch.Metadata = append(q.currentBatch.Metadata, metadata) q.currentBatch.Context = ctx q.currentBatch.Source = source @@ -558,6 +550,9 @@ func (q *batchingQueue) pushIfFull() error { func (q *batchingQueue) push() error { errs := q.collectErrors() + q.metrics.flushErrorsTotal.Add(float64(len(errs))) + q.metrics.flushTotal.Inc() + q.ch <- q.currentBatch q.resetCurrentBatch() diff --git a/pkg/storage/ingest/pusher_metrics.go b/pkg/storage/ingest/pusher_metrics.go new file mode 100644 index 00000000000..d8bc231521c --- /dev/null +++ b/pkg/storage/ingest/pusher_metrics.go @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package ingest + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// pusherConsumerMetrics holds the metrics for the pusherConsumer. +type pusherConsumerMetrics struct { + processingTimeSeconds prometheus.Observer + clientErrRequests prometheus.Counter + serverErrRequests prometheus.Counter + totalRequests prometheus.Counter + + storagePusherMetrics *storagePusherMetrics +} + +// newPusherConsumerMetrics creates a new pusherConsumerMetrics instance. +func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics { + errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_records_failed_total", + Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", + }, []string{"cause"}) + + return &pusherConsumerMetrics{ + storagePusherMetrics: newStoragePusherMetrics(reg), + processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_records_processing_time_seconds", + Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, + Buckets: prometheus.DefBuckets, + }), + + clientErrRequests: errRequestsCounter.WithLabelValues("client"), + serverErrRequests: errRequestsCounter.WithLabelValues("server"), + totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_records_total", + Help: "Number of attempted records (write requests).", + }), + } +} + +// storagePusherMetrics holds the metrics for both the sequentialStoragePusher and the parallelStoragePusher. +type storagePusherMetrics struct { + // batchAge is not really important unless we're pushing many things at once, so it's only used as part of parallelStoragePusher. + batchAge prometheus.Histogram + processingTime *prometheus.HistogramVec + timeSeriesPerFlush prometheus.Histogram + batchingQueueMetrics *batchingQueueMetrics +} + +// newStoragePusherMetrics creates a new storagePusherMetrics instance. +func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics { + return &storagePusherMetrics{ + batchingQueueMetrics: newBatchingQueueMetrics(reg), + batchAge: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_pusher_batch_age_seconds", + Help: "Age of the batch of samples that are being ingested by an ingestion shard. This is the time since adding the first sample to the batch. Higher values indicates that the batching queue is not processing fast enough or that the batches are not filling up fast enough.", + NativeHistogramBucketFactor: 1.1, + }), + processingTime: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_pusher_processing_time_seconds", + Help: "Time to ingest a batch of samples for timeseries or metadata by an ingestion shard. The 'batch_contents' label indicates the contents of the batch.", + NativeHistogramBucketFactor: 1.1, + }, []string{"content"}), + timeSeriesPerFlush: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_pusher_timeseries_per_flush", + Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than -ingest-storage.kafka.ingestion-concurrency-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.", + NativeHistogramBucketFactor: 1.1, + }), + } +} + +// batchingQueueMetrics holds the metrics for the batchingQueue. +type batchingQueueMetrics struct { + flushTotal prometheus.Counter + flushErrorsTotal prometheus.Counter +} + +// newBatchingQueueMetrics creates a new batchingQueueMetrics instance. +func newBatchingQueueMetrics(reg prometheus.Registerer) *batchingQueueMetrics { + return &batchingQueueMetrics{ + flushTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_batching_queue_flush_total", + Help: "Number of times a batch of samples is flushed to the storage.", + }), + flushErrorsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_batching_queue_flush_errors_total", + Help: "Number of errors encountered while flushing a batch of samples to the storage.", + }), + } +} diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index 26c5618fcec..3ccde7d3c41 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -18,7 +18,6 @@ import ( "github.com/grafana/dskit/user" "github.com/grafana/regexp" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" @@ -209,7 +208,8 @@ func TestPusherConsumer(t *testing.T) { }) logs := &concurrency.SyncBuffer{} - c := newPusherConsumer(pusher, KafkaConfig{}, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), log.NewLogfmtLogger(logs)) + metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry()) + c := newPusherConsumer(pusher, KafkaConfig{}, metrics, log.NewLogfmtLogger(logs)) err := c.Consume(context.Background(), tc.records) if tc.expErr == "" { assert.NoError(t, err) @@ -395,8 +395,6 @@ func (m *mockPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRe } func TestParallelStorageShards_ShardWriteRequest(t *testing.T) { - noopHistogram := promauto.With(prometheus.NewRegistry()).NewHistogram(prometheus.HistogramOpts{Name: "noop", NativeHistogramBucketFactor: 1.1}) - testCases := map[string]struct { shardCount int batchSize int @@ -669,7 +667,8 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) { pusher := &mockPusher{} // run with a buffer of one, so some of the tests can fill the buffer and test the error handling const buffer = 1 - shardingP := newParallelStorageShards(noopHistogram, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash) + metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry()) + shardingP := newParallelStorageShards(metrics, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash) for i, req := range tc.expectedUpstreamPushes { pusher.On("PushToStorage", mock.Anything, req).Return(tc.upstreamPushErrs[i]) @@ -831,7 +830,7 @@ func TestParallelStoragePusher(t *testing.T) { receivedPushes[tenantID][req.Source]++ }).Return(nil) - metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry()) + metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry()) psp := newParallelStoragePusher(metrics, pusher, 1, 1, logger) // Process requests @@ -856,7 +855,9 @@ func TestParallelStoragePusher(t *testing.T) { func TestBatchingQueue_NoDeadlock(t *testing.T) { capacity := 2 batchSize := 3 - queue := newBatchingQueue(capacity, batchSize) + reg := prometheus.NewPedanticRegistry() + m := newBatchingQueueMetrics(reg) + queue := newBatchingQueue(capacity, batchSize, m) ctx := context.Background() series := mockPreallocTimeseries("series_1") @@ -889,6 +890,15 @@ func TestBatchingQueue_NoDeadlock(t *testing.T) { require.Len(t, queue.ch, 0) require.Len(t, queue.errCh, 0) require.Len(t, queue.currentBatch.Timeseries, 0) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` +# HELP cortex_ingest_storage_reader_batching_queue_flush_errors_total Number of errors encountered while flushing a batch of samples to the storage. +# TYPE cortex_ingest_storage_reader_batching_queue_flush_errors_total counter +cortex_ingest_storage_reader_batching_queue_flush_errors_total 0 +# HELP cortex_ingest_storage_reader_batching_queue_flush_total Number of times a batch of samples is flushed to the storage. +# TYPE cortex_ingest_storage_reader_batching_queue_flush_total counter +cortex_ingest_storage_reader_batching_queue_flush_total 3 +`))) } func TestBatchingQueue(t *testing.T) { @@ -1119,7 +1129,9 @@ func TestBatchingQueue_ErrorHandling(t *testing.T) { func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.PreallocTimeseries) *batchingQueue { t.Helper() - queue := newBatchingQueue(capacity, batchSize) + reg := prometheus.NewPedanticRegistry() + m := newBatchingQueueMetrics(reg) + queue := newBatchingQueue(capacity, batchSize, m) for _, s := range series { require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, s))