From a5a78ff5f0472b817cc3b90728aa78d355545fd4 Mon Sep 17 00:00:00 2001 From: Ian Milligan Date: Tue, 2 Jun 2020 09:33:39 -0700 Subject: [PATCH 1/2] Remove call to time.Now() on worker thread when handling record reqs (#1210) Time is already recorded on the client side and stored in the currently unused recordReq.t field. Avoiding these repeated calls to time.Now while the worker is blocked can significantly reduce worker contention. --- stats/view/worker_commands.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go index 0267e179a..8ec295d95 100644 --- a/stats/view/worker_commands.go +++ b/stats/view/worker_commands.go @@ -163,7 +163,7 @@ func (cmd *recordReq) handleCommand(w *worker) { } ref := w.getMeasureRef(m.Measure().Name()) for v := range ref.views { - v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now()) + v.addSample(cmd.tm, m.Value(), cmd.attachments, cmd.t) } } } From 895940823bd39d2621ca4f61a82f0123123d8df5 Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Mon, 1 Jun 2020 23:42:52 -0700 Subject: [PATCH 2/2] Update Meter to track and report Resource for metric data. --- stats/view/view_to_metric.go | 5 +- stats/view/view_to_metric_test.go | 4 +- stats/view/worker.go | 18 ++++- stats/view/worker_test.go | 105 ++++++++++++++++++++---------- 4 files changed, 95 insertions(+), 37 deletions(-) diff --git a/stats/view/view_to_metric.go b/stats/view/view_to_metric.go index 293c1646d..5e1656a1f 100644 --- a/stats/view/view_to_metric.go +++ b/stats/view/view_to_metric.go @@ -18,6 +18,8 @@ package view import ( "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/stats" ) @@ -125,7 +127,7 @@ func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Ti } } -func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric { +func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTime time.Time) *metricdata.Metric { if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 || v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 { startTime = time.Time{} @@ -144,6 +146,7 @@ func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricda m := &metricdata.Metric{ Descriptor: *v.metricDescriptor, TimeSeries: ts, + Resource: r, } return m } diff --git a/stats/view/view_to_metric_test.go b/stats/view/view_to_metric_test.go index 18c877117..b6df3a0f7 100644 --- a/stats/view/view_to_metric_test.go +++ b/stats/view/view_to_metric_test.go @@ -447,7 +447,7 @@ func Test_ViewToMetric(t *testing.T) { tc.vi.addSample(tag.FromContext(ctx), v, nil, now) } - gotMetric := viewToMetric(tc.vi, now, startTime) + gotMetric := viewToMetric(tc.vi, nil, now, startTime) if !cmp.Equal(gotMetric, tc.wantMetric) { // JSON format is strictly for checking the content when test fails. Do not use JSON // format to determine if the two values are same as it doesn't differentiate between @@ -509,7 +509,7 @@ func TestUnitConversionForAggCount(t *testing.T) { for _, tc := range tests { tc.vi.addSample(tag.FromContext(context.Background()), 5.0, nil, now) - gotMetric := viewToMetric(tc.vi, now, startTime) + gotMetric := viewToMetric(tc.vi, nil, now, startTime) gotUnit := gotMetric.Descriptor.Unit if !cmp.Equal(gotUnit, tc.wantUnit) { t.Errorf("Verify Unit: %s: Got:%v Want:%v", tc.name, gotUnit, tc.wantUnit) diff --git a/stats/view/worker.go b/stats/view/worker.go index 74e4a90fa..87cf8b48c 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats" @@ -47,6 +49,7 @@ type worker struct { c chan command quit, done chan bool mu sync.RWMutex + r *resource.Resource exportersMu sync.RWMutex exporters map[Exporter]struct{} @@ -91,6 +94,10 @@ type Meter interface { RegisterExporter(Exporter) // UnregisterExporter unregisters an exporter. UnregisterExporter(Exporter) + // SetResource may be used to set the Resource associated with this registry. + // This is intended to be used in cases where a single process exports metrics + // for multiple Resources, typically in a multi-tenant situation. + SetResource(*resource.Resource) // Start causes the Meter to start processing Record calls and aggregating // statistics as well as exporting data. @@ -249,6 +256,10 @@ func NewMeter() Meter { } } +func (w *worker) SetResource(r *resource.Resource) { + w.r = r +} + func (w *worker) Start() { go w.start() } @@ -368,7 +379,7 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric { startTime = w.startTimes[v] } - return viewToMetric(v, now, startTime) + return viewToMetric(v, w.r, now, startTime) } // Read reads all view data and returns them as metrics. @@ -387,6 +398,11 @@ func (w *worker) Read() []*metricdata.Metric { return metrics } +// Resource implements metricproducer.ResourceProducer. +func (w *worker) Resource() *resource.Resource { + return w.r +} + func (w *worker) RegisterExporter(e Exporter) { w.exportersMu.Lock() defer w.exportersMu.Unlock() diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index ee7f149aa..193aad0e2 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -18,10 +18,13 @@ package view import ( "context" "errors" + "sort" "sync" "testing" "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricexport" "go.opencensus.io/stats" @@ -123,8 +126,13 @@ func Test_Worker_MultiExport(t *testing.T) { // This test reports the same data for the default worker and a secondary // worker, and ensures that the stats are kept independently. + extraResource := resource.Resource{ + Type: "additional", + Labels: map[string]string{"key1": "value1", "key2": "value2"}, + } worker2 := NewMeter().(*worker) worker2.Start() + worker2.SetResource(&extraResource) m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit") key := tag.MustNewKey(("key")) @@ -162,50 +170,62 @@ func Test_Worker_MultiExport(t *testing.T) { } } - wantRows := []struct { - w Meter - view string - rows []*Row - }{{ - view: count.Name, - rows: []*Row{ + makeKey := func(r *resource.Resource, view string) string { + if r == nil { + r = &resource.Resource{} + } + return resource.EncodeLabels(r.Labels) + "/" + view + } + + // Format is Resource.Labels encoded as string, then + wantPartialData := map[string][]*Row{ + makeKey(nil, count.Name): []*Row{ {[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}}, {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, }, - }, { - view: sum.Name, - rows: []*Row{ - {nil, &SumData{Value: 7.5}}}, - }, { - w: worker2, - view: count.Name, - rows: []*Row{ + makeKey(nil, sum.Name): []*Row{ + {nil, &SumData{Value: 7.5}}, + }, + makeKey(&extraResource, count.Name): []*Row{ {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, }, - }} + } - for _, wantRow := range wantRows { - retrieve := RetrieveData - if wantRow.w != nil { - retrieve = wantRow.w.(*worker).RetrieveData - } - gotRows, err := retrieve(wantRow.view) - if err != nil { - t.Fatalf("RetrieveData(%v), got error %v", wantRow.view, err) + te := &testExporter{} + metricexport.NewReader().ReadAndExport(te) + for _, m := range te.metrics { + key := makeKey(m.Resource, m.Descriptor.Name) + want, ok := wantPartialData[key] + if !ok { + t.Errorf("Unexpected data for %q: %v", key, m) + continue } - for _, got := range gotRows { - if !containsRow(wantRow.rows, got) { - t.Errorf("%s: got row %#v; want none", wantRow.view, got) - break + gotTs := m.TimeSeries + sort.Sort(byLabel(gotTs)) + + for i, ts := range gotTs { + for j, label := range ts.LabelValues { + if want[i].Tags[j].Value != label.Value { + t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key) + } } - } - for _, want := range wantRow.rows { - if !containsRow(gotRows, want) { - t.Errorf("%s: got none, want %#v", wantRow.view, want) - break + switch wantValue := want[i].Data.(type) { + case *CountData: + got := ts.Points[0].Value.(int64) + if wantValue.Value != got { + t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue, got, ts, key) + } + case *SumData: + got := ts.Points[0].Value.(float64) + if wantValue.Value != got { + t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue, got, ts, key) + } + default: + t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key) } } } + // Verify that worker has not been computing sum: got, err := worker2.RetrieveData(sum.Name) if err == nil { @@ -577,9 +597,11 @@ func TestWorkerRace(t *testing.T) { } type testExporter struct { + metrics []*metricdata.Metric } func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + te.metrics = metrics return nil } @@ -619,3 +641,20 @@ func restart() { defaultWorker = NewMeter().(*worker) go defaultWorker.start() } + +// byTag implements sort.Interface for *metricdata.TimeSeries by Labels. +type byLabel []*metricdata.TimeSeries + +func (ts byLabel) Len() int { return len(ts) } +func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } +func (ts byLabel) Less(i, j int) bool { + if len(ts[i].LabelValues) != len(ts[j].LabelValues) { + return len(ts[i].LabelValues) < len(ts[j].LabelValues) + } + for k := range ts[i].LabelValues { + if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value { + return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value + } + } + return false +}