Skip to content

Commit

Permalink
Merge pull request #132 from ggambetti/ggambetti/metrics-shutdown
Browse files Browse the repository at this point in the history
Adds Shutdown to Metrics API
  • Loading branch information
banks authored Apr 26, 2022
2 parents d1e5690 + 4fbf9d3 commit 5d4d6f5
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 6 deletions.
9 changes: 9 additions & 0 deletions circonus/circonus.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m
s.metrics.RecordValue(flatKey, float64(val))
}

// Shutdown blocks while flushing metrics to the backend.
func (s *CirconusSink) Shutdown() {
// The version of circonus metrics in go.mod (v2.3.1), and the current
// version (v3.4.6) do not support a shutdown operation. Instead we call
// Flush which blocks until metrics are submitted to storage, and then exit
// as the README examples do.
s.metrics.Flush()
}

// Flattens key to Circonus metric name
func (s *CirconusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "`")
Expand Down
7 changes: 7 additions & 0 deletions circonus/circonus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net/http/httptest"
"strings"
"testing"

"github.com/armon/go-metrics"
)

func TestNewCirconusSink(t *testing.T) {
Expand Down Expand Up @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) {

}
}

func TestMetricSinkInterface(t *testing.T) {
var cs *CirconusSink
_ = metrics.MetricSink(cs)
}
5 changes: 5 additions & 0 deletions datadog/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
}

// Shutdown disables further metric collection, blocks to flush data, and tears down the sink.
func (s *DogStatsdSink) Shutdown() {
s.client.Close()
}

func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) {
key, parsedLabels := s.parseKey(key)
flatKey := s.flattenKey(key)
Expand Down
5 changes: 5 additions & 0 deletions datadog/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte,
t.Fatalf("Line %s does not match expected: %s", string(msg), expected)
}
}

func TestMetricSinkInterface(t *testing.T) {
var dd *DogStatsdSink
_ = metrics.MetricSink(dd)
}
4 changes: 4 additions & 0 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe
agg.Ingest(float64(val), i.rateDenom)
}

func (i *InmemSink) Shutdown() {
// Do nothing. InmemSink does not have cleanup associated with shutdown.
}

// Data is used to retrieve all the aggregated metrics
// Intervals may be in use, and a read lock should be acquired
func (i *InmemSink) Data() []*IntervalMetrics {
Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe
}
}

func (m *Metrics) Shutdown() {
m.sink.Shutdown()
}

// labelIsAllowed return true if a should be included in metric
// the caller should lock m.filterLock while calling this method
func (m *Metrics) labelIsAllowed(label *Label) bool {
Expand Down
11 changes: 10 additions & 1 deletion prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build go1.9
// +build go1.9

package prometheus
Expand All @@ -20,7 +21,7 @@ var (
// PrometheusSink.
DefaultPrometheusOpts = PrometheusOpts{
Expiration: 60 * time.Second,
Name: "default_prometheus_sink",
Name: "default_prometheus_sink",
}
)

Expand Down Expand Up @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe
}
}

// Shutdown is not implemented. PrometheusSink is in memory storage.
func (p *PrometheusSink) Shutdown() {
}

// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
Expand Down Expand Up @@ -446,6 +451,10 @@ func (s *PrometheusPushSink) flushMetrics() {
}()
}

// Shutdown tears down the PrometheusPushSink, and blocks while flushing metrics to the backend.
func (s *PrometheusPushSink) Shutdown() {
close(s.stopChan)
// Closing the channel only stops the running goroutine that pushes metrics.
// To minimize the chance of data loss pusher.Push is called one last time.
s.pusher.Push()
}
12 changes: 10 additions & 2 deletions prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) {
t.Fatalf("Unregister(sink) = false, want true")
}
}

// TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors
func TestMultiplePrometheusSink(t *testing.T) {
gaugeDef := GaugeDefinition{
Expand All @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) {
GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef),
SummaryDefinitions: append([]SummaryDefinition{}),
CounterDefinitions: append([]CounterDefinition{}),
Name: "sink1",
Name: "sink1",
}

sink1, err := NewPrometheusSinkFrom(cfg)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}

reg := prometheus.DefaultRegisterer
if reg == nil {
t.Fatalf("Expected default register to be non nil, got nil.")
Expand Down Expand Up @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) {
return true
})
}

func TestMetricSinkInterface(t *testing.T) {
var ps *PrometheusSink
_ = metrics.MetricSink(ps)
var pps *PrometheusPushSink
_ = metrics.MetricSink(pps)
}
12 changes: 12 additions & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type MetricSink interface {
// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
AddSampleWithLabels(key []string, val float32, labels []Label)

// Shutdown the metric sink, flush metrics to storage, and cleanup resources.
// Called immediately prior to application exit. Implementations must block
// until metrics are flushed to storage.
Shutdown()
}

// BlackholeSink is used to just blackhole messages
Expand All @@ -34,6 +39,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32)
func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) AddSample(key []string, val float32) {}
func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) Shutdown() {}

// FanoutSink is used to sink to fanout values to multiple sinks
type FanoutSink []MetricSink
Expand Down Expand Up @@ -74,6 +80,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab
}
}

func (fh FanoutSink) Shutdown() {
for _, s := range fh {
s.Shutdown()
}
}

// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
// by each sink type
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
Expand Down
13 changes: 10 additions & 3 deletions sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
type MockSink struct {
lock sync.Mutex

keys [][]string
vals []float32
labels [][]Label
shutdown bool
keys [][]string
vals []float32
labels [][]Label
}

func (m *MockSink) getKeys() [][]string {
Expand Down Expand Up @@ -63,6 +64,12 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label
m.vals = append(m.vals, val)
m.labels = append(m.labels, labels)
}
func (m *MockSink) Shutdown() {
m.lock.Lock()
defer m.lock.Unlock()

m.shutdown = true
}

func TestFanoutSink_Gauge(t *testing.T) {
m1 := &MockSink{}
Expand Down
12 changes: 12 additions & 0 deletions start.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,15 @@ func UpdateFilter(allow, block []string) {
func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
}

// Shutdown disables metric collection, then blocks while attempting to flush metrics to storage.
// WARNING: Not all MetricSink backends support this functionality, and calling this will cause them to leak resources.
// This is intended for use immediately prior to application exit.
func Shutdown() {
m := globalMetrics.Load().(*Metrics)
// Swap whatever MetricSink is currently active with a BlackholeSink. Callers must not have a
// reason to expect that calls to the library will successfully collect metrics after Shutdown
// has been called.
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
m.Shutdown()
}
20 changes: 20 additions & 0 deletions start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,26 @@ func Test_GlobalMetrics_UpdateFilter(t *testing.T) {
}
}

func Test_GlobalMetrics_Shutdown(t *testing.T) {
s := &MockSink{}
m := &Metrics{sink: s}
globalMetrics.Store(m)

Shutdown()

loaded := globalMetrics.Load()
metrics, ok := loaded.(*Metrics)
if !ok {
t.Fatalf("Expected globalMetrics to contain a Metrics pointer, but found: %v", loaded)
}
if metrics == m {
t.Errorf("Calling shutdown should have replaced the Metrics struct stored in globalMetrics")
}
if !s.shutdown {
t.Errorf("Expected Shutdown to have been called on MockSink")
}
}

// Benchmark_GlobalMetrics_Direct/direct-8 5000000 278 ns/op
// Benchmark_GlobalMetrics_Direct/atomic.Value-8 5000000 235 ns/op
func Benchmark_GlobalMetrics_Direct(b *testing.B) {
Expand Down

0 comments on commit 5d4d6f5

Please sign in to comment.