From 01c66516afe7328aca8ca3dbb6a55075830f3693 Mon Sep 17 00:00:00 2001 From: Harshith Mente <109957201+joeyyy09@users.noreply.github.com> Date: Mon, 16 Sep 2024 14:50:09 +0530 Subject: [PATCH] [receiver/kafkareceiver] Add otlp_json support in kafka receiver (#34840) **Description:** The current features dont support otlp_json in receivers/kafkareceivers. Add support for otlp_json which accepts json formated for Otel Collector kafka receiver **Link to tracking Issue:** #33627 **Testing:** Added test files for the same. **Documentation:** --------- Signed-off-by: joeyyy09 Co-authored-by: Ziqi Zhao Co-authored-by: Yuri Shkuro --- .chloggen/otlp_logs.yaml | 29 ++++++++++++++++++++++ receiver/kafkareceiver/README.md | 1 + receiver/kafkareceiver/unmarshaler.go | 12 +++++---- receiver/kafkareceiver/unmarshaler_test.go | 3 +++ 4 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 .chloggen/otlp_logs.yaml diff --git a/.chloggen/otlp_logs.yaml b/.chloggen/otlp_logs.yaml new file mode 100644 index 000000000000..239dacb1b21e --- /dev/null +++ b/.chloggen/otlp_logs.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for `otlp_json` encoding to Kafka receiver. The payload is deserialized into OpenTelemetry traces using JSON format." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33627] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This encoding allows the Kafka receiver to handle trace data in JSON format, + enabling integration with systems that export traces as JSON-encoded data. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 98d8e56377f8..38c014fe8937 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -31,6 +31,7 @@ The following settings can be optionally configured: Only one telemetry type may be used for a given topic. - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. + - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding. - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index dbf029e63563..793848d94c8f 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -17,7 +17,6 @@ import ( type TracesUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (ptrace.Traces, error) - // Encoding of the serialized messages. Encoding() string } @@ -26,7 +25,6 @@ type TracesUnmarshaler interface { type MetricsUnmarshaler interface { // Unmarshal deserializes the message body into traces Unmarshal([]byte) (pmetric.Metrics, error) - // Encoding of the serialized messages Encoding() string } @@ -35,14 +33,12 @@ type MetricsUnmarshaler interface { type LogsUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (plog.Logs, error) - // Encoding of the serialized messages. Encoding() string } type LogsUnmarshalerWithEnc interface { LogsUnmarshaler - // WithEnc sets the character encoding (UTF-8, GBK, etc.) of the unmarshaler. WithEnc(string) (LogsUnmarshalerWithEnc, error) } @@ -50,6 +46,7 @@ type LogsUnmarshalerWithEnc interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, "otlp_json") jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") @@ -57,6 +54,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") return map[string]TracesUnmarshaler{ otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, @@ -67,20 +65,24 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, "otlp_json") return map[string]MetricsUnmarshaler{ - otlpPb.Encoding(): otlpPb, + otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, } } func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, "otlp_json") raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() return map[string]LogsUnmarshaler{ azureResourceLogs.Encoding(): azureResourceLogs, otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, raw.Encoding(): raw, text.Encoding(): text, json.Encoding(): json, diff --git a/receiver/kafkareceiver/unmarshaler_test.go b/receiver/kafkareceiver/unmarshaler_test.go index fd1f998ee0a7..bb86ab8dfcd5 100644 --- a/receiver/kafkareceiver/unmarshaler_test.go +++ b/receiver/kafkareceiver/unmarshaler_test.go @@ -14,6 +14,7 @@ import ( func TestDefaultTracesUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "jaeger_proto", "jaeger_json", "zipkin_proto", @@ -34,6 +35,7 @@ func TestDefaultTracesUnMarshaler(t *testing.T) { func TestDefaultMetricsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", } marshalers := defaultMetricsUnmarshalers() assert.Equal(t, len(expectedEncodings), len(marshalers)) @@ -49,6 +51,7 @@ func TestDefaultMetricsUnMarshaler(t *testing.T) { func TestDefaultLogsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "raw", "text", "json",