Skip to content

Commit

Permalink
Broker tracing conventions (#2983)
Browse files Browse the repository at this point in the history
* Add tracing conventions to broker spec

* Add broker ingress spans with messaging span attributes

* Add trigger span and messaging span attributes

* Add test coverage of attributes.go

It is possible to directly test the attribute helpers since attribute fields are unexported.

* Format docs/spec/broker.md

Co-authored-by: Matt Moore <[email protected]>

* Format Markdown

Co-authored-by: Matt Moore <[email protected]>

Co-authored-by: Matt Moore <[email protected]>
  • Loading branch information
ian-mi and mattmoor authored May 13, 2020
1 parent 09a6267 commit 1689191
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 23 deletions.
14 changes: 13 additions & 1 deletion docs/spec/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,19 @@ Upon receiving an event with context attributes defined in the
[CloudEvents Distributed Tracing extension](https://github.com/cloudevents/spec/blob/master/extensions/distributed-tracing.md),
the Broker SHOULD preserve that trace header on delivery to subscribers and on
reply events, unless the reply is sent with a different set of tracing
attributes.
attributes. Forwarded trace headers SHOULD be updated with any intermediate
spans emitted by the broker.

Spans emitted by the Broker SHOULD follow the
[OpenTelemetry Semantic Conventions for Messaging Systems](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md)
whenever possible. In particular, spans emitted by the Broker SHOULD set the
following attributes:

- messaging.system: "knative"
- messaging.destination: broker:name.namespace or trigger:name.namespace with
the Broker or Trigger to which the event is being routed
- messaging.protocol: the name of the underlying transport protocol
- messaging.message_id: the event ID

## Conformance Tests

Expand Down
14 changes: 14 additions & 0 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go"
"go.opencensus.io/trace"
"go.uber.org/zap"

eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
Expand All @@ -33,6 +34,7 @@ import (
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler/trigger/path"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -184,6 +186,18 @@ func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
return errors.New("unable to parse path as a Trigger")
}

ctx, span := trace.StartSpan(ctx, tracing.TriggerMessagingDestination(triggerRef.NamespacedName))
defer span.End()

if span.IsRecordingEvents() {
span.AddAttributes(
tracing.MessagingSystemAttribute,
tracing.MessagingProtocolHTTP,
tracing.TriggerMessagingDestinationAttribute(triggerRef.NamespacedName),
tracing.MessagingMessageIDAttribute(event.ID()),
)
}

// Remove the TTL attribute that is used by the Broker.
ttl, err := broker.GetTTL(event.Context)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"

"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -75,6 +78,22 @@ func (h *Handler) Start(ctx context.Context) error {
}

func (h *Handler) receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
brokerName := types.NamespacedName{
Name: h.BrokerName,
Namespace: h.Namespace,
}
ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerName))
defer span.End()

if span.IsRecordingEvents() {
span.AddAttributes(
tracing.MessagingSystemAttribute,
tracing.MessagingProtocolHTTP,
tracing.BrokerMessagingDestinationAttribute(brokerName),
tracing.MessagingMessageIDAttribute(event.ID()),
)
}

// Setting the extension as a string as the CloudEvents sdk does not support non-string extensions.
event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()})
tctx := cloudevents.HTTPTransportContextFrom(ctx)
Expand Down
14 changes: 14 additions & 0 deletions pkg/mtbroker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go"
"go.opencensus.io/trace"
"go.uber.org/zap"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/broker"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler/trigger/path"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -184,6 +186,18 @@ func (r *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
return errors.New("unable to parse path as a Trigger")
}

ctx, span := trace.StartSpan(ctx, tracing.TriggerMessagingDestination(triggerRef.NamespacedName))
defer span.End()

if span.IsRecordingEvents() {
span.AddAttributes(
tracing.MessagingSystemAttribute,
tracing.MessagingProtocolHTTP,
tracing.TriggerMessagingDestinationAttribute(triggerRef.NamespacedName),
tracing.MessagingMessageIDAttribute(event.ID()),
)
}

