Skip to content

Commit

Permalink
Add HTTP/JSON to the otlp exporter (#1586)
Browse files Browse the repository at this point in the history
* Add HTTP/JSON to the otlp exporter

Co-Authored-By: Roy van de Water <[email protected]>

* PR fixup

Co-authored-by: Roy van de Water <[email protected]>
  • Loading branch information
MadVikingGod and royvandewater authored Mar 5, 2021
1 parent 62e2a0f commit 7153ef2
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

## Added

- Added `Marshler` config option to `otlphttp` to enable otlp over json or protobufs. (#1586)
### Removed

- Removed the exported `SimpleSpanProcessor` and `BatchSpanProcessor` structs.
Expand Down
24 changes: 20 additions & 4 deletions exporters/otlp/otlphttp/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"strings"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
colmetricspb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
Expand All @@ -37,7 +40,8 @@ import (
tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
)

const contentType = "application/x-protobuf"
const contentTypeProto = "application/x-protobuf"
const contentTypeJSON = "application/json"

// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
Expand Down Expand Up @@ -142,7 +146,7 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
pbRequest := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: rms,
}
rawRequest, err := pbRequest.Marshal()
rawRequest, err := d.marshal(pbRequest)
if err != nil {
return err
}
Expand All @@ -158,13 +162,21 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
pbRequest := &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
}
rawRequest, err := pbRequest.Marshal()
rawRequest, err := d.marshal(pbRequest)
if err != nil {
return err
}
return d.send(ctx, rawRequest, d.cfg.tracesURLPath)
}

func (d *driver) marshal(msg proto.Message) ([]byte, error) {
if d.cfg.marshaler == MarshalJSON {
s, err := (&jsonpb.Marshaler{}).MarshalToString(msg)
return []byte(s), err
}
return proto.Marshal(msg)
}

func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath)
var cancel context.CancelFunc
Expand Down Expand Up @@ -267,7 +279,11 @@ func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Head
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
headers.Set("Content-Type", contentType)
if d.cfg.marshaler == MarshalJSON {
headers.Set("Content-Type", contentTypeJSON)
} else {
headers.Set("Content-Type", contentTypeProto)
}
requestReader := bytes.NewBuffer(rawRequest)
switch d.cfg.compression {
case NoCompression:
Expand Down
6 changes: 6 additions & 0 deletions exporters/otlp/otlphttp/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func TestEndToEnd(t *testing.T) {
ExpectedHeaders: testHeaders,
},
},
{
name: "with json encoding",
opts: []otlphttp.Option{
otlphttp.WithMarshal(otlphttp.MarshalJSON),
},
},
}

for _, tc := range tests {
Expand Down
34 changes: 28 additions & 6 deletions exporters/otlp/otlphttp/mock_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -109,15 +110,25 @@ func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectormetricpb.ExportMetricsServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {
request, err := unmarshalMetricsRequest(rawRequest, r.Header.Get("content-type"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.metricLock.Lock()
defer c.metricLock.Unlock()
c.metricsStorage.AddMetrics(&request)
c.metricsStorage.AddMetrics(request)
}

func unmarshalMetricsRequest(rawRequest []byte, contentType string) (*collectormetricpb.ExportMetricsServiceRequest, error) {
request := &collectormetricpb.ExportMetricsServiceRequest{}
if contentType == "application/json" {
err := jsonpb.UnmarshalString(string(rawRequest), request)
return request, err
}
err := request.Unmarshal(rawRequest)
return request, err
}

func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
Expand All @@ -140,15 +151,26 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectortracepb.ExportTraceServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {

request, err := unmarshalTraceRequest(rawRequest, r.Header.Get("content-type"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.spanLock.Lock()
defer c.spanLock.Unlock()
c.spansStorage.AddSpans(&request)
c.spansStorage.AddSpans(request)
}

func unmarshalTraceRequest(rawRequest []byte, contentType string) (*collectortracepb.ExportTraceServiceRequest, error) {
request := &collectortracepb.ExportTraceServiceRequest{}
if contentType == "application/json" {
err := jsonpb.UnmarshalString(string(rawRequest), request)
return request, err
}
err := request.Unmarshal(rawRequest)
return request, err
}

func (c *mockCollector) checkHeaders(r *http.Request) bool {
Expand Down
24 changes: 24 additions & 0 deletions exporters/otlp/otlphttp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ const (
DefaultBackoff time.Duration = 300 * time.Millisecond
)

// Marshaler describes the kind of message format sent to the collector
type Marshaler int

const (
// MarshalProto tells the driver to send using the protobuf binary format.
MarshalProto Marshaler = iota
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)

type config struct {
endpoint string
compression Compression
Expand All @@ -58,6 +68,7 @@ type config struct {
tlsCfg *tls.Config
insecure bool
headers map[string]string
marshaler Marshaler
}

// Option applies an option to the HTTP driver.
Expand Down Expand Up @@ -201,3 +212,16 @@ func (headersOption) private() {}
func WithHeaders(headers map[string]string) Option {
return (headersOption)(headers)
}

type marshalerOption Marshaler

func (o marshalerOption) Apply(cfg *config) {
cfg.marshaler = Marshaler(o)
}
func (marshalerOption) private() {}

// WithMarshal tells the driver which wire format to use when sending to the
// collector. If unset, MarshalProto will be used
func WithMarshal(m Marshaler) Option {
return marshalerOption(m)
}

0 comments on commit 7153ef2

Please sign in to comment.