From b37e8a9860f03b78baf2c3ca0edcbc6c7f8fd969 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Fri, 6 Sep 2024 16:42:55 -0700 Subject: [PATCH] `SetMeterProvider` might miss the delegation for instruments and registries (#5780) Closes #5757 This PR fixes an issue where `SetMeterProvider` might miss the delegation for instruments and registries. This bug brings a concurrent issue that could possibly make instruments and registries unable to operate correctly, such as recording, after using the `SetMeterProvider` method. The data put on these instruments and registries might be lost. --- CHANGELOG.md | 1 + internal/global/meter.go | 133 ++++++++++++++++++++-------------- internal/global/meter_test.go | 15 +++- 3 files changed, 92 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 258dd7faf2e..0a923abd899 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754) - Fix panic instruments creation when setting meter provider. (#5758) +- Fix an issue where `SetMeterProvider` in `go.opentelemetry.io/otel` might miss the delegation for instruments and registries. (#5780) ### Removed diff --git a/internal/global/meter.go b/internal/global/meter.go index c45248e64d1..f2fc3929b11 100644 --- a/internal/global/meter.go +++ b/internal/global/meter.go @@ -7,7 +7,6 @@ import ( "container/list" "reflect" "sync" - "sync/atomic" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" @@ -97,7 +96,7 @@ type meter struct { registry list.List - delegate atomic.Value // metric.Meter + delegate metric.Meter } type delegatedInstrument interface { @@ -123,12 +122,12 @@ type instID struct { // // It is guaranteed by the caller that this happens only once. func (m *meter) setDelegate(provider metric.MeterProvider) { - meter := provider.Meter(m.name, m.opts...) - m.delegate.Store(meter) - m.mtx.Lock() defer m.mtx.Unlock() + meter := provider.Meter(m.name, m.opts...) + m.delegate = meter + for _, inst := range m.instruments { inst.setDelegate(meter) } @@ -141,16 +140,18 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { m.registry.Remove(e) } - clear(m.instruments) + m.instruments = nil m.registry.Init() } func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64Counter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64Counter(name, options...) + } + i := &siCounter{name: name, opts: options} cfg := metric.NewInt64CounterConfig(options...) id := instID{ @@ -164,11 +165,13 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) } func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64UpDownCounter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64UpDownCounter(name, options...) + } + i := &siUpDownCounter{name: name, opts: options} cfg := metric.NewInt64UpDownCounterConfig(options...) id := instID{ @@ -182,11 +185,13 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou } func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64Histogram(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64Histogram(name, options...) + } + i := &siHistogram{name: name, opts: options} cfg := metric.NewInt64HistogramConfig(options...) id := instID{ @@ -200,11 +205,13 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti } func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64Gauge(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64Gauge(name, options...) + } + i := &siGauge{name: name, opts: options} cfg := metric.NewInt64GaugeConfig(options...) id := instID{ @@ -218,11 +225,13 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met } func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64ObservableCounter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64ObservableCounter(name, options...) + } + i := &aiCounter{name: name, opts: options} cfg := metric.NewInt64ObservableCounterConfig(options...) id := instID{ @@ -236,11 +245,13 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser } func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64ObservableUpDownCounter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64ObservableUpDownCounter(name, options...) + } + i := &aiUpDownCounter{name: name, opts: options} cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) id := instID{ @@ -254,11 +265,13 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 } func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Int64ObservableGauge(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Int64ObservableGauge(name, options...) + } + i := &aiGauge{name: name, opts: options} cfg := metric.NewInt64ObservableGaugeConfig(options...) id := instID{ @@ -272,11 +285,13 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa } func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64Counter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64Counter(name, options...) + } + i := &sfCounter{name: name, opts: options} cfg := metric.NewFloat64CounterConfig(options...) id := instID{ @@ -290,11 +305,13 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti } func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64UpDownCounter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64UpDownCounter(name, options...) + } + i := &sfUpDownCounter{name: name, opts: options} cfg := metric.NewFloat64UpDownCounterConfig(options...) id := instID{ @@ -308,11 +325,13 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow } func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64Histogram(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64Histogram(name, options...) + } + i := &sfHistogram{name: name, opts: options} cfg := metric.NewFloat64HistogramConfig(options...) id := instID{ @@ -326,11 +345,13 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram } func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64Gauge(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64Gauge(name, options...) + } + i := &sfGauge{name: name, opts: options} cfg := metric.NewFloat64GaugeConfig(options...) id := instID{ @@ -344,11 +365,13 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) } func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64ObservableCounter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64ObservableCounter(name, options...) + } + i := &afCounter{name: name, opts: options} cfg := metric.NewFloat64ObservableCounterConfig(options...) id := instID{ @@ -362,11 +385,13 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O } func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64ObservableUpDownCounter(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64ObservableUpDownCounter(name, options...) + } + i := &afUpDownCounter{name: name, opts: options} cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) id := instID{ @@ -380,11 +405,13 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl } func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - return del.Float64ObservableGauge(name, options...) - } m.mtx.Lock() defer m.mtx.Unlock() + + if m.delegate != nil { + return m.delegate.Float64ObservableGauge(name, options...) + } + i := &afGauge{name: name, opts: options} cfg := metric.NewFloat64ObservableGaugeConfig(options...) id := instID{ @@ -399,14 +426,14 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs // RegisterCallback captures the function that will be called during Collect. func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) { - if del, ok := m.delegate.Load().(metric.Meter); ok { - insts = unwrapInstruments(insts) - return del.RegisterCallback(f, insts...) - } - m.mtx.Lock() defer m.mtx.Unlock() + if m.delegate != nil { + insts = unwrapInstruments(insts) + return m.delegate.RegisterCallback(f, insts...) + } + reg := ®istration{instruments: insts, function: f} e := m.registry.PushBack(reg) reg.unreg = func() error { diff --git a/internal/global/meter_test.go b/internal/global/meter_test.go index a99213e3a9b..d3dd37e838f 100644 --- a/internal/global/meter_test.go +++ b/internal/global/meter_test.go @@ -83,6 +83,12 @@ func TestMeterConcurrentSafe(t *testing.T) { mtr.setDelegate(noop.NewMeterProvider()) close(finish) <-done + + // No instruments should be left after the meter is replaced. + assert.Empty(t, mtr.instruments) + + // No callbacks should be left after the meter is replaced. + assert.Zero(t, mtr.registry.Len()) } func TestUnregisterConcurrentSafe(t *testing.T) { @@ -162,8 +168,9 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (metric.Float64Co // This is to emulate a read from an exporter. func testCollect(t *testing.T, m metric.Meter) { if tMeter, ok := m.(*meter); ok { - m, ok = tMeter.delegate.Load().(metric.Meter) - if !ok { + // This changes the input m to the delegate. + m = tMeter.delegate + if m == nil { t.Error("meter was not delegated") return } @@ -261,7 +268,7 @@ func TestMeterDelegatesCalls(t *testing.T) { // Calls to Meter methods after setDelegate() should be executed by the delegate require.IsType(t, &meter{}, m) - tMeter := m.(*meter).delegate.Load().(*testMeter) + tMeter := m.(*meter).delegate.(*testMeter) require.NotNil(t, tMeter) assert.Equal(t, 1, tMeter.afCount) assert.Equal(t, 1, tMeter.afUDCount) @@ -309,7 +316,7 @@ func TestMeterDefersDelegations(t *testing.T) { // Calls to Meter() before setDelegate() should be the delegated type require.IsType(t, &meter{}, m) - tMeter := m.(*meter).delegate.Load().(*testMeter) + tMeter := m.(*meter).delegate.(*testMeter) require.NotNil(t, tMeter) assert.Equal(t, 1, tMeter.afCount) assert.Equal(t, 1, tMeter.afUDCount)