Skip to content

Commit

Permalink
[processorhelper] Add throughput metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 10, 2024
1 parent 65bebec commit c30fa02
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 27 deletions.
48 changes: 48 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ Number of spans that were dropped.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_incoming_log_record_size

Total size of log records passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_incoming_log_records

Number of log records passed to the processor.
Expand All @@ -62,6 +70,14 @@ Number of log records passed to the processor.
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_processor_incoming_metric_point_size

Total size of metric points passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_incoming_metric_points

Number of metric points passed to the processor.
Expand All @@ -70,6 +86,14 @@ Number of metric points passed to the processor.
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_processor_incoming_span_size

Total size of spans passed to the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_incoming_spans

Number of spans passed to the processor.
Expand Down Expand Up @@ -102,6 +126,14 @@ Number of spans that were inserted.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_processor_outgoing_log_record_size

Total size of log records emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_outgoing_log_records

Number of log records emitted from the processor.
Expand All @@ -110,6 +142,14 @@ Number of log records emitted from the processor.
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_processor_outgoing_metric_point_size

Total size of metric points emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_outgoing_metric_points

Number of metric points emitted from the processor.
Expand All @@ -118,6 +158,14 @@ Number of metric points emitted from the processor.
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_processor_outgoing_span_size

Total size of spans emitted from the processor.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_processor_outgoing_spans

Number of spans emitted from the processor.
Expand Down
83 changes: 63 additions & 20 deletions processor/processorhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -49,10 +50,16 @@ func NewLogsProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
marshaler := new(plog.ProtoMarshaler)
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
var recordsIn, recordsOut, bytesIn, bytesOut int

span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
recordsIn := ld.LogRecordCount()
recordsIn = ld.LogRecordCount()
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesIn = marshaler.LogsSize(ld)

Check warning on line 61 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L61

Added line #L61 was not covered by tests
}

ld, err = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
Expand All @@ -62,8 +69,12 @@ func NewLogsProcessor(
}
return err
}
recordsOut := ld.LogRecordCount()
recordsOut = ld.LogRecordCount()
obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut)
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesOut = marshaler.LogsSize(ld)
obs.recordInOutSize(ctx, component.DataTypeLogs, bytesIn, bytesOut)

Check warning on line 76 in processor/processorhelper/logs.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/logs.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,60 @@ telemetry:
value_type: int
monotonic: true

processor_incoming_span_size:
enabled: true
level: detailed
description: Total size of spans passed to the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_outgoing_span_size:
enabled: true
level: detailed
description: Total size of spans emitted from the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_incoming_metric_point_size:
enabled: true
level: detailed
description: Total size of metric points passed to the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_outgoing_metric_point_size:
enabled: true
level: detailed
description: Total size of metric points emitted from the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_incoming_log_record_size:
enabled: true
level: detailed
description: Total size of log records passed to the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_outgoing_log_record_size:
enabled: true
level: detailed
description: Total size of log records emitted from the processor.
unit: "By"
sum:
value_type: int
monotonic: true

processor_accepted_spans:
enabled: true
description: Number of spans successfully pushed into the next component in the pipeline.
Expand Down
16 changes: 13 additions & 3 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -49,11 +50,16 @@ func NewMetricsProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
marshaler := new(pmetric.ProtoMarshaler)
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
var pointsIn, pointsOut, bytesIn, bytesOut int

span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
pointsIn := md.DataPointCount()

pointsIn = md.DataPointCount()
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesIn = marshaler.MetricsSize(md)

Check warning on line 61 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L61

Added line #L61 was not covered by tests
}
md, err = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
Expand All @@ -62,8 +68,12 @@ func NewMetricsProcessor(
}
return err
}
pointsOut := md.DataPointCount()
pointsOut = md.DataPointCount()
obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut)
if obs.metricLevel >= configtelemetry.LevelDetailed {
bytesOut = marshaler.MetricsSize(md)
obs.recordInOutSize(ctx, component.DataTypeTraces, bytesIn, bytesOut)

Check warning on line 75 in processor/processorhelper/metrics.go

View check run for this annotation

Codecov / codecov/patch

processor/processorhelper/metrics.go#L74-L75

Added lines #L74 - L75 were not covered by tests
}
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
Expand Down
Loading

0 comments on commit c30fa02

Please sign in to comment.