Skip to content

Commit

Permalink
[#216] Simplify constructing zipkin exporter (#412)
Browse files Browse the repository at this point in the history
- add defaultservicename to config test file

Signed-off-by: Hui Kang <[email protected]>
  • Loading branch information
huikang authored and Paulo Janotti committed Jan 15, 2020
1 parent 21a70d6 commit b04b16d
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 114 deletions.
2 changes: 2 additions & 0 deletions exporter/zipkinexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ type Config struct {
// http://some.url:9411/api/v2/spans).
URL string `mapstructure:"url"`
Format string `mapstructure:"format"`

DefaultServiceName string `mapstructure:"default_service_name"`
}
13 changes: 3 additions & 10 deletions exporter/zipkinexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,14 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Exporter) (exporter.TraceExporter, error) {
cfg := config.(*Config)
zc := config.(*Config)

if cfg.URL == "" {
if zc.URL == "" {
// TODO https://github.com/open-telemetry/opentelemetry-collector/issues/215
return nil, errors.New("exporter config requires a non-empty 'url'")
}
// <missing service name> is used if the zipkin span is not carrying the name of the service, which shouldn't happen
// in normal circumstances. It happens only due to (bad) conversions between formats. The current value is a
// clear indication that somehow the name of the service was lost in translation.
ze, err := newZipkinExporter(cfg.URL, "<missing service name>", 0, cfg.Format)
if err != nil {
return nil, err
}

return ze, nil
return NewTraceExporter(logger, config)
}

// CreateMetricsExporter creates a metrics exporter based on this config.
Expand Down
1 change: 1 addition & 0 deletions exporter/zipkinexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ exporters:
zipkin/2:
url: "https://somedest:1234/api/v2/spans"
format: proto
default_service_name: test_name

service:
pipelines:
Expand Down
130 changes: 73 additions & 57 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@
package zipkinexporter

import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"strconv"
"sync"
"time"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
"go.opencensus.io/trace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
"github.com/open-telemetry/opentelemetry-collector/observability"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace"
spandatatranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/spandata"
"github.com/open-telemetry/opentelemetry-collector/translator/trace/zipkin"
Expand All @@ -44,40 +46,65 @@ import (
// Zipkin servers and then transform them back to the final form when creating an
// OpenCensus spandata.
type zipkinExporter struct {
// mu protects the fields below
mu sync.Mutex

defaultServiceName string

reporter zipkinreporter.Reporter
url string
client *http.Client
serializer zipkinreporter.SpanSerializer
}

// Default values for Zipkin endpoint.
const (
defaultTimeout = time.Second * 5

defaultServiceName string = "<missing service name>"

DefaultZipkinEndpointHostPort = "localhost:9411"
DefaultZipkinEndpointURL = "http://" + DefaultZipkinEndpointHostPort + "/api/v2/spans"
)

func newZipkinExporter(finalEndpointURI, defaultServiceName string, uploadPeriod time.Duration, format string) (*zipkinExporter, error) {
var opts []zipkinhttp.ReporterOption
if uploadPeriod > 0 {
opts = append(opts, zipkinhttp.BatchInterval(uploadPeriod))
// NewTraceExporter creates an zipkin trace exporter.
func NewTraceExporter(logger *zap.Logger, config configmodels.Exporter) (exporter.TraceExporter, error) {
ze, err := createZipkinExporter(logger, config)
if err != nil {
return nil, err
}
zexp, err := exporterhelper.NewTraceExporter(
config,
ze.PushTraceData,
exporterhelper.WithTracing(true),
exporterhelper.WithMetrics(true))
if err != nil {
return nil, err
}

return zexp, nil
}

func createZipkinExporter(logger *zap.Logger, config configmodels.Exporter) (*zipkinExporter, error) {
zCfg := config.(*Config)

serviceName := defaultServiceName
if zCfg.DefaultServiceName != "" {
serviceName = zCfg.DefaultServiceName
}

ze := &zipkinExporter{
defaultServiceName: serviceName,
url: zCfg.URL,
client: &http.Client{Timeout: defaultTimeout},
}
// default is json
switch format {

switch zCfg.Format {
case "json":
break
ze.serializer = zipkinreporter.JSONSerializer{}
case "proto":
opts = append(opts, zipkinhttp.Serializer(zipkinproto.SpanSerializer{}))
ze.serializer = zipkinproto.SpanSerializer{}
default:
return nil, fmt.Errorf("%s is not one of json or proto", format)
}
reporter := zipkinhttp.NewReporter(finalEndpointURI, opts...)
zle := &zipkinExporter{
defaultServiceName: defaultServiceName,
reporter: reporter,
return nil, fmt.Errorf("%s is not one of json or proto", zCfg.Format)
}
return zle, nil

return ze, nil
}

// zipkinEndpointFromAttributes extracts zipkin endpoint information
Expand Down Expand Up @@ -160,48 +187,37 @@ func extractStringAttribute(
return value, ok
}

func (ze *zipkinExporter) Start(host component.Host) error {
return nil
}

func (ze *zipkinExporter) Shutdown() error {
ze.mu.Lock()
defer ze.mu.Unlock()

return ze.reporter.Close()
}

func (ze *zipkinExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) (zerr error) {
ctx, span := trace.StartSpan(ctx,
"opencensus.service.exporter.zipkin.ExportTrace",
trace.WithSampler(trace.NeverSample()))

defer func() {
if zerr != nil && span.IsRecordingEvents() {
span.SetStatus(trace.Status{Code: trace.StatusCodeInternal, Message: zerr.Error()})
}
span.End()
}()

goodSpans := 0
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) {
tbatch := []*zipkinmodel.SpanModel{}
for _, span := range td.Spans {
sd, err := spandatatranslator.ProtoSpanToOCSpanData(span)
if err != nil {
return consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(err)
}
zs := ze.zipkinSpan(td.Node, sd)
// ze.reporter can get closed in the midst of a Send
// so avoid a read/write during that mutation.
ze.mu.Lock()
ze.reporter.Send(zs)
ze.mu.Unlock()
goodSpans++
tbatch = append(tbatch, &zs)
}

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), err
}

// And finally record metrics on the number of exported spans.
observability.RecordMetricsForTraceExporter(observability.ContextWithExporterName(ctx, "zipkin"), len(td.Spans), len(td.Spans)-goodSpans)
req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
if err != nil {
return len(td.Spans), err
}
req.Header.Set("Content-Type", ze.serializer.ContentType())

return nil
resp, err := ze.client.Do(req)
if err != nil {
return len(td.Spans), err
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return len(td.Spans), fmt.Errorf("failed the request with status code %d", resp.StatusCode)
}
return 0, nil
}

// This code from down below is mostly copied from
Expand Down
108 changes: 61 additions & 47 deletions exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
Expand Down Expand Up @@ -149,20 +149,25 @@ func TestZipkinEndpointFromNode(t *testing.T) {
// The rest of the fields should match up exactly
func TestZipkinExporter_roundtripJSON(t *testing.T) {
buf := new(bytes.Buffer)
sizes := []int64{}
cst := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(buf, r.Body)
s, _ := io.Copy(buf, r.Body)
sizes = append(sizes, s)
r.Body.Close()
}))
defer cst.Close()

tes, err := newZipkinExporter(cst.URL, "", time.Millisecond, "json")
config := &Config{
URL: cst.URL,
Format: "json",
}
tes, err := NewTraceExporter(zap.NewNop(), config)
assert.NoError(t, err)
require.NotNil(t, tes)

// The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use
// a mock to ensure that this happens as intended.
mzr := newMockZipkinReporter(cst.URL)
tes.reporter = mzr

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := processor.NewTraceFanOutConnector([]consumer.TraceConsumer{tes})
Expand All @@ -185,46 +190,47 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
require.NoError(t, mzr.Flush())

// We expect back the exact JSON that was received
want := testutils.GenerateNormalizedJSON(t, `
[{
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db",
"kind": "CLIENT","name": "get",
"timestamp": 1472470996199000,"duration": 207000,
"localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"},
"remoteEndpoint": {"serviceName": "backend","ipv4": "192.168.99.101","port": 9000},
"annotations": [
{"timestamp": 1472470996238000,"value": "foo"},
{"timestamp": 1472470996403000,"value": "bar"}
],
"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"}
},
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91386","id": "4d1e00c0db9010db",
"kind": "SERVER","name": "put",
"timestamp": 1472470996199000,"duration": 207000,
"localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"},
"remoteEndpoint": {"serviceName": "frontend", "ipv4": "192.168.99.101","port": 9000},
"annotations": [
{"timestamp": 1472470996238000,"value": "foo"},
{"timestamp": 1472470996403000,"value": "bar"}
],
"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"}
},
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91386",
"id": "4d1e00c0db9010db",
"kind": "SERVER",
"name": "put",
"timestamp": 1472470996199000,
"duration": 207000
}]`)

// Finally we need to inspect the output
gotBytes, _ := ioutil.ReadAll(buf)
got := testutils.GenerateNormalizedJSON(t, string(gotBytes))
if got != want {
t.Errorf("RoundTrip result do not match:\nGot\n %s\n\nWant\n: %s\n", got, want)
wants := []string{`
[{
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db",
"kind": "CLIENT","name": "get",
"timestamp": 1472470996199000,"duration": 207000,
"localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"},
"remoteEndpoint": {"serviceName": "backend","ipv4": "192.168.99.101","port": 9000},
"annotations": [
{"timestamp": 1472470996238000,"value": "foo"},
{"timestamp": 1472470996403000,"value": "bar"}
],
"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"}
},
{
"traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91386","id": "4d1e00c0db9010db",
"kind": "SERVER","name": "put",
"timestamp": 1472470996199000,"duration": 207000,
"localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"},
"remoteEndpoint": {"serviceName": "frontend", "ipv4": "192.168.99.101","port": 9000},
"annotations": [
{"timestamp": 1472470996238000,"value": "foo"},
{"timestamp": 1472470996403000,"value": "bar"}
],
"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"}
}]
`, `
[{
"traceId": "4d1e00c0db9010db86154a4ba6e91385",
"parentId": "86154a4ba6e91386",
"id": "4d1e00c0db9010db",
"kind": "SERVER",
"name": "put",
"timestamp": 1472470996199000,
"duration": 207000
}]
`}
for i, s := range wants {
want := testutils.GenerateNormalizedJSON(t, s)
gotBytes := buf.Next(int(sizes[i]))
got := testutils.GenerateNormalizedJSON(t, string(gotBytes))
assert.Equal(t, want, got)
}
}

Expand Down Expand Up @@ -360,7 +366,12 @@ const zipkinSpansJSONJavaLibrary = `
`

func TestZipkinExporter_invalidFormat(t *testing.T) {
_, err := newZipkinExporter("", "", 0, "foobar")
config := &Config{
URL: "1.2.3.4",
Format: "foobar",
}
f := &Factory{}
_, err := f.CreateTraceExporter(zap.NewNop(), config)
require.Error(t, err)
}

Expand All @@ -375,13 +386,16 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
}))
defer cst.Close()

tes, err := newZipkinExporter(cst.URL, "", time.Millisecond, "proto")
config := &Config{
URL: cst.URL,
Format: "proto",
}
tes, err := NewTraceExporter(zap.NewNop(), config)
require.NoError(t, err)

// The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use
// a mock to ensure that this happens as intended.
mzr := newMockZipkinReporter(cst.URL)
tes.reporter = mzr

mzr.serializer = zipkinproto.SpanSerializer{}

Expand Down

0 comments on commit b04b16d

Please sign in to comment.