feat(pubsub): add open telemetry trace support #5034
Conversation
// Calculate the size of the encoded proto message by accounting
// for the length of an individual PubSubMessage and Data/Attributes field.
msgSize := proto.Size(&pb.PubsubMessage{
	Data:        msg.Data,
	Attributes:  msg.Attributes,
	OrderingKey: msg.OrderingKey,
})
span.SetAttributes(semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize))
Is this already set in getSpanAttributes on line 532, above?
I think I understand: After injecting the span into the message attributes, the size of the message needs to be updated.
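For context, here is a minimal sketch of that ordering, assuming the publisher injects the trace context into the message attributes; the publishedSize helper and the propagator setup are illustrative, not the PR's actual code. Injecting adds attributes, so the size has to be measured afterwards for the payload-size attribute to match what is sent.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	pb "google.golang.org/genproto/googleapis/pubsub/v1"
	"google.golang.org/protobuf/proto"
)

// publishedSize is a hypothetical helper: it injects the current span
// context (if any) into the message attributes and only then measures the
// encoded proto size, so the size attribute reflects what actually goes
// over the wire.
func publishedSize(ctx context.Context, data []byte, attrs map[string]string, orderingKey string) int {
	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(attrs))
	return proto.Size(&pb.PubsubMessage{
		Data:        data,
		Attributes:  attrs,
		OrderingKey: orderingKey,
	})
}

func main() {
	otel.SetTextMapPropagator(propagation.TraceContext{})
	attrs := map[string]string{"origin": "example"}
	fmt.Println(publishedSize(context.Background(), []byte("hello"), attrs, ""))
}
```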
@@ -697,6 +728,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
		ipubsub.SetPublishResult(bm.res, "", err)
	} else {
		ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil)
		bm.span.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i]))
Does the "publish RPC" span also need to be updated with the message ID?
// If this call fails (e.g. because the service account doesn't have
// the roles/viewer or roles/pubsub.viewer role) we will assume
// EnableMessageOrdering to be true.
// See: https://github.com/googleapis/google-cloud-go/issues/3884
func (s *Subscription) checkOrdering(ctx context.Context) {
func (s *Subscription) checkSubConfig() {
	ctx := context.Background()
why did this context switch to background?
	cfg, err := s.Config(ctx)
	if err != nil {
		s.enableOrdering = true
	} else {
		s.enableOrdering = cfg.EnableMessageOrdering
		s.topicName = cfg.Topic.name
This does not work when the GCP identity doesn't have roles/pubsub.viewer: it should fall back to the subscription name in the err != nil case, otherwise the span name will be " receive".
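A standalone sketch of that fallback; the spanPrefix helper is hypothetical (in the PR this logic would live inside checkSubConfig), but it shows the intent: never leave the span-name prefix empty.

```go
package main

import "fmt"

// spanPrefix is a hypothetical helper for the suggested fallback: when the
// topic name could not be resolved (for example, the Config call failed
// because the identity lacks roles/pubsub.viewer), use the subscription
// name so the span is "<subscription> receive" rather than " receive".
func spanPrefix(topicName, subscriptionName string) string {
	if topicName != "" {
		return topicName
	}
	return subscriptionName
}

func main() {
	// Config lookup failed, topic name unknown: fall back to the subscription.
	fmt.Println(spanPrefix("", "projects/p/subscriptions/my-sub") + " receive")
}
```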
@@ -926,6 +941,9 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
	defer wg.Wait()
	defer cancel2()
	for {
		opts := getSubSpanAttributes(s.topicName, &Message{}, semconv.MessagingOperationReceive)
		ctx2, rs := s.tracer.Start(ctx2, fmt.Sprintf("%s receive", s.topicName), opts...)
This span can block for a long time if there is no message to be processed in the topic. It can potentially cause the consumer span to be started before the message is published:

Process P:        | topic send |
Process C: |---sub receive---|---sub process---|

where both sub receive and sub process are children of topic send.
This is what it looks like in Honeycomb (with no parent trace in the message).
PS: It also causes the span-based metrics to be messed up.
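One possible alternative, sketched below under the assumption that the consumer span could instead be started per message inside the receive callback; the handleMessage helper and tracer name are illustrative and not part of this PR. Starting the span only after a message is actually delivered avoids leaving it open while the client waits, and lets the extracted producer context become the parent.

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
)

// handleMessage starts the consumer span only once a message has been
// delivered to the callback, using the remote context extracted from the
// message attributes as the parent, so the span never sits open while the
// client is blocked waiting for messages.
func handleMessage(ctx context.Context, topicName string, attrs map[string]string, process func(context.Context)) {
	ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(attrs))

	tracer := otel.Tracer("example/pubsub-trace-sketch")
	ctx, span := tracer.Start(ctx, topicName+" process",
		trace.WithSpanKind(trace.SpanKindConsumer))
	defer span.End()

	process(ctx)
}

func main() {
	otel.SetTextMapPropagator(propagation.TraceContext{})
	handleMessage(context.Background(), "projects/p/topics/my-topic",
		map[string]string{}, func(ctx context.Context) { /* message handling */ })
}
```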
	semconv.MessagingDestinationKindTopic,
	semconv.MessagingMessageIDKey.String(msg.ID),
	semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
	attribute.String(orderingAttribute, msg.OrderingKey),
msg.OrderingKey is empty for unordered messages; maybe there should be a default when setting it in attributes.
google-cloud-go/internal/pubsub/message.go (lines 58 to 60 in dd8973b):

// OrderingKey identifies related messages for which publish order should
// be respected. If empty string is used, message will be sent unordered.
OrderingKey string
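A sketch of the suggestion, assuming the attribute is simply skipped when the key is empty; the orderingAttrs helper and the attribute key string are placeholders, not the names used by the PR.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// orderingAttribute is a placeholder for whatever key the PR uses.
const orderingAttribute = "messaging.gcp_pubsub.ordering_key"

// orderingAttrs only emits the ordering-key attribute when the message is
// actually ordered, since OrderingKey is the empty string for unordered
// messages.
func orderingAttrs(orderingKey string) []attribute.KeyValue {
	if orderingKey == "" {
		return nil
	}
	return []attribute.KeyValue{attribute.String(orderingAttribute, orderingKey)}
}

func main() {
	fmt.Println(orderingAttrs(""))         // unordered: no attribute
	fmt.Println(orderingAttrs("user-123")) // ordered: one attribute
}
```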
Hey, is there any expectation of when this will be supported? Thanks.
Thanks for your patience. This was briefly paused to account for a different design decision for tracing. Initially, we were following the specifications on this otel messaging page on batch receiving, which we think no longer makes sense for Pub/Sub users. We believe it would be more useful for users to trace messages individually to better view the lifespan of a single message, which deviates slightly from the otel semantic conventions' approach of tracking multiple messages in a single trace. If you're interested in learning more, I'd be happy to share. With that said, we're currently prioritizing other features, including exactly-once delivery and BigQuery subscriptions, which should be out in the next month or so. Afterwards, resuming work on tracing will be the highest priority item.
@hongalex, thanks for letting me know. In the meantime, I took your propagator design and implemented it in my code. I'll add it here so others can use it until a final solution exists.
Note: this does not provide attributes that need to be added to the designated span.
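For reference, a minimal sketch of such a propagator-style workaround; this is not the commenter's actual code, and injectTraceContext/extractTraceContext are illustrative names. As the note above says, it only propagates the trace context through the message attributes and does not add any span attributes.

```go
package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// injectTraceContext copies the current span context into the message
// attributes before publishing.
func injectTraceContext(ctx context.Context, msg *pubsub.Message) {
	if msg.Attributes == nil {
		msg.Attributes = map[string]string{}
	}
	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(msg.Attributes))
}

// extractTraceContext restores the producer's span context on the consumer
// side so it can be used as the parent of any span started in the callback.
func extractTraceContext(ctx context.Context, msg *pubsub.Message) context.Context {
	return otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.Attributes))
}

func main() {
	// Assumes a W3C TraceContext propagator is registered globally.
	otel.SetTextMapPropagator(propagation.TraceContext{})

	// Publisher side, just before topic.Publish:
	msg := &pubsub.Message{Data: []byte("hello")}
	injectTraceContext(context.Background(), msg)

	// Subscriber side, inside the Receive callback:
	_ = extractTraceContext(context.Background(), msg)
}
```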
Hi, what is the current status?
Is there any alternative thread? Or did it just die with nobody looking into it?
We are still tracking this here: #4665
This PR adds the foundation for tracing a publisher and subscriber for Pub/Sub. It attempts to mimic existing messaging-system tracing and follows the semantic conventions defined here. This is a draft and is subject to backwards-incompatible changes.
Here's a sample that demonstrates using this otel-enabled library with Google Cloud Trace as the exporter. The output looks something like the following:
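The sample itself isn't reproduced in this thread; as a rough illustration, wiring Cloud Trace up as the exporter for the spans produced by this library might look like the sketch below. The project ID is a placeholder, and the actual sample may differ.

```go
package main

import (
	"context"
	"log"

	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	ctx := context.Background()

	// Export spans to Google Cloud Trace.
	exporter, err := texporter.New(texporter.WithProjectID("your-project-id"))
	if err != nil {
		log.Fatalf("creating Cloud Trace exporter: %v", err)
	}

	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
	defer func() { _ = tp.Shutdown(ctx) }()

	// Register the provider globally so the instrumented Pub/Sub client
	// picks it up when creating spans.
	otel.SetTracerProvider(tp)

	// ... create the Pub/Sub client and publish/receive as usual ...
}
```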