-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
traces.go
79 lines (69 loc) · 2.47 KB
/
traces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otlpjsonconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpjsonconnector"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)
type connectorTraces struct {
config Config
tracesConsumer consumer.Traces
logger *zap.Logger
component.StartFunc
component.ShutdownFunc
}
// newTracesConnector builds the connector that extracts traces from logs.
//
// set provides the telemetry settings whose logger the connector keeps,
// config must hold a *Config (the type assertion panics otherwise) and is
// copied by value, and tracesConsumer is the consumer that will receive the
// extracted traces.
func newTracesConnector(set connector.Settings, config component.Config, tracesConsumer consumer.Traces) *connectorTraces {
	logger := set.TelemetrySettings.Logger
	logger.Info("Building otlpjson connector for traces")

	conn := &connectorTraces{
		tracesConsumer: tracesConsumer,
		logger:         logger,
	}
	// Store a copy so the connector does not share the caller's Config value.
	conn.config = *config.(*Config)
	return conn
}
// Capabilities implements the consumer interface. It reports that this
// connector does not mutate the data passed to it.
func (c *connectorTraces) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = false
	return caps
}
// ConsumeLogs is called for each batch of logs sent to the connector.
//
// Every log record body is rendered as a string and classified with the
// package-level regular expressions:
//   - bodies matching traceRegex are decoded as OTLP JSON traces and passed
//     to the traces consumer;
//   - bodies matching metricRegex or logRegex are skipped;
//   - anything else is reported as an invalid payload.
//
// Decoding and downstream consume failures are logged and the remaining
// records are still processed; the method always returns nil.
func (c *connectorTraces) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
	unmarshaler := &ptrace.JSONUnmarshaler{}

	resourceLogs := pl.ResourceLogs()
	for ri := 0; ri < resourceLogs.Len(); ri++ {
		scopeLogs := resourceLogs.At(ri).ScopeLogs()
		for si := 0; si < scopeLogs.Len(); si++ {
			records := scopeLogs.At(si).LogRecords()
			for li := 0; li < records.Len(); li++ {
				payload := records.At(li).Body().AsString()

				if traceRegex.MatchString(payload) {
					traces, err := unmarshaler.UnmarshalTraces([]byte(payload))
					if err != nil {
						c.logger.Error("could not extract traces from otlp json", zap.Error(err))
						continue
					}
					if err = c.tracesConsumer.ConsumeTraces(ctx, traces); err != nil {
						c.logger.Error("could not consume traces from otlp json", zap.Error(err))
					}
					continue
				}

				// Metric and log payloads are not handled by this connector.
				if metricRegex.MatchString(payload) || logRegex.MatchString(payload) {
					continue
				}

				// None of the known OTLP payload shapes matched.
				c.logger.Error("Invalid otlp payload")
			}
		}
	}
	return nil
}