Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Fix Prometheus Histograms to be cumulative
Browse files Browse the repository at this point in the history
Fixes #649

The current Prometheus Histogram creator takes in
a mapping of bucket values to counts, having already
assumed that the creator made cumulative results i.e.
added the result of the largest minimum to the current.
Prometheus mentions this expectation at
https://godoc.org/github.com/prometheus/client_golang/prometheus#NewConstHistogram

Also added a test to ensure that we never
regress on this behavior.
  • Loading branch information
odeke-em committed Mar 30, 2018
1 parent 8069fe5 commit 3531d67
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 2 deletions.
27 changes: 26 additions & 1 deletion exporter/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"log"
"net/http"
"sort"
"sync"

"go.opencensus.io/internal"
Expand Down Expand Up @@ -231,16 +232,40 @@ func (c *collector) toMetric(desc *prometheus.Desc, v *view.View, row *view.Row)
switch data := row.Data.(type) {
case *view.CountData:
return prometheus.NewConstMetric(desc, prometheus.CounterValue, float64(*data), tagValues(row.Tags)...)

case *view.DistributionData:
points := make(map[float64]uint64)
// Histograms are cumulative in Prometheus.
// 1. Sort buckets in ascending order but, retain
// their indices for reverse lookup later on.
// TODO: If there is a guarantee that distribution elements
// are always sorted, then skip the sorting.
indicesMap := make(map[float64]int)
buckets := make([]float64, 0, len(v.Aggregation.Buckets))
for i, b := range v.Aggregation.Buckets {
points[b] = uint64(data.CountPerBucket[i])
if _, ok := indicesMap[b]; !ok {
indicesMap[b] = i
buckets = append(buckets, b)
}
}
sort.Float64s(buckets)

// 2. Now that the buckets are sorted by magnitude
// we can create cumulative indicesmap them back by reverse index
cumCount := uint64(0)
for _, b := range buckets {
i := indicesMap[b]
cumCount += uint64(data.CountPerBucket[i])
points[b] = cumCount
}
return prometheus.NewConstHistogram(desc, uint64(data.Count), data.Sum(), points, tagValues(row.Tags)...)

case *view.SumData:
return prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(*data), tagValues(row.Tags)...)

case *view.LastValueData:
return prometheus.NewConstMetric(desc, prometheus.UntypedValue, data.Value, tagValues(row.Tags)...)

default:
return nil, fmt.Errorf("aggregation %T is not yet supported", v.Aggregation)
}
Expand Down
91 changes: 90 additions & 1 deletion exporter/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,11 @@ func TestMetricsEndpointOutput(t *testing.T) {
vc.createAndAppend(m.Name(), m.Description(), nil, m, view.Count())
}

if err := view.Subscribe(vc...); err != nil {
if err := view.Register(vc...); err != nil {
t.Fatalf("failed to create views: %v", err)
}
defer view.Unregister(vc...)

view.SetReportingPeriod(time.Millisecond)

for _, m := range measures {
Expand Down Expand Up @@ -262,3 +264,90 @@ func TestMetricsEndpointOutput(t *testing.T) {
}
}
}

func TestCumulativenessFromHistograms(t *testing.T) {
exporter, err := newExporter(Options{})
if err != nil {
t.Fatalf("failed to create prometheus exporter: %v", err)
}
view.RegisterExporter(exporter)
reportPeriod := time.Millisecond
view.SetReportingPeriod(reportPeriod)

m := stats.Float64("tests/bills", "payments by denomination", stats.UnitNone)
v := &view.View{
Name: "cash/register",
Description: "this is a test",
Measure: m,

// Intentionally used repeated elements in the ascending distribution.
// to ensure duplicate distribution items are handles.
Aggregation: view.Distribution(1, 5, 5, 5, 5, 10, 20, 50, 100, 250),
}

if err := view.Register(v); err != nil {
t.Fatalf("Register error: %v", err)
}
defer view.Unregister(v)

// Give the reporter ample time to process registration
<-time.After(2 * reportPeriod)

values := []float64{0.25, 245.67, 12, 1.45, 199.9, 7.69, 187.12}
// We want the results that look like this:
// 1: [0.25] | 1 + prev(i) = 1 + 0 = 1
// 5: [1.45] | 1 + prev(i) = 1 + 1 = 2
// 10: [] | 1 + prev(i) = 1 + 2 = 3
// 20: [12] | 1 + prev(i) = 1 + 3 = 4
// 50: [] | 0 + prev(i) = 0 + 4 = 4
// 100: [] | 0 + prev(i) = 0 + 4 = 4
// 250: [187.12, 199.9, 245.67] | 3 + prev(i) = 3 + 4 = 7
wantLines := []string{
`opencensus_cash_register_bucket{le="1"} 1`,
`opencensus_cash_register_bucket{le="5"} 2`,
`opencensus_cash_register_bucket{le="10"} 3`,
`opencensus_cash_register_bucket{le="20"} 4`,
`opencensus_cash_register_bucket{le="50"} 4`,
`opencensus_cash_register_bucket{le="100"} 4`,
`opencensus_cash_register_bucket{le="250"} 7`,
`opencensus_cash_register_bucket{le="+Inf"} 7`,
`opencensus_cash_register_sum 654.0799999999999`, // Summation of the input values
`opencensus_cash_register_count 7`,
}

ctx := context.Background()
ms := make([]stats.Measurement, len(values))
for _, value := range values {
mx := m.M(value)
ms = append(ms, mx)
}
stats.Record(ctx, ms...)

// Give the recorder ample time to process recording
<-time.After(3 * reportPeriod)

cst := httptest.NewServer(exporter)
defer cst.Close()
res, err := http.Get(cst.URL)
if err != nil {
t.Fatalf("http.Get error: %v", err)
}
blob, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatalf("Read body error: %v", err)
}
str := strings.Trim(string(blob), "\n")
lines := strings.Split(str, "\n")
nonComments := make([]string, 0, len(lines))
for _, line := range lines {
if !strings.Contains(line, "#") {
nonComments = append(nonComments, line)
}
}

got := strings.Join(nonComments, "\n")
want := strings.Join(wantLines, "\n")
if got != want {
t.Fatalf("\ngot:\n%s\n\nwant:\n%s\n", got, want)
}
}

0 comments on commit 3531d67

Please sign in to comment.