Separate batch processing time by batch contents
Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov committed Sep 20, 2024
1 parent 1cacb2f commit ec286e3
Showing 1 changed file with 18 additions and 4 deletions.
pkg/storage/ingest/pusher.go (18 additions, 4 deletions)
@@ -56,7 +56,7 @@ type pusherConsumer struct {
 type pusherConsumerMetrics struct {
 	numTimeSeriesPerFlush      prometheus.Histogram
 	ingestionShardBatchAge     prometheus.Histogram
-	batchProcessingTimeSeconds prometheus.Histogram
+	batchProcessingTimeSeconds *prometheus.HistogramVec
 	processingTimeSeconds      prometheus.Observer
 	clientErrRequests          prometheus.Counter
 	serverErrRequests          prometheus.Counter
@@ -89,11 +89,11 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics
 			Help:                        "Age of the batch when it is being ingested by an ingestion shard. This is the time since adding the first sample to the batch. A high value indicates that the batching queue is not processing fast enough or that the batches are not filling up fast enough.",
 			NativeHistogramBucketFactor: 1.1,
 		}),
-		batchProcessingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+		batchProcessingTimeSeconds: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name:                        "cortex_ingest_storage_reader_ingestion_shard_batch_processing_time_seconds",
 			Help:                        "Time to process a batch of samples in an ingestion shard.",
 			NativeHistogramBucketFactor: 1.1,
-		}),
+		}, []string{"batch_contents"}),
 
 		clientErrRequests: errRequestsCounter.WithLabelValues("client"),
 		serverErrRequests: errRequestsCounter.WithLabelValues("server"),
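Note on the hunk above: switching from NewHistogram to NewHistogramVec turns one histogram into a family of histograms keyed by the batch_contents label, so metadata-only batches can be timed separately from batches carrying timeseries. Below is a minimal, self-contained sketch of that Histogram-to-HistogramVec pattern; the metric names and registry setup are illustrative, not part of this commit.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Before: a plain histogram, a single time series, observed directly.
	plain := promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
		Name:                        "example_processing_time_seconds",
		NativeHistogramBucketFactor: 1.1,
	})
	plain.Observe(0.25)

	// After: a histogram vector, one child time series per label value.
	// WithLabelValues returns a prometheus.Observer for that child.
	vec := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:                        "example_processing_time_by_contents_seconds",
		NativeHistogramBucketFactor: 1.1,
	}, []string{"batch_contents"})

	start := time.Now()
	// ... do some work ...
	vec.WithLabelValues("timeseries").Observe(time.Since(start).Seconds())
}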
@@ -483,13 +483,27 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
 
 		err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest)
 
-		p.metrics.batchProcessingTimeSeconds.Observe(time.Since(processingStart).Seconds())
+		p.metrics.batchProcessingTimeSeconds.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
 		if err != nil {
 			queue.ErrorChannel() <- err
 		}
 	}
 }
 
+func requestContents(request *mimirpb.WriteRequest) string {
+	switch {
+	case len(request.Timeseries) > 0 && len(request.Metadata) > 0:
+		return "timeseries_and_metadata"
+	case len(request.Timeseries) > 0:
+		return "timeseries"
+	case len(request.Metadata) > 0:
+		return "metadata"
+	default:
+		// This would be a bug, but at least we'd know.
+		return "empty"
+	}
+}
+
 // batchingQueue is a queue that batches the incoming time series according to the batch size.
 // Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method.
 type batchingQueue struct {
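For illustration, the sketch below exercises the classification logic that requestContents introduces. The WriteRequest type here is a simplified stand-in carrying only the two slices the function inspects, not the real mimirpb.WriteRequest.

package main

import "fmt"

// WriteRequest is a simplified stand-in for mimirpb.WriteRequest,
// keeping only the two slices that requestContents looks at.
type WriteRequest struct {
	Timeseries []struct{}
	Metadata   []struct{}
}

func requestContents(request *WriteRequest) string {
	switch {
	case len(request.Timeseries) > 0 && len(request.Metadata) > 0:
		return "timeseries_and_metadata"
	case len(request.Timeseries) > 0:
		return "timeseries"
	case len(request.Metadata) > 0:
		return "metadata"
	default:
		// This would be a bug, but at least we'd know.
		return "empty"
	}
}

func main() {
	fmt.Println(requestContents(&WriteRequest{Timeseries: make([]struct{}, 1)})) // timeseries
	fmt.Println(requestContents(&WriteRequest{Metadata: make([]struct{}, 2)}))   // metadata
	fmt.Println(requestContents(&WriteRequest{
		Timeseries: make([]struct{}, 1),
		Metadata:   make([]struct{}, 1),
	})) // timeseries_and_metadata
	fmt.Println(requestContents(&WriteRequest{})) // empty
}

Keeping the default "empty" branch means a request that should never occur still surfaces as its own label value instead of silently disappearing, which matches the comment in the commit.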
