-
Notifications
You must be signed in to change notification settings - Fork 1
/
publisher.go
76 lines (58 loc) · 1.75 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package ocwatermill
import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"time"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/sagikazarmark/ocwatermill/internal"
)
type PublisherDecorator struct {
pub message.Publisher
publisherName string
}
func (p *PublisherDecorator) Publish(topic string, messages ...*message.Message) (err error) {
if len(messages) == 0 {
return p.pub.Publish(topic)
}
// TODO: take ctx not only from first msg. Might require changing the signature of Publish, which is planned anyway.
ctx := messages[0].Context()
publisherName := message.PublisherNameFromCtx(ctx)
if publisherName == "" {
publisherName = p.publisherName
}
handlerName := message.HandlerNameFromCtx(ctx)
if handlerName == "" {
handlerName = tagValueNoHandler
}
tags := []tag.Mutator{
tag.Upsert(PublisherName, publisherName),
tag.Upsert(HandlerName, handlerName),
}
start := time.Now()
defer func() {
if publishAlreadyObserved(ctx) {
// decorator idempotency when applied decorator multiple times
return
}
if err != nil {
tags = append(tags, tag.Upsert(Success, "false"))
} else {
tags = append(tags, tag.Upsert(Success, "true"))
}
_ = stats.RecordWithTags(ctx, tags, PublisherPublishTime.M(float64(time.Since(start))/float64(time.Millisecond)))
}()
for _, msg := range messages {
msg.SetContext(setPublishObservedToCtx(msg.Context()))
}
return p.pub.Publish(topic, messages...)
}
func (p *PublisherDecorator) Close() error {
return p.pub.Close()
}
// DecoratePublisher decorates a publisher with instrumentation.
func DecoratePublisher(pub message.Publisher) (message.Publisher, error) {
return &PublisherDecorator{
pub: pub,
publisherName: internal.StructName(pub),
}, nil
}