-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pubsub): add open telemetry trace support #5034
Changes from all commits
3e3eb4c
080170e
4465dce
f82fea8
44fc62a
d6e8727
101b7ed
9577495
d6a4df7
994739c
b323e77
6d4b7be
e99b1a7
3a9fab2
f3e7a0a
516663e
7bc99da
d0d24e2
db3bcad
e669b5c
17767de
0ffbdad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,11 @@ import ( | |
"cloud.google.com/go/internal/optional" | ||
"cloud.google.com/go/pubsub/internal/scheduler" | ||
gax "github.com/googleapis/gax-go/v2" | ||
"go.opentelemetry.io/otel" | ||
"go.opentelemetry.io/otel/attribute" | ||
otelcodes "go.opentelemetry.io/otel/codes" | ||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" | ||
"go.opentelemetry.io/otel/trace" | ||
"golang.org/x/sync/errgroup" | ||
pb "google.golang.org/genproto/googleapis/pubsub/v1" | ||
fmpb "google.golang.org/genproto/protobuf/field_mask" | ||
|
@@ -51,6 +56,10 @@ type Subscription struct { | |
receiveActive bool | ||
|
||
enableOrdering bool | ||
|
||
// topicName is for creating spans for OpenTelemetry tracing. | ||
topicName string | ||
tracer trace.Tracer | ||
} | ||
|
||
// Subscription creates a reference to a subscription. | ||
|
@@ -60,9 +69,15 @@ func (c *Client) Subscription(id string) *Subscription { | |
|
||
// SubscriptionInProject creates a reference to a subscription in a given project. | ||
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { | ||
return newSubscription(c, fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id)) | ||
} | ||
|
||
func newSubscription(c *Client, name string) *Subscription { | ||
return &Subscription{ | ||
c: c, | ||
name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), | ||
c: c, | ||
name: name, | ||
ReceiveSettings: DefaultReceiveSettings, | ||
tracer: otel.Tracer(defaultTracerName), | ||
} | ||
} | ||
|
||
|
@@ -112,7 +127,7 @@ func (subs *SubscriptionIterator) Next() (*Subscription, error) { | |
if err != nil { | ||
return nil, err | ||
} | ||
return &Subscription{c: subs.c, name: subName}, nil | ||
return newSubscription(subs.c, subName), nil | ||
} | ||
|
||
// NextConfig returns the next subscription config. If there are no more subscriptions, | ||
|
@@ -840,7 +855,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
s.mu.Unlock() | ||
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() | ||
|
||
s.checkOrdering(ctx) | ||
s.checkSubConfig() | ||
|
||
maxCount := s.ReceiveSettings.MaxOutstandingMessages | ||
if maxCount == 0 { | ||
|
@@ -926,6 +941,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
defer wg.Wait() | ||
defer cancel2() | ||
for { | ||
opts := getSubSpanAttributes(s.topicName, &Message{}, semconv.MessagingOperationReceive) | ||
ctx2, rs := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...) | ||
|
||
var maxToPull int32 // maximum number of messages to pull | ||
if po.synchronous { | ||
if po.maxPrefetch < 0 { | ||
|
@@ -955,13 +973,17 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
return nil | ||
default: | ||
} | ||
|
||
msgs, err := iter.receive(maxToPull) | ||
if err == io.EOF { | ||
return nil | ||
} | ||
if err != nil { | ||
rs.RecordError(err) | ||
rs.SetStatus(otelcodes.Error, err.Error()) | ||
return err | ||
} | ||
rs.End() | ||
// If context is done and messages have been pulled, | ||
// nack them. | ||
select { | ||
|
@@ -972,8 +994,20 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
return nil | ||
default: | ||
} | ||
|
||
for i, msg := range msgs { | ||
msg := msg | ||
opts := getSubSpanAttributes(s.topicName, msg, semconv.MessagingOperationProcess) | ||
if msg.Attributes != nil && msg.Attributes["gogclient_traceparent"] != "" { | ||
lctx := otel.GetTextMapPropagator().Extract(ctx2, NewPubsubMessageCarrier(msg)) | ||
link := trace.LinkFromContext(lctx) | ||
opts = append(opts, trace.WithLinks(link)) | ||
} | ||
_, ps := s.tracer.Start(ctx2, fmt.Sprintf("%s process", s.topicName), opts...) | ||
ps.AddEvent("waiting for subscriber flow control") | ||
var fcTimer time.Time | ||
if ps.IsRecording() { | ||
fcTimer = time.Now() | ||
} | ||
// TODO(jba): call acquire closer to when the message is allocated. | ||
if err := fc.acquire(ctx, len(msg.Data)); err != nil { | ||
// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. | ||
|
@@ -983,11 +1017,18 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
// Return nil if the context is done, not err. | ||
return nil | ||
} | ||
ps.AddEvent("acquired subscriber flow control resources", trace.WithAttributes(attribute.Float64("elapsed_ms", float64(time.Since(fcTimer))/float64(time.Millisecond)))) | ||
ackh, _ := msgAckHandler(msg) | ||
old := ackh.doneFunc | ||
msgLen := len(msg.Data) | ||
ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) { | ||
defer fc.release(ctx, msgLen) | ||
defer ps.End() | ||
if ack { | ||
ps.SetAttributes(attribute.String("result", "ack")) | ||
} else { | ||
ps.SetAttributes(attribute.String("result", "nack")) | ||
} | ||
old(ackID, ack, receiveTime) | ||
} | ||
wg.Add(1) | ||
|
@@ -1000,9 +1041,13 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
// constructor level? | ||
if err := sched.Add(key, msg, func(msg interface{}) { | ||
defer wg.Done() | ||
rs.AddEvent("started handling provided callback") | ||
f(ctx2, msg.(*Message)) | ||
rs.AddEvent("finished handling provided callback") | ||
}); err != nil { | ||
wg.Done() | ||
ps.RecordError(err) | ||
ps.SetStatus(otelcodes.Error, err.Error()) | ||
return err | ||
} | ||
} | ||
|
@@ -1028,17 +1073,21 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes | |
return group.Wait() | ||
} | ||
|
||
// checkOrdering calls Config to check theEnableMessageOrdering field. | ||
// checkSubConfig calls Config to check the subscription config fields. | ||
// For ordering, we check the EnableMessageOrdering field. | ||
// For OpenTelemetry, we check the topic name. | ||
// If this call fails (e.g. because the service account doesn't have | ||
// the roles/viewer or roles/pubsub.viewer role) we will assume | ||
// EnableMessageOrdering to be true. | ||
// See: https://github.com/googleapis/google-cloud-go/issues/3884 | ||
func (s *Subscription) checkOrdering(ctx context.Context) { | ||
func (s *Subscription) checkSubConfig() { | ||
ctx := context.Background() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why did this context switch to background? |
||
cfg, err := s.Config(ctx) | ||
if err != nil { | ||
s.enableOrdering = true | ||
} else { | ||
s.enableOrdering = cfg.EnableMessageOrdering | ||
s.topicName = cfg.Topic.name | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this does not work when the gcp identity don't have |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this span can block for a long time if there is no message to be processed in the topic.
It can potentially cause the consumer span to be started before the message is published:
where both
sub receive
andsub process
are children oftopic send
.This is what it looks like in honeycomb (with no parent trace in the message)
PS: It also causes the span based metrics to be messed up.