kafka replay speed: ingestion metrics (#9346)
* kafka replay speed: ingestion metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Separate batch processing time by batch contents

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Also set time on metadata

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add tenant to metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Add metrics for errors

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Rename batching queue metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Pairing to address code review

Co-Authored-By: Dimitar Dimitrov <[email protected]>

* Move the metrics into their own file

Co-Authored-By: Dimitar Dimitrov <[email protected]>

* go mod tidy

Signed-off-by: gotjosh <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Signed-off-by: gotjosh <[email protected]>
Co-authored-by: gotjosh <[email protected]>
dimitarvdimitrov and gotjosh authored Sep 25, 2024
1 parent 2a58744 commit c2ce628
Showing 5 changed files with 183 additions and 79 deletions.
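Note: the file that (per the commit message) now holds the metrics is among the changed files not rendered in this view. The following is a minimal sketch of the extracted types, reconstructed only from the fields referenced in pkg/storage/ingest/pusher.go below; the struct shapes follow from the call sites, but the field types and everything beyond them are assumptions, not confirmed by this diff.

    import "github.com/prometheus/client_golang/prometheus"

    // Sketch only: inferred from usages such as metrics.batchAge.Observe(...),
    // metrics.timeSeriesPerFlush.Observe(...),
    // metrics.processingTime.WithLabelValues(...).Observe(...),
    // q.metrics.flushTotal.Inc(), and q.metrics.flushErrorsTotal.Add(...).
    type storagePusherMetrics struct {
        batchAge             prometheus.Histogram     // time from a batch's first item until a shard worker picks it up
        timeSeriesPerFlush   prometheus.Histogram     // number of series in each flushed batch
        processingTime       *prometheus.HistogramVec // push latency, labelled by batch contents (see requestContents below)
        batchingQueueMetrics *batchingQueueMetrics
    }

    type batchingQueueMetrics struct {
        flushTotal       prometheus.Counter // batches flushed by a batching queue
        flushErrorsTotal prometheus.Counter // errors collected while flushing
    }

pusherConsumerMetrics (referenced as c.metrics below) evidently keeps its processingTimeSeconds observer and gains a storagePusherMetrics field, which newStorageWriter passes down to the sequential and parallel pushers.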
1 change: 0 additions & 1 deletion go.sum
@@ -1021,7 +1021,6 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A=
github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-jose/go-jose/v4 v4.0.1 h1:QVEPDE3OluqXBQZDcnNvQrInro2h0e4eqNbnZSWqS6U=
8 changes: 4 additions & 4 deletions operations/mimir-mixin/dashboards/writes.libsonnet
@@ -274,10 +274,10 @@ local filename = 'mimir-writes.json';
) +
$.queryPanel(
[
'histogram_avg(sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)],
],
[
'avg',
127 changes: 61 additions & 66 deletions pkg/storage/ingest/pusher.go
@@ -15,8 +15,6 @@ import (
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/mimir/pkg/mimirpb"
@@ -53,44 +53,6 @@ type pusherConsumer struct {
pusher Pusher
}

type pusherConsumerMetrics struct {
numTimeSeriesPerFlush prometheus.Histogram
processingTimeSeconds prometheus.Observer
clientErrRequests prometheus.Counter
serverErrRequests prometheus.Counter
totalRequests prometheus.Counter
}

// newPusherConsumerMetrics creates a new pusherConsumerMetrics instance.
func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics {
errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_failed_total",
Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
}, []string{"cause"})

return &pusherConsumerMetrics{
numTimeSeriesPerFlush: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_pusher_num_timeseries_per_shard_flush",
Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than ingestion-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.",
NativeHistogramBucketFactor: 1.1,
}),
processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_processing_time_seconds",
Help: "Time taken to process a single record (write request).",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.DefBuckets,
}),
clientErrRequests: errRequestsCounter.WithLabelValues("client"),
serverErrRequests: errRequestsCounter.WithLabelValues("server"),
totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_total",
Help: "Number of attempted records (write requests).",
}),
}
}

// newPusherConsumer creates a new pusherConsumer instance.
func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer {
return &pusherConsumer{
@@ -105,6 +65,10 @@ func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsu
// Consume implements the recordConsumer interface.
// It'll use a separate goroutine to unmarshal the next record while we push the current record to storage.
func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
defer func(processingStart time.Time) {
c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
}(time.Now())

type parsedRecord struct {
*mimirpb.WriteRequest
// ctx holds the tracing baggage for this record/request.
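The deferred observation at the top of Consume uses a standard Go idiom: arguments to a deferred call are evaluated when the defer statement executes, so time.Now() is captured at function entry while time.Since runs at return, timing the whole call. It supersedes the per-record measurement removed from pushToStorage below, whose deleted TODO noted it only counted enqueueing time; the dashboard change above renames the queried metric to cortex_ingest_storage_reader_records_processing_time_seconds, consistent with the observation now covering all records in a Consume call rather than a single record. A standalone sketch of the idiom (hypothetical names):

    package main

    import (
        "fmt"
        "time"
    )

    func consume() {
        // time.Now() is evaluated here, at function entry;
        // the closure body runs when consume returns.
        defer func(start time.Time) {
            fmt.Println("consume took", time.Since(start))
        }(time.Now())

        time.Sleep(50 * time.Millisecond) // stand-in for real work
    }

    func main() { consume() }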
@@ -190,24 +154,20 @@

func (c pusherConsumer) newStorageWriter() PusherCloser {
if c.kafkaConfig.IngestionConcurrency == 0 {
return newSequentialStoragePusher(c.metrics, c.pusher)
return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher)
}

return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger)
return newParallelStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger)
}

func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error {
spanLog, ctx := spanlogger.NewWithLogger(ctx, c.logger, "pusherConsumer.pushToStorage")
defer spanLog.Finish()

processingStart := time.Now()

// Note that the implementation of the Pusher expects the tenantID to be in the context.
ctx = user.InjectOrgID(ctx, tenantID)
err := writer.PushToStorage(ctx, req)

// TODO dimitarvdimitrov processing time is flawed because it's only counting enqueuing time, not processing time.
c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
c.metrics.totalRequests.Inc()

isServerErr := c.handlePushErr(ctx, tenantID, err, spanLog)
@@ -259,13 +219,13 @@ func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bo

// sequentialStoragePusher receives mimirpb.WriteRequest which are then pushed to the storage one by one.
type sequentialStoragePusher struct {
metrics *pusherConsumerMetrics
metrics *storagePusherMetrics

pusher Pusher
}

// newSequentialStoragePusher creates a new sequentialStoragePusher instance.
func newSequentialStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher) sequentialStoragePusher {
func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher) sequentialStoragePusher {
return sequentialStoragePusher{
metrics: metrics,
pusher: pusher,
@@ -274,8 +234,11 @@ func newSequentialStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher) s

// PushToStorage implements the PusherCloser interface.
func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error {
// TODO: What about time??
ssp.metrics.numTimeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
ssp.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
defer func(now time.Time) {
ssp.metrics.processingTime.WithLabelValues(requestContents(wr)).Observe(time.Since(now).Seconds())
}(time.Now())

return ssp.pusher.PushToStorage(ctx, wr)
}

@@ -287,7 +250,7 @@ func (ssp sequentialStoragePusher) Close() []error {
// parallelStoragePusher receives WriteRequest which are then pushed to the storage in parallel.
// The parallelism is two-tiered which means that we first parallelize by tenantID and then by series.
type parallelStoragePusher struct {
metrics *pusherConsumerMetrics
metrics *storagePusherMetrics
logger log.Logger

// pushers is map["$tenant|$source"]*parallelStorageShards
@@ -298,7 +261,7 @@ type parallelStoragePusher struct {
}

// newParallelStoragePusher creates a new parallelStoragePusher instance.
func newParallelStoragePusher(metrics *pusherConsumerMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher {
func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher {
return &parallelStoragePusher{
logger: log.With(logger, "component", "parallel-storage-pusher"),
pushers: make(map[string]*parallelStorageShards),
@@ -340,7 +303,7 @@ func (c parallelStoragePusher) shardsFor(userID string, requestSource mimirpb.Wr
}
// Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes.
hashLabels := labels.Labels.Hash
p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels)
p := newParallelStorageShards(c.metrics, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels)
c.pushers[userID+"|"+requestSource.String()] = p
return p
}
@@ -350,7 +313,7 @@ type labelsHashFunc func(labels.Labels) uint64
// parallelStorageShards is a collection of shards that are used to parallelize the writes to the storage by series.
// Each series is hashed to a shard that contains its own batchingQueue.
type parallelStorageShards struct {
numTimeSeriesPerFlush prometheus.Histogram
metrics *storagePusherMetrics

pusher Pusher
hashLabels labelsHashFunc
@@ -364,20 +327,22 @@ type parallelStorageShards struct {
}

type flushableWriteRequest struct {
// startedAt is the time when the first item was added to this request (timeseries or metadata).
startedAt time.Time
*mimirpb.WriteRequest
context.Context
}

// newParallelStorageShards creates a new parallelStorageShards instance.
func newParallelStorageShards(numTimeSeriesPerFlush prometheus.Histogram, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards {
func newParallelStorageShards(metrics *storagePusherMetrics, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards {
p := &parallelStorageShards{
numShards: numShards,
pusher: pusher,
hashLabels: hashLabels,
capacity: capacity,
numTimeSeriesPerFlush: numTimeSeriesPerFlush,
batchSize: batchSize,
wg: &sync.WaitGroup{},
numShards: numShards,
pusher: pusher,
hashLabels: hashLabels,
capacity: capacity,
metrics: metrics,
batchSize: batchSize,
wg: &sync.WaitGroup{},
}

p.start()
@@ -398,7 +363,6 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *
mimirpb.FromLabelAdaptersOverwriteLabels(&builder, ts.Labels, &nonCopiedLabels)
shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards)

// TODO: Add metrics to measure how long are items sitting in the queue before they are flushed.
if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
@@ -450,7 +414,7 @@ func (p *parallelStorageShards) start() {
p.wg.Add(p.numShards)

for i := range shards {
shards[i] = newBatchingQueue(p.capacity, p.batchSize)
shards[i] = newBatchingQueue(p.capacity, p.batchSize, p.metrics.batchingQueueMetrics)
go p.run(shards[i])
}

@@ -463,17 +427,38 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
defer queue.Done()

for wr := range queue.Channel() {
p.numTimeSeriesPerFlush.Observe(float64(len(wr.WriteRequest.Timeseries)))
p.metrics.batchAge.Observe(time.Since(wr.startedAt).Seconds())
p.metrics.timeSeriesPerFlush.Observe(float64(len(wr.WriteRequest.Timeseries)))
processingStart := time.Now()

err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest)

p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
if err != nil {
queue.ErrorChannel() <- err
}
}
}

// requestContents returns a label value describing what the request carries:
// timeseries, metadata, both, or nothing.
func requestContents(request *mimirpb.WriteRequest) string {
switch {
case len(request.Timeseries) > 0 && len(request.Metadata) > 0:
return "timeseries_and_metadata"
case len(request.Timeseries) > 0:
return "timeseries"
case len(request.Metadata) > 0:
return "metadata"
default:
// This would be a bug, but at least we'd know.
return "empty"
}
}
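requestContents supplies the label value recorded with processingTime in both the sequential and parallel pushers, so push latency can be broken down by what a batch carries (the commit message's "Separate batch processing time by batch contents"). For illustration (hypothetical values):

    req := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, 3)}
    requestContents(req) // "timeseries"
    req.Metadata = []*mimirpb.MetricMetadata{{}}
    requestContents(req) // "timeseries_and_metadata"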

// batchingQueue is a queue that batches the incoming time series according to the batch size.
// Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method.
type batchingQueue struct {
metrics *batchingQueueMetrics

ch chan flushableWriteRequest
errCh chan error
done chan struct{}
@@ -483,8 +468,9 @@ type batchingQueue struct {
}

// newBatchingQueue creates a new batchingQueue instance.
func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
func newBatchingQueue(capacity int, batchSize int, metrics *batchingQueueMetrics) *batchingQueue {
return &batchingQueue{
metrics: metrics,
ch: make(chan flushableWriteRequest, capacity),
errCh: make(chan error, capacity+1), // We check errs before pushing to the channel, so we need to have a buffer of at least capacity+1 so that the consumer can push all of its errors and not rely on the producer to unblock it.
done: make(chan struct{}),
@@ -496,6 +482,9 @@ func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
// AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel().
// If errors were collected from earlier flushes, AddToBatch returns them; the batch is still pushed regardless.
func (q *batchingQueue) AddToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, ts mimirpb.PreallocTimeseries) error {
if q.currentBatch.startedAt.IsZero() {
q.currentBatch.startedAt = time.Now()
}
q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts)
q.currentBatch.Context = ctx
q.currentBatch.Source = source
@@ -505,6 +494,9 @@ func (q *batchingQueue) AddToBatch(ctx context.Context, source mimirpb.WriteRequ

// AddMetadataToBatch adds metadata to the current batch.
func (q *batchingQueue) AddMetadataToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, metadata *mimirpb.MetricMetadata) error {
if q.currentBatch.startedAt.IsZero() {
q.currentBatch.startedAt = time.Now()
}
q.currentBatch.Metadata = append(q.currentBatch.Metadata, metadata)
q.currentBatch.Context = ctx
q.currentBatch.Source = source
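Both AddToBatch and AddMetadataToBatch stamp startedAt only while it is still zero, so a batch's age is measured from its very first item, whether that is a series or a metadata entry (the "Also set time on metadata" commit), and run() above observes it as batchAge when a shard worker picks the batch up. A standalone illustration of the stamping idiom (hypothetical type, not the Mimir code):

    package main

    import (
        "fmt"
        "time"
    )

    // batch mimics batchingQueue's currentBatch: the first item added
    // to an empty batch records when the batch started filling.
    type batch struct {
        startedAt time.Time
        items     []string
    }

    func (b *batch) add(item string) {
        if b.startedAt.IsZero() {
            b.startedAt = time.Now() // set once, by the first add
        }
        b.items = append(b.items, item)
    }

    func main() {
        var b batch
        b.add("series")
        time.Sleep(10 * time.Millisecond)
        b.add("metadata") // later adds do not reset startedAt
        fmt.Println("batch age:", time.Since(b.startedAt))
    }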
@@ -558,6 +550,9 @@ func (q *batchingQueue) pushIfFull() error {
func (q *batchingQueue) push() error {
errs := q.collectErrors()

q.metrics.flushErrorsTotal.Add(float64(len(errs)))
q.metrics.flushTotal.Inc()

q.ch <- q.currentBatch
q.resetCurrentBatch()
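Because errs is drained from the error channel before the batch is sent, flushErrorsTotal counts errors surfaced by earlier flushes, recorded at the time of the next push, while flushTotal increments exactly once per flush; the ratio of their rates therefore approximates the average number of errors per flush.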

[Remaining changed files were not loaded in this view.]
