diff --git a/exporter/prometheus/prometheus.go b/exporter/prometheus/prometheus.go index 1496c3f5c..4c2f4dc78 100644 --- a/exporter/prometheus/prometheus.go +++ b/exporter/prometheus/prometheus.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "net/http" + "sort" "sync" "go.opencensus.io/internal" @@ -231,16 +232,40 @@ func (c *collector) toMetric(desc *prometheus.Desc, v *view.View, row *view.Row) switch data := row.Data.(type) { case *view.CountData: return prometheus.NewConstMetric(desc, prometheus.CounterValue, float64(*data), tagValues(row.Tags)...) + case *view.DistributionData: points := make(map[float64]uint64) + // Histograms are cumulative in Prometheus. + // 1. Sort buckets in ascending order but retain + // their indices for reverse lookup later on. + // TODO: If there is a guarantee that distribution elements + // are always sorted, then skip the sorting. + indicesMap := make(map[float64]int) + buckets := make([]float64, 0, len(v.Aggregation.Buckets)) for i, b := range v.Aggregation.Buckets { - points[b] = uint64(data.CountPerBucket[i]) + if _, ok := indicesMap[b]; !ok { + indicesMap[b] = i + buckets = append(buckets, b) + } + } + sort.Float64s(buckets) + + // 2. Now that the buckets are sorted by magnitude, + // we can compute cumulative counts and map them back to their original indices. + cumCount := uint64(0) + for _, b := range buckets { + i := indicesMap[b] + cumCount += uint64(data.CountPerBucket[i]) + points[b] = cumCount } return prometheus.NewConstHistogram(desc, uint64(data.Count), data.Sum(), points, tagValues(row.Tags)...) + case *view.SumData: return prometheus.NewConstMetric(desc, prometheus.UntypedValue, float64(*data), tagValues(row.Tags)...) + case *view.LastValueData: return prometheus.NewConstMetric(desc, prometheus.UntypedValue, data.Value, tagValues(row.Tags)...)
+ default: return nil, fmt.Errorf("aggregation %T is not yet supported", v.Aggregation) } diff --git a/exporter/prometheus/prometheus_test.go b/exporter/prometheus/prometheus_test.go index bbf390a35..d08bd7893 100644 --- a/exporter/prometheus/prometheus_test.go +++ b/exporter/prometheus/prometheus_test.go @@ -210,9 +210,11 @@ func TestMetricsEndpointOutput(t *testing.T) { vc.createAndAppend(m.Name(), m.Description(), nil, m, view.Count()) } - if err := view.Subscribe(vc...); err != nil { + if err := view.Register(vc...); err != nil { t.Fatalf("failed to create views: %v", err) } + defer view.Unregister(vc...) + view.SetReportingPeriod(time.Millisecond) for _, m := range measures { @@ -262,3 +264,90 @@ } } } + +func TestCumulativenessFromHistograms(t *testing.T) { + exporter, err := newExporter(Options{}) + if err != nil { + t.Fatalf("failed to create prometheus exporter: %v", err) + } + view.RegisterExporter(exporter) + reportPeriod := time.Millisecond + view.SetReportingPeriod(reportPeriod) + + m := stats.Float64("tests/bills", "payments by denomination", stats.UnitNone) + v := &view.View{ + Name: "cash/register", + Description: "this is a test", + Measure: m, + + // Intentionally used repeated elements in the ascending distribution + // to ensure duplicate distribution items are handled.
+ Aggregation: view.Distribution(1, 5, 5, 5, 5, 10, 20, 50, 100, 250), + } + + if err := view.Register(v); err != nil { + t.Fatalf("Register error: %v", err) + } + defer view.Unregister(v) + + // Give the reporter ample time to process registration + <-time.After(2 * reportPeriod) + + values := []float64{0.25, 245.67, 12, 1.45, 199.9, 7.69, 187.12} + // We want results that look like this: + // 1: [0.25] | 1 + prev(i) = 1 + 0 = 1 + // 5: [1.45] | 1 + prev(i) = 1 + 1 = 2 + // 10: [7.69] | 1 + prev(i) = 1 + 2 = 3 + // 20: [12] | 1 + prev(i) = 1 + 3 = 4 + // 50: [] | 0 + prev(i) = 0 + 4 = 4 + // 100: [] | 0 + prev(i) = 0 + 4 = 4 + // 250: [187.12, 199.9, 245.67] | 3 + prev(i) = 3 + 4 = 7 + wantLines := []string{ + `opencensus_cash_register_bucket{le="1"} 1`, + `opencensus_cash_register_bucket{le="5"} 2`, + `opencensus_cash_register_bucket{le="10"} 3`, + `opencensus_cash_register_bucket{le="20"} 4`, + `opencensus_cash_register_bucket{le="50"} 4`, + `opencensus_cash_register_bucket{le="100"} 4`, + `opencensus_cash_register_bucket{le="250"} 7`, + `opencensus_cash_register_bucket{le="+Inf"} 7`, + `opencensus_cash_register_sum 654.0799999999999`, // Summation of the input values + `opencensus_cash_register_count 7`, + } + + ctx := context.Background() + ms := make([]stats.Measurement, 0, len(values)) + for _, value := range values { + mx := m.M(value) + ms = append(ms, mx) + } + stats.Record(ctx, ms...)
+ + // Give the recorder ample time to process recording + <-time.After(3 * reportPeriod) + + cst := httptest.NewServer(exporter) + defer cst.Close() + res, err := http.Get(cst.URL) + if err != nil { + t.Fatalf("http.Get error: %v", err) + } + blob, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Read body error: %v", err) + } + str := strings.Trim(string(blob), "\n") + lines := strings.Split(str, "\n") + nonComments := make([]string, 0, len(lines)) + for _, line := range lines { + if !strings.Contains(line, "#") { + nonComments = append(nonComments, line) + } + } + + got := strings.Join(nonComments, "\n") + want := strings.Join(wantLines, "\n") + if got != want { + t.Fatalf("\ngot:\n%s\n\nwant:\n%s\n", got, want) + } +}