Skip to content

Commit

Permalink
Add gauge loki_ingest_storage_reader_phase
Browse files Browse the repository at this point in the history
This commit adds a new gauge loki_ingest_storage_reader_phase that
tracks the current phase of the reader such as "starting" or
"running".
  • Loading branch information
grobinson-grafana committed Oct 31, 2024
1 parent 002a921 commit 8953dd3
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func (p *Reader) start(ctx context.Context) error {
return errors.Wrap(err, "creating kafka reader client")
}

p.metrics.phase.WithLabelValues("starting").Set(1)
p.metrics.phase.WithLabelValues("running").Set(0)

// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
Expand Down Expand Up @@ -134,13 +137,18 @@ func (p *Reader) start(ctx context.Context) error {
}
}

<-time.After(time.Minute)

return nil
}

// run is the main loop of the PartitionReader. It continuously fetches and processes
// data from Kafka, and send it to the consumer.
func (p *Reader) run(ctx context.Context) error {
level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup)
p.metrics.phase.WithLabelValues("starting").Set(0)
p.metrics.phase.WithLabelValues("running").Set(1)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -513,6 +521,7 @@ func isErrFetch(fetch kgo.Fetch) bool {
}

type readerMetrics struct {
phase *prometheus.GaugeVec
receiveDelayWhenStarting prometheus.Observer
receiveDelayWhenRunning prometheus.Observer
recordsPerFetch prometheus.Histogram
Expand All @@ -538,6 +547,10 @@ func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
}, []string{"phase"})

return readerMetrics{
phase: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_ingest_storage_reader_phase",
Help: "The current phase of the consumer.",
}, []string{"phase"}),
receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"),
kprom: client.NewReaderClientMetrics("partition-reader", reg),
Expand Down

0 comments on commit 8953dd3

Please sign in to comment.