-
Notifications
You must be signed in to change notification settings - Fork 406
/
publisher.go
66 lines (55 loc) · 1.77 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package metrics
import (
"time"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/prometheus/client_golang/prometheus"
)
// publisherLabelKeys lists the label keys that Publish passes to labelsFromCtx
// when it builds the labels for the publish-time histogram.
var publisherLabelKeys = []string{
	labelKeyHandlerName,
	labelKeyPublisherName,
	labelSuccess,
}
// PublisherPrometheusMetricsDecorator decorates a publisher to capture Prometheus metrics.
// It wraps a message.Publisher; Publish measures the duration of the wrapped
// Publish call and Close delegates straight to the wrapped publisher.
type PublisherPrometheusMetricsDecorator struct {
// pub is the wrapped publisher that Publish and Close delegate to.
pub message.Publisher
// publisherName is the publisher-name label value used by Publish when the
// labels derived from the message context leave that label empty.
publisherName string
// publishTimeSeconds is the histogram in which Publish observes the elapsed
// time of the wrapped Publish call, in seconds.
publishTimeSeconds *prometheus.HistogramVec
}
// Publish forwards the messages to the wrapped publisher and observes how long
// that call took in the publishTimeSeconds histogram.
//
// When no messages are given there is no context to derive labels from, so the
// call is delegated without observing anything. Otherwise the labels are built
// from the first message's context: an empty publisher-name label falls back to
// m.publisherName and an empty handler-name label to labelValueNoHandler. The
// success label is "false" when the wrapped Publish returns an error and
// "true" otherwise.
func (m PublisherPrometheusMetricsDecorator) Publish(topic string, messages ...*message.Message) (err error) {
	if len(messages) == 0 {
		return m.pub.Publish(topic)
	}

	// TODO: take ctx not only from first msg. Might require changing the signature of Publish, which is planned anyway.
	firstCtx := messages[0].Context()

	metricLabels := labelsFromCtx(firstCtx, publisherLabelKeys...)
	if metricLabels[labelKeyPublisherName] == "" {
		metricLabels[labelKeyPublisherName] = m.publisherName
	}
	if metricLabels[labelKeyHandlerName] == "" {
		metricLabels[labelKeyHandlerName] = labelValueNoHandler
	}

	startedAt := time.Now()

	// The named result err is read here after the wrapped Publish has returned.
	defer func() {
		// firstCtx was read before the loop below marks the messages, so a
		// positive check means the publish was marked as observed before this
		// call. Skipping keeps the decorator idempotent when it is applied
		// more than once.
		if publishAlreadyObserved(firstCtx) {
			return
		}

		outcome := "true"
		if err != nil {
			outcome = "false"
		}
		metricLabels[labelSuccess] = outcome

		elapsed := time.Since(startedAt)
		m.publishTimeSeconds.With(metricLabels).Observe(elapsed.Seconds())
	}()

	// Mark every message as observed before handing it to the wrapped publisher.
	for i := range messages {
		messages[i].SetContext(setPublishObservedToCtx(messages[i].Context()))
	}

	return m.pub.Publish(topic, messages...)
}
// Close calls Close on the wrapped publisher and returns its error unchanged.
// The method only delegates: it does not itself modify any metric or stop any
// server.
func (m PublisherPrometheusMetricsDecorator) Close() error {
return m.pub.Close()
}