Skip to content

Commit

Permalink
[receiver/discovery] Fix metrics-only collection (#5359)
Browse files Browse the repository at this point in the history
I missed some conditions in previous PR. If the receiver was configured on metrics pipeline only with some discovered services, it would block collector shutdown's. This change fixes that. It's not a typical use case to enable it on metrics pipeline only, so we don't need to test it extensively. I'll be covered to some extend in the following PRs.
  • Loading branch information
dmitryax authored Sep 13, 2024
1 parent 7c0adeb commit 5208240
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
12 changes: 8 additions & 4 deletions internal/receiver/discoveryreceiver/metric_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type metricsConsumer struct {
}

func newMetricsConsumer(logger *zap.Logger, cfg *Config, correlations *correlationStore, nextConsumer consumer.Metrics) *metricsConsumer {
return &metricsConsumer{
evaluator: newEvaluator(logger, cfg, correlations,
mc := &metricsConsumer{nextConsumer: nextConsumer}
if correlations != nil {
mc.evaluator = newEvaluator(logger, cfg, correlations,
// TODO: provide more capable env w/ resource and metric attributes
func(pattern string) map[string]any {
return map[string]any{"name": pattern}
}),
nextConsumer: nextConsumer,
})
}
return mc
}

func (m *metricsConsumer) Capabilities() consumer.Capabilities {
Expand All @@ -71,6 +72,9 @@ func (m *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics
// evaluateMetrics parses the provided Metrics and returns plog.Logs with a single log record if it matches
// against the first applicable configured Status match rule.
func (m *metricsConsumer) evaluateMetrics(md pmetric.Metrics) {
if m.evaluator == nil {
return
}
if ce := m.logger.Check(zapcore.DebugLevel, "evaluating metrics"); ce != nil {
if mbytes, err := jsonMarshaler.MarshalMetrics(md); err == nil {
ce.Write(zap.ByteString("metrics", mbytes))
Expand Down
45 changes: 27 additions & 18 deletions internal/receiver/discoveryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,21 @@ func (d *discoveryReceiver) Start(ctx context.Context, host component.Host) (err
return fmt.Errorf("failed obtaining observables from host: %w", err)
}

correlations := newCorrelationStore(d.logger, d.config.CorrelationTTL)
d.endpointTracker = newEndpointTracker(d.observables, d.config, d.logger, d.pLogs, correlations)
d.endpointTracker.start()

d.metricsConsumer = newMetricsConsumer(d.logger, d.config, correlations, d.nextMetricsConsumer)
var correlations *correlationStore
if d.nextLogsConsumer != nil {
correlations = newCorrelationStore(d.logger, d.config.CorrelationTTL)
if d.nextLogsConsumer != nil {
d.endpointTracker = newEndpointTracker(d.observables, d.config, d.logger, d.pLogs, correlations)
d.endpointTracker.start()
}

if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, correlations); err != nil {
return fmt.Errorf("failed creating statement evaluator: %w", err)
if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, correlations); err != nil {
return fmt.Errorf("failed creating statement evaluator: %w", err)
}
}

d.metricsConsumer = newMetricsConsumer(d.logger, d.config, correlations, d.nextMetricsConsumer)

if err = d.createAndSetReceiverCreator(); err != nil {
return fmt.Errorf("failed creating internal receiver_creator: %w", err)
}
Expand Down Expand Up @@ -172,18 +177,22 @@ func (d *discoveryReceiver) createAndSetReceiverCreator() error {
return err
}
id := component.MustNewIDWithName(receiverCreatorFactory.Type().String(), d.settings.ID.String())
// receiverCreatorConfig.SetIDName(d.settings.ID.String())
ts := component.TelemetrySettings{
Logger: d.logger,
TracerProvider: tnoop.NewTracerProvider(),
LeveledMeterProvider: func(configtelemetry.Level) metric.MeterProvider { return mnoop.NewMeterProvider() },
}
if d.statementEvaluator != nil {
// TODO: Introduce a wrapper logger that combines the receiver_creator logger with the statement evaluator logger
// in a way that we avoid flooding the logs with errors but still provide enough information to debug issues.
ts.Logger = d.statementEvaluator.evaluatedLogger.With(
zap.String("kind", "receiver"),
zap.String("name", id.String()),
)
}
receiverCreatorSettings := receiver.Settings{
ID: id,
TelemetrySettings: component.TelemetrySettings{
Logger: d.statementEvaluator.evaluatedLogger.With(
zap.String("kind", "receiver"),
zap.String("name", id.String()),
),
TracerProvider: tnoop.NewTracerProvider(),
LeveledMeterProvider: func(configtelemetry.Level) metric.MeterProvider { return mnoop.NewMeterProvider() },
MetricsLevel: configtelemetry.LevelDetailed,
},
ID: id,
TelemetrySettings: ts,
BuildInfo: component.BuildInfo{
Command: "discovery",
Version: "latest",
Expand Down

0 comments on commit 5208240

Please sign in to comment.