Skip to content

Commit

Permalink
Use the new marshaler/unmarshaler in Kafka exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jun 9, 2021
1 parent ce31fce commit 91a2d97
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 164 deletions.
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestWithMarshalers(t *testing.T) {
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpTracesPbMarshaler).Encoding()
cfg.Encoding = defaultEncoding
exporter, err := f.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
Expand Down
13 changes: 7 additions & 6 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/otlp"
"go.opentelemetry.io/collector/internal/testdata"
)

Expand Down Expand Up @@ -124,7 +125,7 @@ func TestTracesPusher(t *testing.T) {

p := kafkaTracesProducer{
producer: producer,
marshaler: &otlpTracesPbMarshaler{},
marshaler: newPdataTracesMarshaler(otlp.NewProtobufTracesMarshaler(), defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
Expand All @@ -141,7 +142,7 @@ func TestTracesPusher_err(t *testing.T) {

p := kafkaTracesProducer{
producer: producer,
marshaler: &otlpTracesPbMarshaler{},
marshaler: newPdataTracesMarshaler(otlp.NewProtobufTracesMarshaler(), defaultEncoding),
logger: zap.NewNop(),
}
t.Cleanup(func() {
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestMetricsDataPusher(t *testing.T) {

p := kafkaMetricsProducer{
producer: producer,
marshaler: &otlpMetricsPbMarshaler{},
marshaler: newPdataMetricsMarshaler(otlp.NewProtobufMetricsMarshaler(), defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
Expand All @@ -188,7 +189,7 @@ func TestMetricsDataPusher_err(t *testing.T) {

p := kafkaMetricsProducer{
producer: producer,
marshaler: &otlpMetricsPbMarshaler{},
marshaler: newPdataMetricsMarshaler(otlp.NewProtobufMetricsMarshaler(), defaultEncoding),
logger: zap.NewNop(),
}
t.Cleanup(func() {
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestLogsDataPusher(t *testing.T) {

p := kafkaLogsProducer{
producer: producer,
marshaler: &otlpLogsPbMarshaler{},
marshaler: newPdataLogsMarshaler(otlp.NewProtobufLogsMarshaler(), defaultEncoding),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
Expand All @@ -235,7 +236,7 @@ func TestLogsDataPusher_err(t *testing.T) {

p := kafkaLogsProducer{
producer: producer,
marshaler: &otlpLogsPbMarshaler{},
marshaler: newPdataLogsMarshaler(otlp.NewProtobufLogsMarshaler(), defaultEncoding),
logger: zap.NewNop(),
}
t.Cleanup(func() {
Expand Down
13 changes: 7 additions & 6 deletions exporter/kafkaexporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/Shopify/sarama"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/otlp"
)

// TracesMarshaler marshals traces into Message array.
Expand Down Expand Up @@ -49,28 +50,28 @@ type LogsMarshaler interface {

// tracesMarshalers returns map of supported encodings with TracesMarshaler.
func tracesMarshalers() map[string]TracesMarshaler {
otlppb := &otlpTracesPbMarshaler{}
otlpPb := newPdataTracesMarshaler(otlp.NewProtobufTracesMarshaler(), defaultEncoding)
jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
return map[string]TracesMarshaler{
otlppb.Encoding(): otlppb,
otlpPb.Encoding(): otlpPb,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
}
}

// metricsMarshalers returns map of supported encodings and MetricsMarshaler
func metricsMarshalers() map[string]MetricsMarshaler {
otlppb := &otlpMetricsPbMarshaler{}
otlpPb := newPdataMetricsMarshaler(otlp.NewProtobufMetricsMarshaler(), defaultEncoding)
return map[string]MetricsMarshaler{
otlppb.Encoding(): otlppb,
otlpPb.Encoding(): otlpPb,
}
}

// logsMarshalers returns map of supported encodings and LogsMarshaler
func logsMarshalers() map[string]LogsMarshaler {
otlppb := &otlpLogsPbMarshaler{}
otlpPb := newPdataLogsMarshaler(otlp.NewProtobufLogsMarshaler(), defaultEncoding)
return map[string]LogsMarshaler{
otlppb.Encoding(): otlppb,
otlpPb.Encoding(): otlpPb,
}
}
84 changes: 0 additions & 84 deletions exporter/kafkaexporter/otlp_marshaler.go

This file was deleted.

67 changes: 0 additions & 67 deletions exporter/kafkaexporter/otlp_marshaler_test.go

This file was deleted.

109 changes: 109 additions & 0 deletions exporter/kafkaexporter/pdata_marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"github.com/Shopify/sarama"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/model"
)

type pdataLogsMarshaler struct {
marshaler model.LogsMarshaler
encoding string
}

func (p pdataLogsMarshaler) Marshal(ld pdata.Logs, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.Marshal(ld)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
}

func (p pdataLogsMarshaler) Encoding() string {
return p.encoding
}

func newPdataLogsMarshaler(marshaler model.LogsMarshaler, encoding string) LogsMarshaler {
return pdataLogsMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

type pdataMetricsMarshaler struct {
marshaler model.MetricsMarshaler
encoding string
}

func (p pdataMetricsMarshaler) Marshal(ld pdata.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.Marshal(ld)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
}

func (p pdataMetricsMarshaler) Encoding() string {
return p.encoding
}

func newPdataMetricsMarshaler(marshaler model.MetricsMarshaler, encoding string) MetricsMarshaler {
return pdataMetricsMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

type pdataTracesMarshaler struct {
marshaler model.TracesMarshaler
encoding string
}

func (p pdataTracesMarshaler) Marshal(td pdata.Traces, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.Marshal(td)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
}

func (p pdataTracesMarshaler) Encoding() string {
return p.encoding
}

func newPdataTracesMarshaler(marshaler model.TracesMarshaler, encoding string) TracesMarshaler {
return pdataTracesMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

0 comments on commit 91a2d97

Please sign in to comment.