// Remove the TTL attribute that is used by the Broker.
ttl, err := broker.GetTTL(event.Context)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/mtbroker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -95,6 +98,22 @@ func (h *Handler) receive(ctx context.Context, event cloudevents.Event, resp *cl
brokerNamespace := pieces[1]
brokerName := pieces[2]

brokerNamespacedName := types.NamespacedName{
Name: brokerName,
Namespace: brokerNamespace,
}
ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerNamespacedName))
defer span.End()

if span.IsRecordingEvents() {
span.AddAttributes(
tracing.MessagingSystemAttribute,
tracing.MessagingProtocolHTTP,
tracing.BrokerMessagingDestinationAttribute(brokerNamespacedName),
tracing.MessagingMessageIDAttribute(event.ID()),
)
}

reporterArgs := &ReportArgs{
ns: brokerNamespace,
broker: brokerName,
Expand Down
59 changes: 59 additions & 0 deletions pkg/tracing/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"fmt"

"go.opencensus.io/trace"
"k8s.io/apimachinery/pkg/types"
)

const (
MessagingSystemAttributeName = "messaging.system"
MessagingDestinationAttributeName = "messaging.destination"
MessagingProtocolAttributeName = "messaging.protocol"
MessagingMessageIDAttributeName = "messaging.message_id"
)

var (
MessagingSystemAttribute trace.Attribute = trace.StringAttribute(MessagingSystemAttributeName, "knative")
MessagingProtocolHTTP trace.Attribute = MessagingProtocolAttribute("HTTP")
)

func MessagingProtocolAttribute(protocol string) trace.Attribute {
return trace.StringAttribute(MessagingProtocolAttributeName, protocol)
}

func MessagingMessageIDAttribute(ID string) trace.Attribute {
return trace.StringAttribute(MessagingMessageIDAttributeName, ID)
}

func BrokerMessagingDestination(b types.NamespacedName) string {
return fmt.Sprintf("broker:%s.%s", b.Name, b.Namespace)
}
func BrokerMessagingDestinationAttribute(b types.NamespacedName) trace.Attribute {
return trace.StringAttribute(MessagingDestinationAttributeName, BrokerMessagingDestination(b))
}

func TriggerMessagingDestination(t types.NamespacedName) string {
return fmt.Sprintf("trigger:%s.%s", t.Name, t.Namespace)
}

func TriggerMessagingDestinationAttribute(t types.NamespacedName) trace.Attribute {
return trace.StringAttribute(MessagingDestinationAttributeName, TriggerMessagingDestination(t))
}
45 changes: 45 additions & 0 deletions pkg/tracing/attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"testing"

"k8s.io/apimachinery/pkg/types"
)

func TestBrokerMessagingDestination(t *testing.T) {
got := BrokerMessagingDestination(types.NamespacedName{
Namespace: "brokerns",
Name: "brokername",
})
want := "broker:brokername.brokerns"
if want != got {
t.Errorf("unexpected messaging destination: want %q, got %q", want, got)
}
}

func TestTriggerMessagingDestination(t *testing.T) {
got := TriggerMessagingDestination(types.NamespacedName{
Namespace: "triggerns",
Name: "triggername",
})
want := "trigger:triggername.triggerns"
if want != got {
t.Errorf("unexpected messaging destination: want %q, got %q", want, got)
}
}
61 changes: 39 additions & 22 deletions test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/eventing/v1beta1"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
Expand Down Expand Up @@ -102,7 +103,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
client.CreatePodOrFail(logPod, lib.WithService(loggerPodName))

