From 0b67271aa15f5531a429d7d4598c705afefb4a55 Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Tue, 17 Mar 2020 13:49:25 -0600 Subject: [PATCH] [Beta] Adding client side aggregation to the client This should reduce by a lot the number of packets (and therefore the number of packets drop) between the client and the Agent. This should also improve performances in hot path when sampling the same metrics. Using fnv1a hash for the aggregator gives us a average ~12% improvement on the aggregation benchmarks. --- .travis.yml | 1 - statsd/aggregator.go | 158 ++++++++++++++++++++++++++++++++ statsd/aggregator_test.go | 131 ++++++++++++++++++++++++++ statsd/metrics.go | 125 +++++++++++++++++++++++++ statsd/metrics_test.go | 131 ++++++++++++++++++++++++++ statsd/options.go | 58 +++++++++--- statsd/statsd.go | 93 ++++++++++++------- statsd/statsd_benchmark_test.go | 100 ++++++++++++++++---- 8 files changed, 730 insertions(+), 67 deletions(-) create mode 100644 statsd/aggregator.go create mode 100644 statsd/aggregator_test.go create mode 100644 statsd/metrics.go create mode 100644 statsd/metrics_test.go diff --git a/.travis.yml b/.travis.yml index 2825bad3e..c0d60c963 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: go go: - - 1.7 - 1.8 - 1.9 - 1.10.x diff --git a/statsd/aggregator.go b/statsd/aggregator.go new file mode 100644 index 000000000..f63e3239c --- /dev/null +++ b/statsd/aggregator.go @@ -0,0 +1,158 @@ +package statsd + +import ( + "strings" + "sync" + "time" +) + +type ( + countsMap map[string]*countMetric + gaugesMap map[string]*gaugeMetric + setsMap map[string]*setMetric +) + +type aggregator struct { + client *Client + + counts countsMap + countsM sync.RWMutex + + gauges gaugesMap + gaugesM sync.RWMutex + + sets setsMap + setsM sync.RWMutex + + closed chan struct{} + exited chan struct{} +} + +func newAggregator(c *Client) *aggregator { + return &aggregator{ + client: c, + counts: countsMap{}, + gauges: gaugesMap{}, + sets: setsMap{}, + closed: make(chan struct{}), + exited: make(chan struct{}), + } +} + +func (a *aggregator) start(flushInterval time.Duration) { + ticker := time.NewTicker(flushInterval) + + go func() { + for { + select { + case <-ticker.C: + a.sendMetrics() + case <-a.closed: + close(a.exited) + return + } + } + }() +} + +func (a *aggregator) sendMetrics() { + for _, m := range a.flushMetrics() { + a.client.send(m) + } +} + +func (a *aggregator) stop() { + close(a.closed) + <-a.exited + a.sendMetrics() +} + +func (a *aggregator) flushMetrics() []metric { + metrics := []metric{} + + // We reset the values to avoid sending 'zero' values for metrics not + // sampled during this flush interval + + a.setsM.Lock() + sets := a.sets + a.sets = setsMap{} + a.setsM.Unlock() + + for _, s := range sets { + metrics = append(metrics, s.flushUnsafe()...) + } + + a.gaugesM.Lock() + gauges := a.gauges + a.gauges = gaugesMap{} + a.gaugesM.Unlock() + + for _, g := range gauges { + metrics = append(metrics, g.flushUnsafe()) + } + + a.countsM.RLock() + counts := a.counts + a.counts = countsMap{} + a.countsM.RUnlock() + + for _, c := range counts { + metrics = append(metrics, c.flushUnsafe()) + } + + return metrics +} + +func getContext(name string, tags []string) string { + return name + ":" + strings.Join(tags, ",") +} + +func (a *aggregator) count(name string, value int64, tags []string, rate float64) error { + context := getContext(name, tags) + a.countsM.RLock() + if count, found := a.counts[context]; found { + count.sample(value) + a.countsM.RUnlock() + return nil + } + a.countsM.RUnlock() + + a.countsM.Lock() + a.counts[context] = newCountMetric(name, value, tags, rate) + a.countsM.Unlock() + return nil +} + +func (a *aggregator) gauge(name string, value float64, tags []string, rate float64) error { + context := getContext(name, tags) + a.gaugesM.RLock() + if gauge, found := a.gauges[context]; found { + gauge.sample(value) + a.gaugesM.RUnlock() + return nil + } + a.gaugesM.RUnlock() + + gauge := newGaugeMetric(name, value, tags, rate) + + a.gaugesM.Lock() + a.gauges[context] = gauge + a.gaugesM.Unlock() + return nil +} + +func (a *aggregator) set(name string, value string, tags []string, rate float64) error { + context := getContext(name, tags) + a.setsM.RLock() + if set, found := a.sets[context]; found { + set.sample(value) + a.setsM.RUnlock() + return nil + } + a.setsM.RUnlock() + + a.setsM.Lock() + a.sets[context] = newSetMetric(name, value, tags, rate) + a.setsM.Unlock() + return nil +} diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go new file mode 100644 index 000000000..9590596fe --- /dev/null +++ b/statsd/aggregator_test.go @@ -0,0 +1,131 @@ +package statsd + +import ( + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAggregatorSample(t *testing.T) { + a := newAggregator(nil) + + tags := []string{"tag1", "tag2"} + + a.gauge("gaugeTest", 21, tags, 1) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + + a.count("countTest", 21, tags, 1) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + + a.set("setTest", "value1", tags, 1) + assert.Len(t, a.sets, 1) + assert.Contains(t, a.sets, "setTest:tag1,tag2") + + a.gauge("gaugeTest", 123, tags, 1) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + + a.count("countTest", 10, tags, 1) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + + a.set("setTest", "value1", tags, 1) + assert.Len(t, a.sets, 1) + assert.Contains(t, a.sets, "setTest:tag1,tag2") +} + +func TestAggregatorFlush(t *testing.T) { + a := newAggregator(nil) + + tags := []string{"tag1", "tag2"} + + a.gauge("gaugeTest1", 21, tags, 1) + a.gauge("gaugeTest1", 10, tags, 1) + a.gauge("gaugeTest2", 15, tags, 1) + + a.count("countTest1", 21, tags, 1) + a.count("countTest1", 10, tags, 1) + a.count("countTest2", 1, tags, 1) + + a.set("setTest1", "value1", tags, 1) + a.set("setTest1", "value1", tags, 1) + a.set("setTest1", "value2", tags, 1) + a.set("setTest2", "value1", tags, 1) + + metrics := a.flushMetrics() + + assert.Len(t, a.gauges, 0) + assert.Len(t, a.counts, 0) + assert.Len(t, a.sets, 0) + + assert.Len(t, metrics, 7) + + sort.Slice(metrics, func(i, j int) bool { + if metrics[i].metricType == metrics[j].metricType { + res := strings.Compare(metrics[i].name, metrics[j].name) + // this happens fo set + if res == 0 { + return strings.Compare(metrics[i].svalue, metrics[j].svalue) != 1 + } + return res != 1 + } + return metrics[i].metricType < metrics[j].metricType + }) + + assert.Equal(t, metrics, []metric{ + metric{ + metricType: gauge, + name: "gaugeTest1", + tags: tags, + rate: 1, + fvalue: float64(10), + }, + metric{ + metricType: gauge, + name: "gaugeTest2", + tags: tags, + rate: 1, + fvalue: float64(15), + }, + metric{ + metricType: count, + name: "countTest1", + tags: tags, + rate: 1, + ivalue: int64(31), + }, + metric{ + metricType: count, + name: "countTest2", + tags: tags, + rate: 1, + ivalue: int64(1), + }, + metric{ + metricType: set, + name: "setTest1", + tags: tags, + rate: 1, + svalue: "value1", + }, + metric{ + metricType: set, + name: "setTest1", + tags: tags, + rate: 1, + svalue: "value2", + }, + metric{ + metricType: set, + name: "setTest2", + tags: tags, + rate: 1, + svalue: "value1", + }, + }) + +} diff --git a/statsd/metrics.go b/statsd/metrics.go new file mode 100644 index 000000000..4f582e978 --- /dev/null +++ b/statsd/metrics.go @@ -0,0 +1,125 @@ +package statsd + +import ( + "math" + "sync" + "sync/atomic" +) + +/* +Those are metrics type that can be aggregated on the client side: + - Gauge + - Count + - Set +*/ + +type countMetric struct { + value int64 + name string + tags []string + rate float64 +} + +func newCountMetric(name string, value int64, tags []string, rate float64) *countMetric { + return &countMetric{ + value: value, + name: name, + tags: tags, + rate: rate, + } +} + +func (c *countMetric) sample(v int64) { + atomic.AddInt64(&c.value, v) +} + +func (c *countMetric) flushUnsafe() metric { + return metric{ + metricType: count, + name: c.name, + tags: c.tags, + rate: c.rate, + ivalue: c.value, + } +} + +// Gauge + +type gaugeMetric struct { + value uint64 + name string + tags []string + rate float64 +} + +func newGaugeMetric(name string, value float64, tags []string, rate float64) *gaugeMetric { + return &gaugeMetric{ + value: math.Float64bits(value), + name: name, + tags: tags, + rate: rate, + } +} + +func (g *gaugeMetric) sample(v float64) { + atomic.StoreUint64(&g.value, math.Float64bits(v)) +} + +func (g *gaugeMetric) flushUnsafe() metric { + return metric{ + metricType: gauge, + name: g.name, + tags: g.tags, + rate: g.rate, + fvalue: math.Float64frombits(g.value), + } +} + +// Set + +type setMetric struct { + data map[string]struct{} + name string + tags []string + rate float64 + sync.Mutex +} + +func newSetMetric(name string, value string, tags []string, rate float64) *setMetric { + set := &setMetric{ + data: map[string]struct{}{}, + name: name, + tags: tags, + rate: rate, + } + set.data[value] = struct{}{} + return set +} + +func (s *setMetric) sample(v string) { + s.Lock() + defer s.Unlock() + s.data[v] = struct{}{} +} + +// Sets are aggregated on the agent side too. We flush the keys so a set from +// multiple application can be correctly aggregated on the agent side. +func (s *setMetric) flushUnsafe() []metric { + if len(s.data) == 0 { + return nil + } + + metrics := make([]metric, len(s.data)) + i := 0 + for value := range s.data { + metrics[i] = metric{ + metricType: set, + name: s.name, + tags: s.tags, + rate: s.rate, + svalue: value, + } + i++ + } + return metrics +} diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go new file mode 100644 index 000000000..981ae7806 --- /dev/null +++ b/statsd/metrics_test.go @@ -0,0 +1,131 @@ +package statsd + +import ( + "math" + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewCountMetric(t *testing.T) { + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1) + assert.Equal(t, c.value, int64(21)) + assert.Equal(t, c.name, "test") + assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.rate, 1.0) +} + +func TestCountMetricSample(t *testing.T) { + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1) + c.sample(12) + assert.Equal(t, c.value, int64(33)) + assert.Equal(t, c.name, "test") + assert.Equal(t, c.tags, []string{"tag1", "tag2"}) + assert.Equal(t, c.rate, 1.0) +} + +func TestFlushUnsafeCountMetricSample(t *testing.T) { + c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1) + m := c.flushUnsafe() + assert.Equal(t, m.metricType, count) + assert.Equal(t, m.ivalue, int64(21)) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.rate, 1.0) + + c.sample(12) + m = c.flushUnsafe() + assert.Equal(t, m.metricType, count) + assert.Equal(t, m.ivalue, int64(33)) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.rate, 1.0) +} + +func TestNewGaugeMetric(t *testing.T) { + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1) + assert.Equal(t, math.Float64frombits(g.value), float64(21)) + assert.Equal(t, g.name, "test") + assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.rate, 1.0) +} + +func TestGaugeMetricSample(t *testing.T) { + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1) + g.sample(12) + assert.Equal(t, math.Float64frombits(g.value), float64(12)) + assert.Equal(t, g.name, "test") + assert.Equal(t, g.tags, []string{"tag1", "tag2"}) + assert.Equal(t, g.rate, 1.0) +} + +func TestFlushUnsafeGaugeMetricSample(t *testing.T) { + g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1) + m := g.flushUnsafe() + assert.Equal(t, m.metricType, gauge) + assert.Equal(t, m.fvalue, float64(21)) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.rate, 1.0) + + g.sample(12) + m = g.flushUnsafe() + assert.Equal(t, m.metricType, gauge) + assert.Equal(t, m.fvalue, float64(12)) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.tags, []string{"tag1", "tag2"}) + assert.Equal(t, m.rate, 1.0) +} + +func TestNewSetMetric(t *testing.T) { + s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1) + assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, []string{"tag1", "tag2"}) + assert.Equal(t, s.rate, 1.0) +} + +func TestSetMetricSample(t *testing.T) { + s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1) + s.sample("value2") + assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}, "value2": struct{}{}}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, []string{"tag1", "tag2"}) + assert.Equal(t, s.rate, 1.0) +} + +func TestFlushUnsafeSetMetricSample(t *testing.T) { + s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1) + m := s.flushUnsafe() + + require.Len(t, m, 1) + + assert.Equal(t, m[0].metricType, set) + assert.Equal(t, m[0].svalue, "value1") + assert.Equal(t, m[0].name, "test") + assert.Equal(t, m[0].tags, []string{"tag1", "tag2"}) + assert.Equal(t, m[0].rate, 1.0) + + s.sample("value1") + s.sample("value2") + m = s.flushUnsafe() + + sort.Slice(m, func(i, j int) bool { + return strings.Compare(m[i].svalue, m[j].svalue) != 1 + }) + + require.Len(t, m, 2) + assert.Equal(t, m[0].metricType, set) + assert.Equal(t, m[0].svalue, "value1") + assert.Equal(t, m[0].name, "test") + assert.Equal(t, m[0].tags, []string{"tag1", "tag2"}) + assert.Equal(t, m[0].rate, 1.0) + assert.Equal(t, m[1].metricType, set) + assert.Equal(t, m[1].svalue, "value2") + assert.Equal(t, m[1].name, "test") + assert.Equal(t, m[1].tags, []string{"tag1", "tag2"}) + assert.Equal(t, m[1].rate, 1.0) +} diff --git a/statsd/options.go b/statsd/options.go index ec75880a3..eee95a738 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -30,6 +30,10 @@ var ( DefaultReceivingMode = MutexMode // DefaultChannelModeBufferSize is the default size of the channel holding incoming metrics DefaultChannelModeBufferSize = 4096 + // DefaultAggregationFlushInterval is the default interval for the aggregator to flush metrics. + DefaultAggregationFlushInterval = 3 * time.Second + // DefaultAggregation + DefaultAggregation = false ) // Options contains the configuration options for a client. @@ -86,22 +90,28 @@ type Options struct { ReceiveMode ReceivingMode // ChannelModeBufferSize is the size of the channel holding incoming metrics ChannelModeBufferSize int + // AggregationFlushInterval is the interval for the aggregator to flush metrics + AggregationFlushInterval time.Duration + // [beta] Aggregation enables/disables client side aggregation + Aggregation bool } func resolveOptions(options []Option) (*Options, error) { o := &Options{ - Namespace: DefaultNamespace, - Tags: DefaultTags, - MaxBytesPerPayload: DefaultMaxBytesPerPayload, - MaxMessagesPerPayload: DefaultMaxMessagesPerPayload, - BufferPoolSize: DefaultBufferPoolSize, - BufferFlushInterval: DefaultBufferFlushInterval, - BufferShardCount: DefaultBufferShardCount, - SenderQueueSize: DefaultSenderQueueSize, - WriteTimeoutUDS: DefaultWriteTimeoutUDS, - Telemetry: DefaultTelemetry, - ReceiveMode: DefaultReceivingMode, - ChannelModeBufferSize: DefaultChannelModeBufferSize, + Namespace: DefaultNamespace, + Tags: DefaultTags, + MaxBytesPerPayload: DefaultMaxBytesPerPayload, + MaxMessagesPerPayload: DefaultMaxMessagesPerPayload, + BufferPoolSize: DefaultBufferPoolSize, + BufferFlushInterval: DefaultBufferFlushInterval, + BufferShardCount: DefaultBufferShardCount, + SenderQueueSize: DefaultSenderQueueSize, + WriteTimeoutUDS: DefaultWriteTimeoutUDS, + Telemetry: DefaultTelemetry, + ReceiveMode: DefaultReceivingMode, + ChannelModeBufferSize: DefaultChannelModeBufferSize, + AggregationFlushInterval: DefaultAggregationFlushInterval, + Aggregation: DefaultAggregation, } for _, option := range options { @@ -220,3 +230,27 @@ func WithChannelModeBufferSize(bufferSize int) Option { return nil } } + +// WithoutAggregationInterval set the aggregation interval +func WithoutAggregationInterval(interval time.Duration) Option { + return func(o *Options) error { + o.AggregationFlushInterval = interval + return nil + } +} + +// WithClientSideAggregation enables client side aggregation. Client side aggregation is a beta feature. +func WithClientSideAggregation() Option { + return func(o *Options) error { + o.Aggregation = true + return nil + } +} + +// WithoutClientSideAggregation disables client side aggregation. +func WithoutClientSideAggregation() Option { + return func(o *Options) error { + o.Aggregation = false + return nil + } +} diff --git a/statsd/statsd.go b/statsd/statsd.go index e4155169a..9e44e4ecc 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -216,6 +216,7 @@ type Client struct { bufferShards []*worker closerLock sync.Mutex receiveMode ReceivingMode + agg *aggregator } // ClientMetrics contains metrics about the client @@ -288,6 +289,10 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro Tags: o.Tags, telemetryTags: []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:" + writerName}, } + if o.Aggregation { + c.agg = newAggregator(&c) + c.agg.start(o.AggregationFlushInterval) + } // Inject values of DD_* environment variables as global tags. for envName, tagName := range ddEnvTagsMapping { @@ -373,7 +378,7 @@ func (c *Client) telemetry() { select { case <-ticker.C: for _, m := range c.flushTelemetry() { - c.addMetric(m) + c.send(m) } case <-c.stop: ticker.Stop() @@ -432,32 +437,14 @@ func (c *Client) FlushTelemetryMetrics() ClientMetrics { } } -func (c *Client) globalTags() []string { - if c != nil { - return c.Tags - } - return nil -} - -func (c *Client) namespace() string { - if c != nil { - return c.Namespace - } - return "" -} - -func (c *Client) addMetric(m metric) error { - if c != nil { - atomic.AddUint64(&c.metrics.TotalMetrics, 1) - } - return c.send(m) -} - func (c *Client) send(m metric) error { if c == nil { return ErrNoClient } + m.globalTags = c.Tags + m.namespace = c.Namespace + h := hashString32(m.name) worker := c.bufferShards[h%uint32(len(c.bufferShards))] @@ -474,22 +461,44 @@ func (c *Client) send(m metric) error { // Gauge measures the value of a metric at a particular time. func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { - return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate}) + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.metrics.TotalMetrics, 1) + if c.agg != nil { + return c.agg.gauge(name, value, tags, rate) + } + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate}) } // Count tracks how many times something happened per second. func (c *Client) Count(name string, value int64, tags []string, rate float64) error { - return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: count, name: name, ivalue: value, tags: tags, rate: rate}) + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.metrics.TotalMetrics, 1) + if c.agg != nil { + return c.agg.count(name, value, tags, rate) + } + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate}) } // Histogram tracks the statistical distribution of a set of values on each host. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { - return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.metrics.TotalMetrics, 1) + return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) } // Distribution tracks the statistical distribution of a set of values across your infrastructure. func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error { - return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.metrics.TotalMetrics, 1) + return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) } // Decr is just Count of -1 @@ -504,7 +513,14 @@ func (c *Client) Incr(name string, tags []string, rate float64) error { // Set counts the number of unique elements in a group. func (c *Client) Set(name string, value string, tags []string, rate float64) error { - return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: set, name: name, svalue: value, tags: tags, rate: rate}) + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.metrics.TotalMetrics, 1) + if c.agg != nil { + return c.agg.set(name, value, tags, rate) + } + return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate}) } // Timing sends timing information, it is an alias for TimeInMilliseconds @@ -515,15 +531,20 @@ func (c *Client) Timing(name string, value time.Duration, tags []string, rate fl // TimeInMilliseconds sends timing information in milliseconds. // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { - return c.addMetric(metric{namespace: c.namespace(), globalTags: c.globalTags(), metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.metrics.TotalMetrics, 1) + return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) } // Event sends the provided Event. func (c *Client) Event(e *Event) error { - if c != nil { - atomic.AddUint64(&c.metrics.TotalEvents, 1) + if c == nil { + return ErrNoClient } - return c.send(metric{globalTags: c.globalTags(), metricType: event, evalue: e, rate: 1}) + atomic.AddUint64(&c.metrics.TotalEvents, 1) + return c.send(metric{metricType: event, evalue: e, rate: 1}) } // SimpleEvent sends an event with the provided title and text. @@ -534,10 +555,11 @@ func (c *Client) SimpleEvent(title, text string) error { // ServiceCheck sends the provided ServiceCheck. func (c *Client) ServiceCheck(sc *ServiceCheck) error { - if c != nil { - atomic.AddUint64(&c.metrics.TotalServiceChecks, 1) + if c == nil { + return ErrNoClient } - return c.send(metric{globalTags: c.globalTags(), metricType: serviceCheck, scvalue: sc, rate: 1}) + atomic.AddUint64(&c.metrics.TotalServiceChecks, 1) + return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1}) } // SimpleServiceCheck sends an serviceCheck with the provided name and status. @@ -574,6 +596,9 @@ func (c *Client) Close() error { c.wg.Wait() // Finally flush any remaining metrics that may have come in at the last moment + if c.agg != nil { + c.agg.stop() + } c.Flush() return c.sender.close() diff --git a/statsd/statsd_benchmark_test.go b/statsd/statsd_benchmark_test.go index 24f22e1e5..35eafd54d 100644 --- a/statsd/statsd_benchmark_test.go +++ b/statsd/statsd_benchmark_test.go @@ -53,13 +53,9 @@ func setupUDPClientServer(b *testing.B, options []statsd.Option) (*statsd.Client return client, conn } -func setupClient(b *testing.B, transport string, sendingMode statsd.ReceivingMode) (*statsd.Client, io.Closer) { +func setupClient(b *testing.B, transport string, extraOptions []statsd.Option) (*statsd.Client, io.Closer) { options := []statsd.Option{statsd.WithMaxMessagesPerPayload(1024), statsd.WithoutTelemetry()} - if sendingMode == statsd.MutexMode { - options = append(options, statsd.WithMutexMode()) - } else { - options = append(options, statsd.WithChannelMode()) - } + options = append(options, extraOptions...) if transport == "udp" { return setupUDPClientServer(b, options) @@ -67,8 +63,8 @@ func setupClient(b *testing.B, transport string, sendingMode statsd.ReceivingMod return setupUDSClientServer(b, options) } -func benchmarkStatsdDifferentMetrics(b *testing.B, transport string, sendingMode statsd.ReceivingMode) { - client, conn := setupClient(b, transport, sendingMode) +func benchmarkStatsdDifferentMetrics(b *testing.B, transport string, extraOptions ...statsd.Option) { + client, conn := setupClient(b, transport, extraOptions) defer conn.Close() n := int32(0) @@ -89,8 +85,8 @@ func benchmarkStatsdDifferentMetrics(b *testing.B, transport string, sendingMode client.Close() } -func benchmarkStatsdSameMetrics(b *testing.B, transport string, sendingMode statsd.ReceivingMode) { - client, conn := setupClient(b, transport, sendingMode) +func benchmarkStatsdSameMetrics(b *testing.B, transport string, extraOptions ...statsd.Option) { + client, conn := setupClient(b, transport, extraOptions) defer conn.Close() b.ResetTimer() @@ -108,32 +104,96 @@ func benchmarkStatsdSameMetrics(b *testing.B, transport string, sendingMode stat client.Close() } -// UDP +/* +UDP with the same metric +*/ + +// blocking + no aggregation func BenchmarkStatsdUDPSameMetricMutex(b *testing.B) { - benchmarkStatsdSameMetrics(b, "udp", statsd.MutexMode) + benchmarkStatsdSameMetrics(b, "udp", statsd.WithMutexMode(), statsd.WithoutClientSideAggregation()) } + +// dropping + no aggregation func BenchmarkStatsdUDPSameMetricChannel(b *testing.B) { - benchmarkStatsdSameMetrics(b, "udp", statsd.ChannelMode) + benchmarkStatsdSameMetrics(b, "udp", statsd.WithChannelMode(), statsd.WithoutClientSideAggregation()) +} + +// blocking + aggregation +func BenchmarkStatsdUDPSameMetricMutexAggregation(b *testing.B) { + benchmarkStatsdSameMetrics(b, "udp", statsd.WithMutexMode(), statsd.WithClientSideAggregation()) +} + +// dropping + aggregation +func BenchmarkStatsdUDPSameMetricChannelAggregation(b *testing.B) { + benchmarkStatsdSameMetrics(b, "udp", statsd.WithChannelMode(), statsd.WithClientSideAggregation()) } +/* +UDP with the different metrics +*/ + +// blocking + no aggregation func BenchmarkStatsdUDPDifferentMetricMutex(b *testing.B) { - benchmarkStatsdDifferentMetrics(b, "udp", statsd.MutexMode) + benchmarkStatsdDifferentMetrics(b, "udp", statsd.WithMutexMode(), statsd.WithoutClientSideAggregation()) } + +// dropping + no aggregation func BenchmarkStatsdUDPDifferentMetricChannel(b *testing.B) { - benchmarkStatsdDifferentMetrics(b, "udp", statsd.ChannelMode) + benchmarkStatsdDifferentMetrics(b, "udp", statsd.WithChannelMode(), statsd.WithoutClientSideAggregation()) } -// UDS +// blocking + aggregation +func BenchmarkStatsdUDPDifferentMetricMutexAggregation(b *testing.B) { + benchmarkStatsdDifferentMetrics(b, "udp", statsd.WithMutexMode(), statsd.WithClientSideAggregation()) +} + +// dropping + aggregation +func BenchmarkStatsdUDPDifferentMetricChannelAggregation(b *testing.B) { + benchmarkStatsdDifferentMetrics(b, "udp", statsd.WithChannelMode(), statsd.WithClientSideAggregation()) +} + +/* +UDS with the same metric +*/ +// blocking + no aggregation func BenchmarkStatsdUDSSameMetricMutex(b *testing.B) { - benchmarkStatsdSameMetrics(b, "uds", statsd.MutexMode) + benchmarkStatsdSameMetrics(b, "uds", statsd.WithMutexMode(), statsd.WithoutClientSideAggregation()) } + +// dropping + no aggregation func BenchmarkStatsdUDSSameMetricChannel(b *testing.B) { - benchmarkStatsdSameMetrics(b, "uds", statsd.ChannelMode) + benchmarkStatsdSameMetrics(b, "uds", statsd.WithChannelMode(), statsd.WithoutClientSideAggregation()) +} + +// blocking + aggregation +func BenchmarkStatsdUDSSameMetricMutexAggregation(b *testing.B) { + benchmarkStatsdSameMetrics(b, "uds", statsd.WithMutexMode(), statsd.WithClientSideAggregation()) +} + +// dropping + aggregation +func BenchmarkStatsdUDSSameMetricChannelAggregation(b *testing.B) { + benchmarkStatsdSameMetrics(b, "uds", statsd.WithChannelMode(), statsd.WithClientSideAggregation()) } +/* +UDS with different metrics +*/ +// blocking + no aggregation func BenchmarkStatsdUDPSifferentMetricMutex(b *testing.B) { - benchmarkStatsdDifferentMetrics(b, "uds", statsd.MutexMode) + benchmarkStatsdDifferentMetrics(b, "uds", statsd.WithMutexMode(), statsd.WithoutClientSideAggregation()) } + +// dropping + no aggregation func BenchmarkStatsdUDSDifferentMetricChannel(b *testing.B) { - benchmarkStatsdDifferentMetrics(b, "uds", statsd.ChannelMode) + benchmarkStatsdDifferentMetrics(b, "uds", statsd.WithChannelMode(), statsd.WithoutClientSideAggregation()) +} + +// blocking + aggregation +func BenchmarkStatsdUDPSifferentMetricMutexAggregation(b *testing.B) { + benchmarkStatsdDifferentMetrics(b, "uds", statsd.WithMutexMode(), statsd.WithClientSideAggregation()) +} + +// dropping + aggregation +func BenchmarkStatsdUDSDifferentMetricChannelAggregation(b *testing.B) { + benchmarkStatsdDifferentMetrics(b, "uds", statsd.WithChannelMode(), statsd.WithClientSideAggregation()) }