diff --git a/.chloggen/otlp_logs.yaml b/.chloggen/otlp_logs.yaml new file mode 100644 index 000000000000..239dacb1b21e --- /dev/null +++ b/.chloggen/otlp_logs.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for `otlp_json` encoding to Kafka receiver. The payload is deserialized into OpenTelemetry traces using JSON format." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33627] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This encoding allows the Kafka receiver to handle trace data in JSON format, + enabling integration with systems that export traces as JSON-encoded data. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 98d8e56377f8..38c014fe8937 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -31,6 +31,7 @@ The following settings can be optionally configured: Only one telemetry type may be used for a given topic. - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. + - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding. - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index dbf029e63563..793848d94c8f 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -17,7 +17,6 @@ import ( type TracesUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (ptrace.Traces, error) - // Encoding of the serialized messages. Encoding() string } @@ -26,7 +25,6 @@ type TracesUnmarshaler interface { type MetricsUnmarshaler interface { // Unmarshal deserializes the message body into traces Unmarshal([]byte) (pmetric.Metrics, error) - // Encoding of the serialized messages Encoding() string } @@ -35,14 +33,12 @@ type MetricsUnmarshaler interface { type LogsUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (plog.Logs, error) - // Encoding of the serialized messages. Encoding() string } type LogsUnmarshalerWithEnc interface { LogsUnmarshaler - // WithEnc sets the character encoding (UTF-8, GBK, etc.) of the unmarshaler. WithEnc(string) (LogsUnmarshalerWithEnc, error) } @@ -50,6 +46,7 @@ type LogsUnmarshalerWithEnc interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, "otlp_json") jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") @@ -57,6 +54,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") return map[string]TracesUnmarshaler{ otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, @@ -67,20 +65,24 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, "otlp_json") return map[string]MetricsUnmarshaler{ - otlpPb.Encoding(): otlpPb, + otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, } } func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, "otlp_json") raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() return map[string]LogsUnmarshaler{ azureResourceLogs.Encoding(): azureResourceLogs, otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, raw.Encoding(): raw, text.Encoding(): text, json.Encoding(): json, diff --git a/receiver/kafkareceiver/unmarshaler_test.go b/receiver/kafkareceiver/unmarshaler_test.go index fd1f998ee0a7..bb86ab8dfcd5 100644 --- a/receiver/kafkareceiver/unmarshaler_test.go +++ b/receiver/kafkareceiver/unmarshaler_test.go @@ -14,6 +14,7 @@ import ( func TestDefaultTracesUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "jaeger_proto", "jaeger_json", "zipkin_proto", @@ -34,6 +35,7 @@ func TestDefaultTracesUnMarshaler(t *testing.T) { func TestDefaultMetricsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", } marshalers := defaultMetricsUnmarshalers() assert.Equal(t, len(expectedEncodings), len(marshalers)) @@ -49,6 +51,7 @@ func TestDefaultMetricsUnMarshaler(t *testing.T) { func TestDefaultLogsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "raw", "text", "json",