From e1ece092aa333135bd36af70ae1eeb1d2ec6f63d Mon Sep 17 00:00:00 2001
From: Thomas Hamm
Date: Sun, 8 Sep 2024 17:06:12 +0200
Subject: [PATCH] [receiver/kafkareceiver] Add encoding extensions support
---
.../kafkareceiver-encoding-extensions.yaml | 28 +++
receiver/kafkareceiver/README.md | 2 +-
receiver/kafkareceiver/factory.go | 69 +-----
receiver/kafkareceiver/factory_test.go | 12 +-
receiver/kafkareceiver/kafka_receiver.go | 102 +++++++--
receiver/kafkareceiver/kafka_receiver_test.go | 200 +++++++++++++++---
receiver/kafkareceiver/unmarshaler.go | 42 ++++
7 files changed, 333 insertions(+), 122 deletions(-)
create mode 100644 .chloggen/kafkareceiver-encoding-extensions.yaml
diff --git a/.chloggen/kafkareceiver-encoding-extensions.yaml b/.chloggen/kafkareceiver-encoding-extensions.yaml
new file mode 100644
index 000000000000..60cd65dc5028
--- /dev/null
+++ b/.chloggen/kafkareceiver-encoding-extensions.yaml
@@ -0,0 +1,28 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for encoding extensions in the Kafka receiver.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [33888]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+ This change adds support for encoding extensions in the Kafka receiver. Loading extensions takes precedence over the internally supported encodings.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md
index 495fb3b422ae..98d8e56377f8 100644
--- a/receiver/kafkareceiver/README.md
+++ b/receiver/kafkareceiver/README.md
@@ -29,7 +29,7 @@ The following settings can be optionally configured:
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
Only one telemetry type may be used for a given topic.
-- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings:
+- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go
index a6fe7ace9f74..a7749d45809a 100644
--- a/receiver/kafkareceiver/factory.go
+++ b/receiver/kafkareceiver/factory.go
@@ -54,40 +54,9 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaReceiverFactory)
-// withTracesUnmarshalers adds Unmarshalers.
-func withTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption {
- return func(factory *kafkaReceiverFactory) {
- for _, unmarshaler := range tracesUnmarshalers {
- factory.tracesUnmarshalers[unmarshaler.Encoding()] = unmarshaler
- }
- }
-}
-
-// withMetricsUnmarshalers adds MetricsUnmarshalers.
-func withMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption {
- return func(factory *kafkaReceiverFactory) {
- for _, unmarshaler := range metricsUnmarshalers {
- factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
- }
- }
-}
-
-// withLogsUnmarshalers adds LogsUnmarshalers.
-func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
- return func(factory *kafkaReceiverFactory) {
- for _, unmarshaler := range logsUnmarshalers {
- factory.logsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
- }
- }
-}
-
// NewFactory creates Kafka receiver factory.
func NewFactory(options ...FactoryOption) receiver.Factory {
- f := &kafkaReceiverFactory{
- tracesUnmarshalers: map[string]TracesUnmarshaler{},
- metricsUnmarshalers: map[string]MetricsUnmarshaler{},
- logsUnmarshalers: map[string]LogsUnmarshaler{},
- }
+ f := &kafkaReceiverFactory{}
for _, o := range options {
o(f)
}
@@ -133,11 +102,7 @@ func createDefaultConfig() component.Config {
}
}
-type kafkaReceiverFactory struct {
- tracesUnmarshalers map[string]TracesUnmarshaler
- metricsUnmarshalers map[string]MetricsUnmarshaler
- logsUnmarshalers map[string]LogsUnmarshaler
-}
+type kafkaReceiverFactory struct{}
func (f *kafkaReceiverFactory) createTracesReceiver(
_ context.Context,
@@ -145,20 +110,12 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
cfg component.Config,
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
- for encoding, unmarshal := range defaultTracesUnmarshalers() {
- f.tracesUnmarshalers[encoding] = unmarshal
- }
-
oCfg := *(cfg.(*Config))
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
- unmarshaler := f.tracesUnmarshalers[oCfg.Encoding]
- if unmarshaler == nil {
- return nil, errUnrecognizedEncoding
- }
- r, err := newTracesReceiver(oCfg, set, unmarshaler, nextConsumer)
+ r, err := newTracesReceiver(oCfg, set, nextConsumer)
if err != nil {
return nil, err
}
@@ -171,20 +128,12 @@ func (f *kafkaReceiverFactory) createMetricsReceiver(
cfg component.Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
- for encoding, unmarshal := range defaultMetricsUnmarshalers() {
- f.metricsUnmarshalers[encoding] = unmarshal
- }
-
oCfg := *(cfg.(*Config))
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
- unmarshaler := f.metricsUnmarshalers[oCfg.Encoding]
- if unmarshaler == nil {
- return nil, errUnrecognizedEncoding
- }
- r, err := newMetricsReceiver(oCfg, set, unmarshaler, nextConsumer)
+ r, err := newMetricsReceiver(oCfg, set, nextConsumer)
if err != nil {
return nil, err
}
@@ -197,20 +146,12 @@ func (f *kafkaReceiverFactory) createLogsReceiver(
cfg component.Config,
nextConsumer consumer.Logs,
) (receiver.Logs, error) {
- for encoding, unmarshaler := range defaultLogsUnmarshalers(set.BuildInfo.Version, set.Logger) {
- f.logsUnmarshalers[encoding] = unmarshaler
- }
-
oCfg := *(cfg.(*Config))
if oCfg.Topic == "" {
oCfg.Topic = defaultLogsTopic
}
- unmarshaler, err := getLogsUnmarshaler(oCfg.Encoding, f.logsUnmarshalers)
- if err != nil {
- return nil, err
- }
- r, err := newLogsReceiver(oCfg, set, unmarshaler, nextConsumer)
+ r, err := newLogsReceiver(oCfg, set, nextConsumer)
if err != nil {
return nil, err
}
diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go
index 87c435edba2a..d49ee096391f 100644
--- a/receiver/kafkareceiver/factory_test.go
+++ b/receiver/kafkareceiver/factory_test.go
@@ -37,7 +37,7 @@ func TestCreateTracesReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
- f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()}
+ f := kafkaReceiverFactory{}
r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
require.NoError(t, err)
// no available broker
@@ -46,7 +46,7 @@ func TestCreateTracesReceiver(t *testing.T) {
func TestWithTracesUnmarshalers(t *testing.T) {
unmarshaler := &customTracesUnmarshaler{}
- f := NewFactory(withTracesUnmarshalers(unmarshaler))
+ f := NewFactory()
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
@@ -76,7 +76,7 @@ func TestCreateMetricsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
- f := kafkaReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()}
+ f := kafkaReceiverFactory{}
r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
require.NoError(t, err)
// no available broker
@@ -85,7 +85,7 @@ func TestCreateMetricsReceiver(t *testing.T) {
func TestWithMetricsUnmarshalers(t *testing.T) {
unmarshaler := &customMetricsUnmarshaler{}
- f := NewFactory(withMetricsUnmarshalers(unmarshaler))
+ f := NewFactory()
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
@@ -115,7 +115,7 @@ func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
- f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())}
+ f := kafkaReceiverFactory{}
r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
require.NoError(t, err)
// no available broker
@@ -146,7 +146,7 @@ func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) {
func TestWithLogsUnmarshalers(t *testing.T) {
unmarshaler := &customLogsUnmarshaler{}
- f := NewFactory(withLogsUnmarshalers(unmarshaler))
+ f := NewFactory()
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go
index d4554b5437f5..36720bad8af2 100644
--- a/receiver/kafkareceiver/kafka_receiver.go
+++ b/receiver/kafkareceiver/kafka_receiver.go
@@ -14,6 +14,9 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
+ "go.opentelemetry.io/collector/pdata/plog"
+ "go.opentelemetry.io/collector/pdata/pmetric"
+ "go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/otel/attribute"
@@ -100,11 +103,7 @@ var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
var _ receiver.Metrics = (*kafkaMetricsConsumer)(nil)
var _ receiver.Logs = (*kafkaLogsConsumer)(nil)
-func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesUnmarshaler, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
- if unmarshaler == nil {
- return nil, errUnrecognizedEncoding
- }
-
+func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
@@ -114,7 +113,6 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
- unmarshaler: unmarshaler,
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
@@ -170,6 +168,22 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
if err != nil {
return err
}
+ // extensions take precedence over internal encodings
+ if unmarshaler, errExt := LoadEncodingExtension[ptrace.Unmarshaler](
+ host,
+ c.config.Encoding,
+ ); errExt == nil {
+ c.unmarshaler = &tracesEncodingUnmarshaler{
+ unmarshaler: *unmarshaler,
+ encoding: c.config.Encoding,
+ }
+ }
+ if unmarshaler, ok := defaultTracesUnmarshalers()[c.config.Encoding]; c.unmarshaler == nil && ok {
+ c.unmarshaler = unmarshaler
+ }
+ if c.unmarshaler == nil {
+ return errUnrecognizedEncoding
+ }
// consumerGroup may be set in tests to inject fake implementation.
if c.consumerGroup == nil {
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
@@ -229,11 +243,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
return c.consumerGroup.Close()
}
-func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler MetricsUnmarshaler, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) {
- if unmarshaler == nil {
- return nil, errUnrecognizedEncoding
- }
-
+func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
@@ -243,7 +253,6 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
- unmarshaler: unmarshaler,
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
@@ -267,6 +276,22 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
if err != nil {
return err
}
+ // extensions take precedence over internal encodings
+ if unmarshaler, errExt := LoadEncodingExtension[pmetric.Unmarshaler](
+ host,
+ c.config.Encoding,
+ ); errExt == nil {
+ c.unmarshaler = &metricsEncodingUnmarshaler{
+ unmarshaler: *unmarshaler,
+ encoding: c.config.Encoding,
+ }
+ }
+ if unmarshaler, ok := defaultMetricsUnmarshalers()[c.config.Encoding]; c.unmarshaler == nil && ok {
+ c.unmarshaler = unmarshaler
+ }
+ if c.unmarshaler == nil {
+ return errUnrecognizedEncoding
+ }
// consumerGroup may be set in tests to inject fake implementation.
if c.consumerGroup == nil {
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
@@ -326,11 +351,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
return c.consumerGroup.Close()
}
-func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
- if unmarshaler == nil {
- return nil, errUnrecognizedEncoding
- }
-
+func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
@@ -340,7 +361,6 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
- unmarshaler: unmarshaler,
settings: set,
autocommitEnabled: config.AutoCommit.Enable,
messageMarking: config.MessageMarking,
@@ -364,6 +384,25 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
if err != nil {
return err
}
+ // extensions take precedence over internal encodings
+ if unmarshaler, errExt := LoadEncodingExtension[plog.Unmarshaler](
+ host,
+ c.config.Encoding,
+ ); errExt == nil {
+ c.unmarshaler = &logsEncodingUnmarshaler{
+ unmarshaler: *unmarshaler,
+ encoding: c.config.Encoding,
+ }
+ }
+ if unmarshaler, errInt := getLogsUnmarshaler(
+ c.config.Encoding,
+ defaultLogsUnmarshalers(c.settings.BuildInfo.Version, c.settings.Logger),
+ ); c.unmarshaler == nil && errInt == nil {
+ c.unmarshaler = unmarshaler
+ }
+ if c.unmarshaler == nil {
+ return errUnrecognizedEncoding
+ }
// consumerGroup may be set in tests to inject fake implementation.
if c.consumerGroup == nil {
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
@@ -720,3 +759,30 @@ func toSaramaInitialOffset(initialOffset string) (int64, error) {
return 0, errInvalidInitialOffset
}
}
+
+// LoadEncodingExtension tries to load an available extension for the given encoding.
+func LoadEncodingExtension[T any](host component.Host, encoding string) (*T, error) {
+ extensionID, err := EncodingToComponentID(encoding)
+ if err != nil {
+ return nil, err
+ }
+ encodingExtension, ok := host.GetExtensions()[*extensionID]
+ if !ok {
+ return nil, fmt.Errorf("unknown encoding extension %q", encoding)
+ }
+ unmarshaler, ok := encodingExtension.(T)
+ if !ok {
+ return nil, fmt.Errorf("extension %q is not an unmarshaler", encoding)
+ }
+ return &unmarshaler, nil
+}
+
+// EncodingToComponentID converts an encoding string to a component ID using the given encoding as type.
+func EncodingToComponentID(encoding string) (*component.ID, error) {
+ componentType, err := component.NewType(encoding)
+ if err != nil {
+ return nil, fmt.Errorf("invalid component type: %w", err)
+ }
+ id := component.NewID(componentType)
+ return &id, nil
+}
diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go
index 755fe2ea351a..7359f8b6e5ce 100644
--- a/receiver/kafkareceiver/kafka_receiver_test.go
+++ b/receiver/kafkareceiver/kafka_receiver_test.go
@@ -13,6 +13,7 @@ import (
"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumertest"
@@ -39,21 +40,20 @@ func TestNewTracesReceiver_version_err(t *testing.T) {
Encoding: defaultEncoding,
ProtocolVersion: "none",
}
- unmarshaler := defaultTracesUnmarshalers()[c.Encoding]
- r, err := newTracesReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newTracesReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
}
func TestNewTracesReceiver_encoding_err(t *testing.T) {
- c := Config{
- Encoding: "foo",
- }
- unmarshaler := defaultTracesUnmarshalers()[c.Encoding]
- r, err := newTracesReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ c := createDefaultConfig().(*Config)
+ c.Encoding = "foo"
+ r, err := newTracesReceiver(*c, receivertest.NewNopSettings(), consumertest.NewNop())
+ require.NoError(t, err)
+ require.NotNil(t, r)
+ err = r.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
- assert.Nil(t, r)
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
}
@@ -72,9 +72,9 @@ func TestNewTracesReceiver_err_auth_type(t *testing.T) {
Full: false,
},
}
- unmarshaler := defaultTracesUnmarshalers()[c.Encoding]
- r, err := newTracesReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newTracesReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Contains(t, err.Error(), "failed to load TLS config")
}
@@ -84,9 +84,9 @@ func TestNewTracesReceiver_initial_offset_err(t *testing.T) {
InitialOffset: "foo",
Encoding: defaultEncoding,
}
- unmarshaler := defaultTracesUnmarshalers()[c.Encoding]
- r, err := newTracesReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newTracesReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.EqualError(t, err, errInvalidInitialOffset.Error())
@@ -94,6 +94,7 @@ func TestNewTracesReceiver_initial_offset_err(t *testing.T) {
func TestTracesReceiverStart(t *testing.T) {
c := kafkaTracesConsumer{
+ config: Config{Encoding: defaultEncoding},
nextConsumer: consumertest.NewNop(),
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
@@ -131,6 +132,7 @@ func TestTracesReceiver_error(t *testing.T) {
expectedErr := errors.New("handler error")
c := kafkaTracesConsumer{
+ config: Config{Encoding: defaultEncoding},
nextConsumer: consumertest.NewNop(),
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
@@ -363,13 +365,34 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
wg.Wait()
}
+func TestTracesReceiver_encoding_extension(t *testing.T) {
+ zcore, logObserver := observer.New(zapcore.ErrorLevel)
+ logger := zap.New(zcore)
+ settings := receivertest.NewNopSettings()
+ settings.Logger = logger
+
+ expectedErr := errors.New("handler error")
+ c := kafkaTracesConsumer{
+ config: Config{Encoding: "traces_encoding"},
+ nextConsumer: consumertest.NewNop(),
+ settings: settings,
+ consumerGroup: &testConsumerGroup{err: expectedErr},
+ telemetryBuilder: nopTelemetryBuilder(t),
+ }
+
+ require.NoError(t, c.Start(context.Background(), &testComponentHost{}))
+ require.NoError(t, c.Shutdown(context.Background()))
+ assert.Eventually(t, func() bool {
+ return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
+ }, 10*time.Second, time.Millisecond*100)
+}
+
func TestNewMetricsReceiver_version_err(t *testing.T) {
c := Config{
Encoding: defaultEncoding,
ProtocolVersion: "none",
}
- unmarshaler := defaultMetricsUnmarshalers()[c.Encoding]
- r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
@@ -379,9 +402,10 @@ func TestNewMetricsReceiver_encoding_err(t *testing.T) {
c := Config{
Encoding: "foo",
}
- unmarshaler := defaultMetricsUnmarshalers()[c.Encoding]
- _, err := newMetricsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
- require.Error(t, err)
+ r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
+ require.NoError(t, err)
+ require.NotNil(t, r)
+ err = r.Start(context.Background(), componenttest.NewNopHost())
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
}
@@ -400,9 +424,9 @@ func TestNewMetricsExporter_err_auth_type(t *testing.T) {
Full: false,
},
}
- unmarshaler := defaultMetricsUnmarshalers()[c.Encoding]
- r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
@@ -413,9 +437,9 @@ func TestNewMetricsReceiver_initial_offset_err(t *testing.T) {
InitialOffset: "foo",
Encoding: defaultEncoding,
}
- unmarshaler := defaultMetricsUnmarshalers()[c.Encoding]
- r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newMetricsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.EqualError(t, err, errInvalidInitialOffset.Error())
@@ -448,6 +472,7 @@ func TestMetricsReceiver_error(t *testing.T) {
expectedErr := errors.New("handler error")
c := kafkaMetricsConsumer{
+ config: Config{Encoding: defaultEncoding},
nextConsumer: consumertest.NewNop(),
settings: settings,
consumerGroup: &testConsumerGroup{err: expectedErr},
@@ -678,14 +703,36 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
wg.Wait()
}
+func TestMetricsReceiver_encoding_extension(t *testing.T) {
+ zcore, logObserver := observer.New(zapcore.ErrorLevel)
+ logger := zap.New(zcore)
+ settings := receivertest.NewNopSettings()
+ settings.Logger = logger
+
+ expectedErr := errors.New("handler error")
+ c := kafkaMetricsConsumer{
+ config: Config{Encoding: "metrics_encoding"},
+ nextConsumer: consumertest.NewNop(),
+ settings: settings,
+ consumerGroup: &testConsumerGroup{err: expectedErr},
+ telemetryBuilder: nopTelemetryBuilder(t),
+ }
+
+ require.NoError(t, c.Start(context.Background(), &testComponentHost{}))
+ require.NoError(t, c.Shutdown(context.Background()))
+ assert.Eventually(t, func() bool {
+ return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
+ }, 10*time.Second, time.Millisecond*100)
+}
+
func TestNewLogsReceiver_version_err(t *testing.T) {
c := Config{
Encoding: defaultEncoding,
ProtocolVersion: "none",
}
- unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding]
- r, err := newLogsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newLogsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
}
@@ -694,10 +741,11 @@ func TestNewLogsReceiver_encoding_err(t *testing.T) {
c := Config{
Encoding: "foo",
}
- unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding]
- r, err := newLogsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
- require.Error(t, err)
- assert.Nil(t, r)
+ r, err := newLogsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
+ require.NoError(t, err)
+ require.NotNil(t, r)
+ err = r.Start(context.Background(), componenttest.NewNopHost())
+ assert.Error(t, err)
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
}
@@ -716,9 +764,9 @@ func TestNewLogsExporter_err_auth_type(t *testing.T) {
Full: false,
},
}
- unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding]
- r, err := newLogsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newLogsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
@@ -729,9 +777,9 @@ func TestNewLogsReceiver_initial_offset_err(t *testing.T) {
InitialOffset: "foo",
Encoding: defaultEncoding,
}
- unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding]
- r, err := newLogsReceiver(c, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newLogsReceiver(c, receivertest.NewNopSettings(), consumertest.NewNop())
require.NoError(t, err)
+ require.NotNil(t, r)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.EqualError(t, err, errInvalidInitialOffset.Error())
@@ -739,6 +787,7 @@ func TestNewLogsReceiver_initial_offset_err(t *testing.T) {
func TestLogsReceiverStart(t *testing.T) {
c := kafkaLogsConsumer{
+ config: *createDefaultConfig().(*Config),
nextConsumer: consumertest.NewNop(),
settings: receivertest.NewNopSettings(),
consumerGroup: &testConsumerGroup{},
@@ -1123,12 +1172,36 @@ func TestCreateLogsReceiver_encoding_text_error(t *testing.T) {
cfg := Config{
Encoding: "text_uft-8",
}
- unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[cfg.Encoding]
- _, err := newLogsReceiver(cfg, receivertest.NewNopSettings(), unmarshaler, consumertest.NewNop())
+ r, err := newLogsReceiver(cfg, receivertest.NewNopSettings(), consumertest.NewNop())
+ require.NoError(t, err)
+ require.NotNil(t, r)
+ err = r.Start(context.Background(), componenttest.NewNopHost())
// encoding error comes first
assert.Error(t, err, "unsupported encoding")
}
+func TestLogsReceiver_encoding_extension(t *testing.T) {
+ zcore, logObserver := observer.New(zapcore.ErrorLevel)
+ logger := zap.New(zcore)
+ settings := receivertest.NewNopSettings()
+ settings.Logger = logger
+
+ expectedErr := errors.New("handler error")
+ c := kafkaLogsConsumer{
+ config: Config{Encoding: "logs_encoding"},
+ nextConsumer: consumertest.NewNop(),
+ settings: settings,
+ consumerGroup: &testConsumerGroup{err: expectedErr},
+ telemetryBuilder: nopTelemetryBuilder(t),
+ }
+
+ require.NoError(t, c.Start(context.Background(), &testComponentHost{}))
+ require.NoError(t, c.Shutdown(context.Background()))
+ assert.Eventually(t, func() bool {
+ return logObserver.FilterField(zap.Error(expectedErr)).Len() > 0
+ }, 10*time.Second, time.Millisecond*100)
+}
+
func TestToSaramaInitialOffset_earliest(t *testing.T) {
saramaInitialOffset, err := toSaramaInitialOffset(offsetEarliest)
@@ -1304,3 +1377,64 @@ func nopTelemetryBuilder(t *testing.T) *metadata.TelemetryBuilder {
require.NoError(t, err)
return telemetryBuilder
}
+
+func TestLoadEncodingExtension_logs(t *testing.T) {
+ extension, err := LoadEncodingExtension[plog.Unmarshaler](&testComponentHost{}, "logs_encoding")
+ require.NoError(t, err)
+ require.NotNil(t, extension)
+}
+
+func TestLoadEncodingExtension_notfound_error(t *testing.T) {
+ extension, err := LoadEncodingExtension[plog.Unmarshaler](&testComponentHost{}, "logs_notfound")
+ require.Error(t, err)
+ require.Nil(t, extension)
+}
+
+func TestLoadEncodingExtension_nounmarshaler_error(t *testing.T) {
+ extension, err := LoadEncodingExtension[plog.Unmarshaler](&testComponentHost{}, "logs_nounmarshaler")
+ require.Error(t, err)
+ require.Nil(t, extension)
+}
+
+type testComponentHost struct{}
+
+func (h *testComponentHost) GetExtensions() map[component.ID]component.Component {
+ return map[component.ID]component.Component{
+ component.MustNewID("logs_encoding"): &nopComponent{},
+ component.MustNewID("logs_nounmarshaler"): &nopNoUnmarshalerComponent{},
+ component.MustNewID("metrics_encoding"): &nopComponent{},
+ component.MustNewID("traces_encoding"): &nopComponent{},
+ }
+}
+
+type nopComponent struct{}
+
+func (c *nopComponent) Start(_ context.Context, _ component.Host) error {
+ return nil
+}
+
+func (c *nopComponent) Shutdown(_ context.Context) error {
+ return nil
+}
+
+func (c *nopComponent) UnmarshalLogs(_ []byte) (plog.Logs, error) {
+ return plog.NewLogs(), nil
+}
+
+func (c *nopComponent) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) {
+ return pmetric.NewMetrics(), nil
+}
+
+func (c *nopComponent) UnmarshalTraces(_ []byte) (ptrace.Traces, error) {
+ return ptrace.NewTraces(), nil
+}
+
+type nopNoUnmarshalerComponent struct{}
+
+func (c *nopNoUnmarshalerComponent) Start(_ context.Context, _ component.Host) error {
+ return nil
+}
+
+func (c *nopNoUnmarshalerComponent) Shutdown(_ context.Context) error {
+ return nil
+}
diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go
index bf44be7b496e..dbf029e63563 100644
--- a/receiver/kafkareceiver/unmarshaler.go
+++ b/receiver/kafkareceiver/unmarshaler.go
@@ -86,3 +86,45 @@ func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]Logs
json.Encoding(): json,
}
}
+
+// tracesEncodingUnmarshaler is a wrapper around ptrace.Unmarshaler that implements TracesUnmarshaler.
+type tracesEncodingUnmarshaler struct {
+ unmarshaler ptrace.Unmarshaler
+ encoding string
+}
+
+func (t *tracesEncodingUnmarshaler) Unmarshal(data []byte) (ptrace.Traces, error) {
+ return t.unmarshaler.UnmarshalTraces(data)
+}
+
+func (t *tracesEncodingUnmarshaler) Encoding() string {
+ return t.encoding
+}
+
+// metricsEncodingUnmarshaler is a wrapper around pmetric.Unmarshaler that implements MetricsUnmarshaler.
+type metricsEncodingUnmarshaler struct {
+ unmarshaler pmetric.Unmarshaler
+ encoding string
+}
+
+func (m *metricsEncodingUnmarshaler) Unmarshal(data []byte) (pmetric.Metrics, error) {
+ return m.unmarshaler.UnmarshalMetrics(data)
+}
+
+func (m *metricsEncodingUnmarshaler) Encoding() string {
+ return m.encoding
+}
+
+// logsEncodingUnmarshaler is a wrapper around plog.Unmarshaler that implements LogsUnmarshaler.
+type logsEncodingUnmarshaler struct {
+ unmarshaler plog.Unmarshaler
+ encoding string
+}
+
+func (l *logsEncodingUnmarshaler) Unmarshal(data []byte) (plog.Logs, error) {
+ return l.unmarshaler.UnmarshalLogs(data)
+}
+
+func (l *logsEncodingUnmarshaler) Encoding() string {
+ return l.encoding
+}