Skip to content

Commit

Permalink
Add subscribed topics gauge metric (#339)
Browse files Browse the repository at this point in the history
* Add subscribe/unsubscribe gauge metric

* Emit unsubscribe in defer
  • Loading branch information
Steven Normore authored Jan 23, 2024
1 parent a15ebd3 commit becf56a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 18 deletions.
15 changes: 11 additions & 4 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

metrics.EmitSubscribeTopicsLength(stream.Context(), log, len(req.ContentTopics))
metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

var streamLock sync.Mutex
for _, topic := range req.ContentTopics {
Expand Down Expand Up @@ -211,17 +211,20 @@ func (s *Service) Subscribe(req *proto.SubscribeRequest, stream proto.MessageApi
}
defer func() {
_ = sub.Unsubscribe()
metrics.EmitUnsubscribeTopics(stream.Context(), log, 1)
}()
}

select {
case <-stream.Context().Done():
log.Debug("stream closed")
return nil
break
case <-s.ctx.Done():
log.Info("service closed")
return nil
break
}

return nil
}

func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
Expand Down Expand Up @@ -264,6 +267,7 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
for _, sub := range subs {
_ = sub.Unsubscribe()
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, len(subs))
}()
var streamLock sync.Mutex
for {
Expand All @@ -280,7 +284,7 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}
log.Info("updating subscription", zap.Int("num_content_topics", len(req.ContentTopics)))

metrics.EmitSubscribeTopicsLength(stream.Context(), log, len(req.ContentTopics))
metrics.EmitSubscribeTopics(stream.Context(), log, len(req.ContentTopics))

topics := map[string]bool{}
for _, topic := range req.ContentTopics {
Expand Down Expand Up @@ -313,13 +317,16 @@ func (s *Service) Subscribe2(stream proto.MessageApi_Subscribe2Server) error {
}

// If subscription not in topic, then unsubscribe.
var count int
for topic, sub := range subs {
if topics[topic] {
continue
}
_ = sub.Unsubscribe()
delete(subs, topic)
count++
}
metrics.EmitUnsubscribeTopics(stream.Context(), log, count)
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/metrics/api-subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package metrics

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var subscribedTopics = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "xmtp_subscribed_topics",
Help: "Number of subscribed topics",
},
appClientVersionTagKeys,
)

var subscribeTopicsLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtp_subscribe_topics_length",
Help: "Number of subscribed topics per request",
Buckets: []float64{1, 2, 4, 8, 16, 50, 100, 10000, 100000},
},
appClientVersionTagKeys,
)

func EmitSubscribeTopics(ctx context.Context, log *zap.Logger, topics int) {
labels := contextLabels(ctx)
subscribeTopicsLength.With(labels).Observe(float64(topics))
subscribedTopics.With(labels).Add(float64(topics))
}

func EmitUnsubscribeTopics(ctx context.Context, log *zap.Logger, topics int) {
labels := contextLabels(ctx)
subscribedTopics.With(labels).Add(-float64(topics))
}
14 changes: 0 additions & 14 deletions pkg/metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@ func EmitAPIRequest(ctx context.Context, log *zap.Logger, fields []zapcore.Field
apiRequestsDuration.With(labels).Observe(float64(duration / time.Millisecond))
}

var subscribeTopicsLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtp_subscribe_topics_length",
Help: "Number of subscribed topics per request",
Buckets: []float64{1, 2, 4, 8, 16, 50, 100, 10000, 100000},
},
appClientVersionTagKeys,
)

func EmitSubscribeTopicsLength(ctx context.Context, log *zap.Logger, topics int) {
labels := contextLabels(ctx)
subscribeTopicsLength.With(labels).Observe(float64(topics))
}

var publishedEnvelopeSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtp_published_envelope",
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func registerCollectors(reg prometheus.Registerer) {
apiRequests,
apiRequestsDuration,
subscribeTopicsLength,
subscribedTopics,
publishedEnvelopeSize,
publishedEnvelopeCount,
queryDuration,
Expand Down

0 comments on commit becf56a

Please sign in to comment.