Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafka source tracing #1211

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/apache/camel-k v0.0.0-20200430164844-778ae8a2bd63
github.com/aws/aws-sdk-go v1.30.16
github.com/cloudevents/sdk-go v1.2.0
github.com/cloudevents/sdk-go/v2 v2.0.0-RC2
github.com/cloudevents/sdk-go/v2 v2.0.0-RC3
github.com/davecgh/go-spew v1.1.1
github.com/flimzy/diff v0.1.7 // indirect
github.com/go-kivik/couchdb/v3 v3.0.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ github.com/cloudevents/sdk-go v1.2.0 h1:2AxI14EJUw1PclJ5gZJtzbxnHIfNMdi76Qq3P3G1
github.com/cloudevents/sdk-go v1.2.0/go.mod h1:ss+jWJ88wypiewnPEzChSBzTYXGpdcILoN9YHk8uhTQ=
github.com/cloudevents/sdk-go/v2 v2.0.0-RC2 h1:XXqj/WXjOWhxUR8/+Ovn5YtSuIE83uOD6Gy3vUnBdUQ=
github.com/cloudevents/sdk-go/v2 v2.0.0-RC2/go.mod h1:f6d2RzSysHwhr4EsysDapUIWyJOFKqIhDisATXEa6Wk=
github.com/cloudevents/sdk-go/v2 v2.0.0-RC3 h1:bb9+ORnbe3NtwDGyNZNDUXFF1/OjQtyLgVpcyDIsRh8=
github.com/cloudevents/sdk-go/v2 v2.0.0-RC3/go.mod h1:f6d2RzSysHwhr4EsysDapUIWyJOFKqIhDisATXEa6Wk=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko=
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
Expand Down
46 changes: 46 additions & 0 deletions kafka/source/config/config-observability.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,49 @@ data:
# flag to "true" could cause extra Stackdriver charge.
# If metrics.backend-destination is not Stackdriver, this is ignored.
metrics.allow-stackdriver-custom-metrics: "false"

---

apiVersion: v1
kind: ConfigMap
metadata:
name: config-tracing
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
knative.dev/config-propagation: original
knative.dev/config-category: eventing
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################
# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
#
# This may be "zipkin" or "stackdriver", the default is "none"
backend: "none"

# URL to zipkin collector where traces are sent.
# This must be specified when backend is "zipkin"
zipkin-endpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"

# The GCP project into which stackdriver metrics will be written
# when backend is "stackdriver". If unspecified, the project-id
# is read from GCP metadata when running on GCP.
stackdriver-project-id: "my-project"

# Enable zipkin debug mode. This allows all spans to be sent to the server
# bypassing sampling.
debug: "false"

# Percentage (0-1) of requests to trace
sample-rate: "0.1"
6 changes: 5 additions & 1 deletion kafka/source/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"strings"

"go.opencensus.io/trace"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/source"
Expand Down Expand Up @@ -116,12 +117,15 @@ func (a *Adapter) Start(stopCh <-chan struct{}) error {
// --------------------------------------------------------------------

func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool, error) {
ctx, span := trace.StartSpan(ctx, "kafka-source")
defer span.End()

req, err := a.httpMessageSender.NewCloudEventRequest(ctx)
if err != nil {
return false, err
}

err = a.ConsumerMessageToHttpRequest(ctx, msg, req, a.logger)
err = a.ConsumerMessageToHttpRequest(ctx, span, msg, req, a.logger)
if err != nil {
return true, err
}
Expand Down
132 changes: 81 additions & 51 deletions kafka/source/pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,18 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/v2/types"
"go.uber.org/zap"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/source"

"go.uber.org/zap"

"github.com/Shopify/sarama"

"knative.dev/eventing/pkg/kncloudevents"

sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
)