// Create a Trigger that receives events (type=bar) and sends them to the logger Pod.
client.CreateTriggerOrFail(
loggerTrigger := client.CreateTriggerOrFail(
"logger",
resources.WithBroker(broker.Name),
resources.WithAttributesTriggerFilter(v1alpha1.TriggerAnyFilter, etLogger, map[string]interface{}{}),
Expand All @@ -119,7 +120,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
client.CreatePodOrFail(eventTransformerPod, lib.WithService(eventTransformerPod.Name))

// Create a Trigger that receives events (type=foo) and sends them to the transformer Pod.
client.CreateTriggerOrFail(
transformerTrigger := client.CreateTriggerOrFail(
"transformer",
resources.WithBroker(broker.Name),
resources.WithAttributesTriggerFilter(v1alpha1.TriggerAnyFilter, etTransformer, map[string]interface{}{}),
Expand Down Expand Up @@ -162,13 +163,9 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
loggerSVCHost := k8sServiceHost(domain, client.Namespace, loggerPodName)
transformerSVCHost := k8sServiceHost(domain, client.Namespace, eventTransformerPod.Name)

// Steps 7-10: Event from TrChannel sent to transformer Trigger and its reply to the InChannel.
transformerEventSentFromTrChannelToTransformer := tracinghelper.TestSpanTree{
Note: "3. Broker Filter for the 'transformer' trigger sends the event to the transformer pod.",
Span: tracinghelper.MatchHTTPSpanWithReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(transformerSVCHost, "/"),
),
Span: triggerSpan(transformerTrigger, eventID),
Children: []tracinghelper.TestSpanTree{
{
Note: "4. Transformer pod receives the event from the Broker Filter for the 'transformer' trigger.",
Expand All @@ -181,23 +178,23 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
},
}

// Step 11-20. Directly steps 11-12. Steps 13-20 are children.
// Steps 11-12 Reply from the 'transformer' is sent by the Broker TrChannel to the Broker
// Ingress.
transformerEventResponseFromTrChannel := tracinghelper.TestSpanTree{
Note: "5. Broker Filter for the 'logger' trigger sends the event to the logger pod.",
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(loggerSVCHost, "/"),
),
Note: "5. Broker ingress for reply from the 'transformer'",
Span: ingressSpan(broker, eventID),
Children: []tracinghelper.TestSpanTree{
{
Note: "6. Logger pod receives the event from the Broker Filter for the 'logger' trigger.",
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(loggerSVCHost, "/"),
tracinghelper.WithLocalEndpointServiceName(loggerPodName),
),
Note: "6. Broker Filter for the 'logger' trigger sends the event to the logger pod.",
Span: triggerSpan(loggerTrigger, eventID),
Children: []tracinghelper.TestSpanTree{
{
Note: "7. Logger pod receives the event from the Broker Filter for the 'logger' trigger.",
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(loggerSVCHost, "/"),
tracinghelper.WithLocalEndpointServiceName(loggerPodName),
),
},
},
},
},
}
Expand All @@ -207,7 +204,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
// Ingress.
expected := tracinghelper.TestSpanTree{
Note: "2. Broker Ingress receives the event from the sending pod.",
Span: tracinghelper.MatchHTTPSpanNoReply(model.Server),
Span: ingressSpan(broker, eventID),
Children: []tracinghelper.TestSpanTree{
// Steps 7-10.
transformerEventSentFromTrChannelToTransformer,
Expand Down Expand Up @@ -240,3 +237,23 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
return expected, matchFunc
}
}

func ingressSpan(broker *v1beta1.Broker, eventID string) *tracinghelper.SpanMatcher {
return &tracinghelper.SpanMatcher{
Tags: map[string]string{
"messaging.system": "knative",
"messaging.destination": fmt.Sprintf("broker:%s.%s", broker.Name, broker.Namespace),
"messaging.message_id": eventID,
},
}
}

func triggerSpan(trigger *v1alpha1.Trigger, eventID string) *tracinghelper.SpanMatcher {
return &tracinghelper.SpanMatcher{
Tags: map[string]string{
"messaging.system": "knative",
"messaging.destination": fmt.Sprintf("trigger:%s.%s", trigger.Name, trigger.Namespace),
"messaging.message_id": eventID,
},
}
}

0 comments on commit 1689191

Please sign in to comment.