Submit histogram aggregates globally when "veneurglobalonly" is set #390

Closed
CHANGELOG.md (2 changes: 2 additions & 0 deletions)
@@ -14,6 +14,7 @@
** Datadog and LightStep sinks now emit `veneur.sink.span_flush_total_duration_ns` for span flush duration and tag it with `sink`
** Datadog, Kafka, MetricExtraction, and LightStep sinks now emit `sink.spans_flushed_total` for metric flush counts and tag it with `sink`
* Veneur's internal metrics are no longer tagged with `veneurlocalonly`. This means that percentile metrics (such as timers) will now be aggregated globally.
* Histograms tagged with `veneurglobalonly` now emit only global aggregates, rather than being flushed locally. Aggregations such as "sum", "min", or "max" are submitted from the global Veneur without a host tag.

## Bugfixes
* LightStep sink was hardcoded to use plaintext, now adjusts based on URL scheme (http versus https). Thanks [gphat](https://github.com/gphat)!
@@ -34,6 +35,7 @@

## Improvements
* Updated Datadog span sink to latest version in Datadog tracing agent. Thanks, [gphat](https://github.com/gphat)!
* Histograms tagged with `veneurglobalonly` submit aggregates from the global Veneur rather than flushing locally to Datadog (fixes #155). Thanks, [noahgoldman](https://github.com/noahgoldman)!

# 2.0.0, 2018-01-09

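The new `veneurglobalonly` behavior described in the changelog entries above is driven entirely by a tag supplied on the client side. Below is a minimal sketch of emitting such a histogram with the datadog-go DogStatsD client; the listener address, metric name, and tag value are illustrative assumptions rather than part of this change:

```go
package main

import (
	"log"

	"github.com/DataDog/datadog-go/statsd"
)

func main() {
	// Assumes a local Veneur listening for DogStatsD traffic on its
	// default address; adjust for your deployment.
	client, err := statsd.New("127.0.0.1:8126")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Tagging the histogram with `veneurglobalonly` asks Veneur to skip
	// local aggregation entirely: only the global instance flushes the
	// configured aggregates (e.g. api.latency.sum/.min/.max) and
	// percentiles, with no host tag attached.
	if err := client.Histogram("api.latency", 42.0, []string{"veneurglobalonly:true"}, 1.0); err != nil {
		log.Println("could not send histogram:", err)
	}
}
```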
flusher.go (143 changes: 82 additions & 61 deletions)
@@ -44,15 +44,16 @@ func (s *Server) Flush(ctx context.Context) {
// don't publish percentiles if we're a local veneur; that's the global
// veneur's job
var percentiles []float64
var finalMetrics []samplers.InterMetric
aggregates := s.HistogramAggregates

if !s.IsLocal() {
percentiles = s.HistogramPercentiles
aggregates = samplers.HistogramAggregates{}
}

tempMetrics, ms := s.tallyMetrics(percentiles)

finalMetrics = s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms)
finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, aggregates, tempMetrics, ms)

span.Add(s.computeMetricsFlushCounts(ms)...)

@@ -104,8 +105,10 @@ type metricsSummary struct {
totalSets int
totalTimers int

totalGlobalCounters int
totalGlobalGauges int
totalGlobalCounters int
totalGlobalGauges int
totalGlobalHistograms int
totalGlobalTimers int

totalLocalHistograms int
totalLocalSets int
@@ -139,6 +142,8 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu

ms.totalGlobalCounters += len(wm.globalCounters)
ms.totalGlobalGauges += len(wm.globalGauges)
ms.totalGlobalHistograms += len(wm.globalHistograms)
ms.totalGlobalTimers += len(wm.globalTimers)

ms.totalLocalHistograms += len(wm.localHistograms)
ms.totalLocalSets += len(wm.localSets)
@@ -157,12 +162,16 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu
// 'local-only' histograms.
ms.totalLocalSets + (ms.totalLocalTimers+ms.totalLocalHistograms)*(s.HistogramAggregates.Count+len(s.HistogramPercentiles))

// Global instances also flush sets and global counters, so be sure and add
// them to the total size
// Global instances also flush sets and global counters, as well as their
// aggregates if they are "global". Add them here to the total size
if !s.IsLocal() {
ms.totalLength += ms.totalSets
ms.totalLength += ms.totalGlobalCounters
ms.totalLength += ms.totalGlobalGauges
// On the global instance, each "global" histogram and timer will
// report each aggregate and percentile
ms.totalLength += (ms.totalGlobalHistograms + ms.totalGlobalTimers) *
(s.HistogramAggregates.Count + len(percentiles))
}

return tempMetrics, ms
@@ -171,7 +180,7 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu
// generateInterMetrics calls the Flush method on each
// counter/gauge/histogram/timer/set in order to
// generate an InterMetric corresponding to that value
func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.InterMetric {
func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64, aggregates samplers.HistogramAggregates, tempMetrics []WorkerMetrics, ms metricsSummary) []samplers.InterMetric {

span, _ := trace.StartSpanFromContext(ctx, "")
defer span.ClientFinish(s.TraceClient)
@@ -187,10 +196,10 @@ func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64
// if we're a local veneur, then percentiles=nil, and only the local
// parts (count, min, max) will be flushed
for _, h := range wm.histograms {
finalMetrics = append(finalMetrics, h.Flush(s.interval, percentiles, s.HistogramAggregates)...)
finalMetrics = append(finalMetrics, h.Flush(s.interval, percentiles, aggregates)...)
}
for _, t := range wm.timers {
finalMetrics = append(finalMetrics, t.Flush(s.interval, percentiles, s.HistogramAggregates)...)
finalMetrics = append(finalMetrics, t.Flush(s.interval, percentiles, aggregates)...)
}

// local-only samplers should be flushed in their entirety, since they
@@ -227,6 +236,15 @@ func (s *Server) generateInterMetrics(ctx context.Context, percentiles []float64
for _, gg := range wm.globalGauges {
finalMetrics = append(finalMetrics, gg.Flush()...)
}

// If this is global, always submit both percentiles and
// aggregates for "global" histogram types
for _, gh := range wm.globalHistograms {
finalMetrics = append(finalMetrics, gh.Flush(s.interval, s.HistogramPercentiles, s.HistogramAggregates)...)
}
for _, gt := range wm.globalTimers {
finalMetrics = append(finalMetrics, gt.Flush(s.interval, s.HistogramPercentiles, s.HistogramAggregates)...)
}
}
}

@@ -267,8 +285,10 @@ func (s *Server) computeGlobalMetricsFlushCounts(ms metricsSummary) []*ssf.SSFSa
ssf.Count(flushTotalMetric, float32(ms.totalGlobalCounters), map[string]string{"metric_type": "global_counter"}),
ssf.Count(flushTotalMetric, float32(ms.totalGlobalGauges), map[string]string{"metric_type": "global_gauge"}),
ssf.Count(flushTotalMetric, float32(ms.totalHistograms), map[string]string{"metric_type": "histogram"}),
ssf.Count(flushTotalMetric, float32(ms.totalGlobalHistograms), map[string]string{"metric_type": "histogram"}),
ssf.Count(flushTotalMetric, float32(ms.totalSets), map[string]string{"metric_type": "set"}),
ssf.Count(flushTotalMetric, float32(ms.totalTimers), map[string]string{"metric_type": "timer"}),
ssf.Count(flushTotalMetric, float32(ms.totalGlobalTimers), map[string]string{"metric_type": "timer"}),
}
}

@@ -277,75 +297,45 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
defer span.ClientFinish(s.TraceClient)
jmLength := 0
for _, wm := range wms {
jmLength += len(wm.globalCounters)
jmLength += len(wm.globalGauges)
jmLength += len(wm.histograms)
jmLength += len(wm.globalHistograms)
jmLength += len(wm.sets)
jmLength += len(wm.timers)
jmLength += len(wm.globalTimers)
}

jsonMetrics := make([]samplers.JSONMetric, 0, jmLength)
exportStart := time.Now()
for _, wm := range wms {
for _, count := range wm.globalCounters {
jm, err := count.Export()
if err != nil {
log.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "counter",
"name": count.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
jsonMetrics = s.appendJSONMetric(count, jsonMetrics, counterTypeName,
samplers.GlobalOnly)
}
for _, gauge := range wm.globalGauges {
jm, err := gauge.Export()
if err != nil {
log.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "gauge",
"name": gauge.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
jsonMetrics = s.appendJSONMetric(gauge, jsonMetrics, gaugeTypeName,
samplers.GlobalOnly)
}
for _, histo := range wm.histograms {
jm, err := histo.Export()
if err != nil {
log.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "histogram",
"name": histo.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
jsonMetrics = s.appendJSONMetric(histo, jsonMetrics, histogramTypeName,
samplers.MixedScope)
}
for _, histo := range wm.globalHistograms {
jsonMetrics = s.appendJSONMetric(histo, jsonMetrics, histogramTypeName,
samplers.GlobalOnly)
}
for _, set := range wm.sets {
jm, err := set.Export()
if err != nil {
log.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "set",
"name": set.Name,
}).Error("Could not export metric")
continue
}
jsonMetrics = append(jsonMetrics, jm)
jsonMetrics = s.appendJSONMetric(set, jsonMetrics, setTypeName,
samplers.MixedScope)
}
for _, timer := range wm.timers {
jm, err := timer.Export()
if err != nil {
log.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": "timer",
"name": timer.Name,
}).Error("Could not export metric")
continue
}
// the exporter doesn't know that these two are "different"
jm.Type = "timer"
jsonMetrics = append(jsonMetrics, jm)
jsonMetrics = s.appendJSONMetric(timer, jsonMetrics, timerTypeName,
samplers.MixedScope)
}
for _, timer := range wm.globalTimers {
jsonMetrics = s.appendJSONMetric(timer, jsonMetrics, timerTypeName,
samplers.GlobalOnly)
}
}
span.Add(ssf.Timing("forward.duration_ns", time.Since(exportStart), time.Nanosecond, map[string]string{"part": "export"}),
Expand All @@ -370,3 +360,34 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
func (s *Server) flushTraces(ctx context.Context) {
s.SpanWorker.Flush()
}

// exporter is any metric that can be exported (and has a name)
type exporter interface {
Export() (samplers.JSONMetric, error)
GetName() string
}

// appendJSONMetric appends a JSONMetric exported by the input metric m, or
// does nothing and logs if the export fails.
func (s *Server) appendJSONMetric(
m exporter,
metrics []samplers.JSONMetric,
mType string,
scope samplers.MetricScope,
) []samplers.JSONMetric {
jm, err := m.Export()
if err != nil {
log.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"type": mType,
"name": m.GetName(),
"scope": scope,
}).Error("Could not export metric")
return metrics
}

jm.Type = mType
jm.Scope = scope

return append(metrics, jm)
}
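
To make the new buffer-sizing logic in `tallyMetrics` concrete, here is a back-of-the-envelope sketch of how many extra `InterMetric`s the global instance now budgets for histograms and timers tagged `veneurglobalonly`; the aggregate and percentile counts are made-up illustration values, not Veneur defaults:

```go
package main

import "fmt"

func main() {
	// Hypothetical configuration: 3 aggregates (e.g. min, max, count)
	// and 2 percentiles (e.g. p50, p99).
	aggregateCount := 3
	percentileCount := 2

	// Counts gathered by tallyMetrics on the global instance.
	totalGlobalHistograms := 10
	totalGlobalTimers := 5

	// Mirrors the addition to ms.totalLength: each "global" histogram or
	// timer reports every aggregate and every percentile when flushed.
	extra := (totalGlobalHistograms + totalGlobalTimers) *
		(aggregateCount + percentileCount)

	fmt.Printf("global histograms/timers add %d metrics to the flush buffer\n", extra)
	// prints: global histograms/timers add 75 metrics to the flush buffer
}
```

Pre-sizing the slice this way is only an optimization; `generateInterMetrics` still appends whatever each sampler's `Flush` returns. The same change also factors the repeated export-and-log-on-error pattern in `flushForward` into the `appendJSONMetric` helper shown above, which sets each forwarded `JSONMetric`'s type and scope in one place.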