func TestPostMessage_ServeHTTP(t *testing.T) {
func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
aTimestamp := time.Now()

testCases := map[string]struct {
Expand All @@ -60,12 +58,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -81,12 +80,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "-16771305",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "-16771305",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -103,12 +103,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "0.00000000000000000000000000000000000002536316309005082",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "0.00000000000000000000000000000000000002536316309005082",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -125,12 +126,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "AQoXFw==",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "AQoXFw==",
},
expectedBody: `{"key":"value"}`,
error: false,
Expand All @@ -155,6 +157,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
Expand Down Expand Up @@ -186,6 +189,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
Expand Down Expand Up @@ -226,24 +230,20 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
},
Timestamp: aTimestamp,
},
// Because we need to write the distributed tracing extension
expectedHeaders: map[string]string{
"content-type": "application/cloudevents+json",
"ce-specversion": "1.0",
"ce-id": "A234-1234-1234",
"ce-time": "2018-04-05T17:31:00Z",
"ce-type": "com.github.pull.create",
"ce-subject": "123",
"ce-source": "https://github.com/cloudevents/spec/pull",
"ce-comexampleextension1": "value",
"ce-comexampleothervalue": "5",
"content-type": "application/json",
},
expectedBody: string(mustJsonMarshal(t, map[string]interface{}{
"specversion": "1.0",
"type": "com.github.pull.create",
"source": "https://github.com/cloudevents/spec/pull",
"subject": "123",
"id": "A234-1234-1234",
"time": "2018-04-05T17:31:00Z",
"comexampleextension1": "value",
"comexampleothervalue": 5,
"datacontenttype": "application/json",
"data": map[string]string{
"hello": "Francesco",
},
})),
error: false,
expectedBody: `{"hello":"Francesco"}`,
error: false,
},
"accepted_binary": {
sink: sinkAccepted,
Expand Down Expand Up @@ -301,12 +301,13 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
Timestamp: aTimestamp,
},
expectedHeaders: map[string]string{
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
expectedBody: `{"key":"value"}`,
error: true,
Expand All @@ -323,6 +324,15 @@ func TestPostMessage_ServeHTTP(t *testing.T) {

statsReporter, _ := source.NewStatsReporter()

// If you wanna test tracing using a local zipkin server, uncomment this
//tracing.SetupStaticPublishing(zap.L().Sugar(), "localhost", &tracingconfig.Config{
// Backend: tracingconfig.Zipkin,
// Debug: true,
// SampleRate: 1.0,
// ZipkinEndpoint: "http://localhost:9411/api/v2/spans",
//})
//defer time.Sleep(1 * time.Second)

s, err := kncloudevents.NewHttpMessageSender(nil, sinkServer.URL)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -350,12 +360,32 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
t.Errorf("expected error, but got %v", err)
}

// Remove headers we aren't interested to test
h.header.Del("user-agent")
h.header.Del("accept-encoding")
h.header.Del("content-length")

// Check headers
for k, expected := range tc.expectedHeaders {
actual := h.header.Get(k)
if actual != expected {
t.Errorf("Expected header with key %s: '%q', but got '%q'", k, expected, actual)
}
h.header.Del(k)
}

// Check tracing headers
if h.header.Get("traceparent") == "" {
t.Errorf("Expected traceparent header")
}
h.header.Del("traceparent")
if h.header.Get("ce-traceparent") == "" {
t.Errorf("Expected ce-traceparent header")
}
h.header.Del("ce-traceparent")

if len(h.header) != 0 {
t.Errorf("Unexpected headers: %v", h.header)
}

// Check body
Expand Down
11 changes: 8 additions & 3 deletions kafka/source/pkg/adapter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import (
"github.com/Shopify/sarama"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/extensions"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/protocol/kafka_sarama"
"go.opencensus.io/trace"
"go.uber.org/zap"

sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
)

func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error {
func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.Span, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error {
msg := kafka_sarama.NewMessageFromConsumerMessage(cm)

defer func() {
Expand All @@ -44,9 +46,12 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, cm *sarama.C
}
}()

// Build tracing ext to write it as output
tracingExt := extensions.FromSpanContext(span.SpanContext())

if msg.ReadEncoding() != binding.EncodingUnknown {
// Message is a CloudEvent -> Encode directly to HTTP
return http.WriteRequest(ctx, msg, req)
return http.WriteRequest(ctx, msg, req, tracingExt.WriteTransformer())
}

// Message is not a CloudEvent -> We need to translate it to a valid CloudEvent
Expand All @@ -67,7 +72,7 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, cm *sarama.C
return err
}

return http.WriteRequest(ctx, binding.ToMessage(&event), req)
return http.WriteRequest(ctx, binding.ToMessage(&event), req, tracingExt.WriteTransformer())
}

func makeEventId(partition int32, offset int64) string {
Expand Down
11 changes: 5 additions & 6 deletions kafka/source/pkg/reconciler/source/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (
"os"

"k8s.io/client-go/tools/cache"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/source"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/resolver"

"knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
Expand All @@ -58,6 +58,7 @@ func NewController(
deploymentLister: deploymentInformer.Lister(),
receiveAdapterImage: raImage,
loggingContext: ctx,
configs: source.StartWatchingSourceConfigurations(ctx, component, cmw),
}

impl := kafkasource.NewImpl(ctx, c)
Expand All @@ -72,7 +73,5 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

cmw.Watch(logging.ConfigMapName(), c.UpdateFromLoggingConfigMap)
cmw.Watch(metrics.ConfigMapName(), c.UpdateFromMetricsConfigMap)
return impl
}
Loading