-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
metrics.go
79 lines (69 loc) · 2.49 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otlpjsonconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpjsonconnector"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)
type connectorMetrics struct {
config Config
metricsConsumer consumer.Metrics
logger *zap.Logger
component.StartFunc
component.ShutdownFunc
}
// newMetricsConnector is a function to create a new connector for metrics extraction
func newMetricsConnector(set connector.Settings, config component.Config, metricsConsumer consumer.Metrics) *connectorMetrics {
set.TelemetrySettings.Logger.Info("Building otlpjson connector for metrics")
cfg := config.(*Config)
return &connectorMetrics{
config: *cfg,
logger: set.TelemetrySettings.Logger,
metricsConsumer: metricsConsumer,
}
}
// Capabilities implements the consumer interface.
func (c *connectorMetrics) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeLogs method is called for each instance of a log sent to the connector
func (c *connectorMetrics) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
// loop through the levels of logs
metricsUnmarshaler := &pmetric.JSONUnmarshaler{}
for i := 0; i < pl.ResourceLogs().Len(); i++ {
li := pl.ResourceLogs().At(i)
for j := 0; j < li.ScopeLogs().Len(); j++ {
logRecord := li.ScopeLogs().At(j)
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
value := token.AsString()
switch {
case metricRegex.MatchString(value):
var m pmetric.Metrics
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(value))
if err != nil {
c.logger.Error("could not extract metrics from otlp json", zap.Error(err))
continue
}
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
if err != nil {
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
}
case logRegex.MatchString(value), traceRegex.MatchString(value):
// If it's a log or trace payload, simply continue
continue
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
}
}
}
}
return nil
}