Skip to content

Commit

Permalink
beater/otlp: fix panic in parallel tests (#4911)
Browse files Browse the repository at this point in the history
When running beater in parallel tests (TestPublishIntegration),
beater/otlp code was sometimes panicking due to a race to
register metrics in the global monitoring registry.
  • Loading branch information
axw authored Mar 4, 2021
1 parent 0d5f795 commit 11296f1
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions beater/otlp/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package otlp

import (
"context"
"sync"

"github.com/pkg/errors"
"go.opentelemetry.io/collector/consumer/pdata"
Expand All @@ -45,21 +46,21 @@ var (
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
)

func init() {
monitoring.NewFunc(gRPCMetricsRegistry, "consumer", collectMetricsMonitoring, monitoring.Report)
}

// RegisterGRPCServices registers OTLP consumer services with the given gRPC server.
func RegisterGRPCServices(grpcServer *grpc.Server, processor model.BatchProcessor, logger *logp.Logger) error {
consumer := &monitoredConsumer{
consumer: &otel.Consumer{Processor: processor},
logger: logger,
}

// TODO(axw) rather than registering and unregistering monitoring callbacks
// each time a new consumer is created, we should register one callback and
// have it aggregate metrics from the dynamic consumers.
//
// For now, we take the easy way out: we only have one OTLP gRPC service
// running at any time, so just unregister/register a new one.
gRPCMetricsRegistry.Remove("consumer")
monitoring.NewFunc(gRPCMetricsRegistry, "consumer", consumer.collectMetricsMonitoring, monitoring.Report)
// TODO(axw) stop assuming we have only one OTLP gRPC service running
// at any time, and instead aggregate metrics from consumers that are
// dynamically registered and unregistered.
setCurrentMonitoredConsumer(consumer)

traceReceiver := trace.New("otlp", consumer)
metricsReceiver := metrics.New("otlp", consumer)
Expand Down Expand Up @@ -112,3 +113,23 @@ func (c *monitoredConsumer) collectMetricsMonitoring(_ monitoring.Mode, V monito
monitoring.ReportInt(V, "unsupported_dropped", stats.UnsupportedMetricsDropped)
})
}

var (
currentMonitoredConsumerMu sync.RWMutex
currentMonitoredConsumer *monitoredConsumer
)

func setCurrentMonitoredConsumer(c *monitoredConsumer) {
currentMonitoredConsumerMu.Lock()
defer currentMonitoredConsumerMu.Unlock()
currentMonitoredConsumer = c
}

func collectMetricsMonitoring(mode monitoring.Mode, V monitoring.Visitor) {
currentMonitoredConsumerMu.RLock()
c := currentMonitoredConsumer
currentMonitoredConsumerMu.RUnlock()
if c != nil {
c.collectMetricsMonitoring(mode, V)
}
}

0 comments on commit 11296f1

Please sign in to comment.