Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more encodings for Kafka in OTel Ingester #2580

Merged
merged 2 commits into from
Oct 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/opentelemetry/app/exporter/badgerexporter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.


// Package badgerexporter implements Jaeger Badger storage as OpenTelemetry exporter.
package badgerexporter
13 changes: 13 additions & 0 deletions cmd/opentelemetry/app/exporter/kafkaexporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,25 @@ package kafkaexporter

import (
"flag"
"fmt"

"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// encodingOTLPProto is used for spans encoded as OTLP Protobuf.
encodingOTLPProto = "otlp-proto"
)

// AddFlags adds Ingester flags.
func AddFlags(flags *flag.FlagSet) {
opts := &kafka.Options{}
opts.AddOTELFlags(flags)
// Modify kafka.producer.encoding flag
flags.Lookup("kafka.producer.encoding").Usage = fmt.Sprintf(
`Encoding of spans ("%s", "%s" or "%s") sent to kafka.`,
kafka.EncodingJSON,
kafka.EncodingProto,
encodingOTLPProto,
)
}
17 changes: 15 additions & 2 deletions cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/kafkaexporter"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/receiver/kafkareceiver"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand Down Expand Up @@ -59,7 +58,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := &kafka.Options{}
opts.InitFromViper(f.Viper)

cfg.Encoding = kafkareceiver.MustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Topic = opts.Topic
cfg.Brokers = opts.Config.Brokers
cfg.ProtocolVersion = opts.Config.ProtocolVersion
Expand Down Expand Up @@ -127,3 +126,17 @@ func (f Factory) CreateLogsExporter(
) (component.LogsExporter, error) {
return f.Wrapped.CreateLogsExporter(ctx, params, cfg)
}

// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
switch jaegerEncoding {
case kafka.EncodingProto:
XSAM marked this conversation as resolved.
Show resolved Hide resolved
return "jaeger_proto"
case kafka.EncodingJSON:
return "jaeger_json"
case encodingOTLPProto:
return "otlp_proto"
}

panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -86,3 +87,37 @@ func TestLoadConfigAndFlags(t *testing.T) {
assert.Equal(t, "/etc/foo", kafkaCfg.Authentication.Kerberos.ConfigPath)
assert.Equal(t, "from-jaeger-config", kafkaCfg.Authentication.Kerberos.Username)
}

func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
tests := []struct {
in string
expected string
expectsPanic bool
}{
{
in: kafka.EncodingProto,
expected: "jaeger_proto",
},
{
in: kafka.EncodingJSON,
expected: "jaeger_json",
},
{
in: encodingOTLPProto,
expected: "otlp_proto",
},
{
in: "not-an-encoding",
expectsPanic: true,
},
}

for _, tt := range tests {
if tt.expectsPanic {
assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) })
continue
}

assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in))
}
}
19 changes: 19 additions & 0 deletions cmd/opentelemetry/app/receiver/kafkareceiver/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,30 @@ package kafkareceiver

import (
"flag"
"fmt"
"strings"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// encodingZipkinProto is used for spans encoded as Zipkin Protobuf.
encodingZipkinProto = "zipkin-proto"
// encodingZipkinJSON is used for spans encoded as Zipkin JSON.
encodingZipkinJSON = "zipkin-json"
// encodingOTLPProto is used for spans encoded as OTLP Protobuf.
encodingOTLPProto = "otlp-proto"
)

// AddFlags adds Ingester flags.
func AddFlags(flags *flag.FlagSet) {
ingesterApp.AddOTELFlags(flags)
// Modify kafka.consumer.encoding flag
flags.Lookup(ingesterApp.KafkaConsumerConfigPrefix + ingesterApp.SuffixEncoding).Usage = fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(
append(kafka.AllEncodings,
encodingZipkinJSON,
encodingZipkinProto,
encodingOTLPProto,
), "\", \""))
}
14 changes: 11 additions & 3 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {

cfg.Brokers = opts.Brokers
cfg.ClientID = opts.ClientID
cfg.Encoding = MustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.GroupID = opts.GroupID
cfg.Topic = opts.Topic
cfg.ProtocolVersion = opts.ProtocolVersion
Expand Down Expand Up @@ -138,13 +138,21 @@ func (f Factory) CreateLogsReceiver(
return f.Wrapped.CreateLogsReceiver(ctx, params, cfg, nextConsumer)
}

// MustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
switch jaegerEncoding {
case kafka.EncodingProto:
return "jaeger_proto"
case kafka.EncodingJSON:
return "jaeger_json"
case encodingOTLPProto:
return "otlp_proto"
case encodingZipkinProto:
return "zipkin_proto"
case encodingZipkinJSON:
return "zipkin_json"
case kafka.EncodingZipkinThrift:
return "zipkin_thrift"
}

panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
Expand Down
29 changes: 18 additions & 11 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
in: kafka.EncodingJSON,
expected: "jaeger_json",
},
{
in: encodingOTLPProto,
expected: "otlp_proto",
},
{
in: encodingZipkinProto,
expected: "zipkin_proto",
},
{
in: encodingZipkinJSON,
expected: "zipkin_json",
},
{
in: kafka.EncodingZipkinThrift,
expected: "zipkin_thrift",
},
{
in: "not-an-encoding",
expectsPanic: true,
Expand All @@ -112,19 +128,10 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {

for _, tt := range tests {
if tt.expectsPanic {
assertPanic(t, func() { MustOtelEncodingForJaegerEncoding(tt.in) })
assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) })
continue
}

assert.Equal(t, tt.expected, MustOtelEncodingForJaegerEncoding(tt.in))
assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in))
}
}

func assertPanic(t *testing.T, f func()) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
f()
}