Skip to content

Commit

Permalink
otelcolconvert: support converting probabilistic_sampler processor (#…
Browse files Browse the repository at this point in the history
…6481)

Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Feb 22, 2024
1 parent c5bfe59 commit c58709a
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, probabilisticSamplerProcessorConverter{})
}

type probabilisticSamplerProcessorConverter struct{}

func (probabilisticSamplerProcessorConverter) Factory() component.Factory {
return probabilisticsamplerprocessor.NewFactory()
}

func (probabilisticSamplerProcessorConverter) InputComponentName() string {
return "otelcol.processor.probabilistic_sampler"
}

func (probabilisticSamplerProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

args := toProbabilisticSamplerProcessor(state, id, cfg.(*probabilisticsamplerprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "probabilistic_sampler"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toProbabilisticSamplerProcessor(state *state, id component.InstanceID, cfg *probabilisticsamplerprocessor.Config) *probabilistic_sampler.Arguments {
var (
nextTraces = state.Next(id, component.DataTypeTraces)
nextLogs = state.Next(id, component.DataTypeLogs)
)

return &probabilistic_sampler.Arguments{
SamplingPercentage: cfg.SamplingPercentage,
HashSeed: cfg.HashSeed,
AttributeSource: string(cfg.AttributeSource),
FromAttribute: cfg.FromAttribute,
SamplingPriority: cfg.SamplingPriority,
Output: &otelcol.ConsumerArguments{
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
},
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
otelcol.receiver.otlp "default" {
grpc { }

http { }

output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.processor.probabilistic_sampler.default.input]
traces = [otelcol.processor.probabilistic_sampler.default.input]
}
}

otelcol.processor.probabilistic_sampler "default" {
output {
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}

otelcol.exporter.otlp "default" {
client {
endpoint = "database:4317"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
receivers:
otlp:
protocols:
grpc:
http:

exporters:
otlp:
# Our defaults have drifted from upstream, so we explicitly set our
# defaults below (balancer_name and queue_size).
endpoint: database:4317
balancer_name: pick_first
sending_queue:
queue_size: 5000

processors:
probabilistic_sampler:

service:
pipelines:
metrics:
receivers: [otlp]
processors: []
exporters: [otlp]
logs:
receivers: [otlp]
processors: [probabilistic_sampler]
exporters: [otlp]
traces:
receivers: [otlp]
processors: [probabilistic_sampler]
exporters: [otlp]

0 comments on commit c58709a

Please sign in to comment.