Report Tag cache size as part of internal metrics (#189)
Additional internal metrics will be emitted:

- `tally.internal.num-tag-cache` will report the number of entries in the reporter's tag cache
- `tally.internal.counter-cardinality` will report the number of counters across all scopes
- `tally.internal.gauge-cardinality` will report the number of gauges across all scopes
- `tally.internal.histogram-cardinality` will report the number of histograms across all scopes

Other changes:
- refactored tests to use named constants for the internal metric counts instead of magic numbers (a sketch of filtering these internal metrics by name follows below)
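
For context (not part of the diff below), here is a minimal sketch of how a test or downstream consumer might separate these internal metrics from application metrics by name. The `isInternalMetric` helper and the hard-coded name list are illustrative only; the names themselves come from this change and the existing reporter internals.

```go
package main

import (
	"fmt"
	"strings"
)

// internalMetricNames lists the internal metrics the M3 reporter and the
// scope registry emit after this change (names taken from the description above).
var internalMetricNames = []string{
	"tally.internal.num-batches",
	"tally.internal.num-metrics",
	"tally.internal.num-write-errors",
	"tally.internal.num-tag-cache",
	"tally.internal.counter-cardinality",
	"tally.internal.gauge-cardinality",
	"tally.internal.histogram-cardinality",
}

// isInternalMetric reports whether a metric name lives in tally's
// internal namespace rather than in application code.
func isInternalMetric(name string) bool {
	return strings.HasPrefix(name, "tally.internal.")
}

func main() {
	for _, name := range internalMetricNames {
		fmt.Printf("%s internal=%v\n", name, isInternalMetric(name))
	}
	fmt.Println("my-counter internal =", isInternalMetric("my-counter")) // false
}
```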
brawndou authored Jan 13, 2023
1 parent 2dbfcc6 commit c347bbf
Showing 9 changed files with 236 additions and 68 deletions.
7 changes: 7 additions & 0 deletions internal/cache/tag_cache.go
@@ -67,6 +67,13 @@ func (c *TagCache) Set(key uint64, tslice []m3thrift.MetricTag) []m3thrift.MetricTag
return tslice
}

// Len returns the size of the cache.
func (c *TagCache) Len() int {
c.mtx.RLock()
defer c.mtx.RUnlock()
return len(c.entries)
}

// TagMapKey generates a new key based on tags.
func TagMapKey(tags map[string]string) uint64 {
return identity.StringStringMap(tags)
4 changes: 3 additions & 1 deletion m3/reporter.go
@@ -133,6 +133,7 @@ type reporter struct {
numMetricsCounter tally.CachedCount
numWriteErrors atomic.Int64
numWriteErrorsCounter tally.CachedCount
numTagCacheCounter tally.CachedCount
}

// Options is a set of options for the M3 reporter.
@@ -291,7 +292,7 @@ func NewReporter(opts Options) (Reporter, error) {
r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags)
r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags)
r.numWriteErrorsCounter = r.AllocateCounter("tally.internal.num-write-errors", internalTags)

r.numTagCacheCounter = r.AllocateCounter("tally.internal.num-tag-cache", internalTags)
r.wg.Add(1)
go func() {
defer r.wg.Done()
@@ -689,6 +690,7 @@ func (r *reporter) reportInternalMetrics() {
r.numBatchesCounter.ReportCount(batches)
r.numMetricsCounter.ReportCount(metrics)
r.numWriteErrorsCounter.ReportCount(writeErrors)
r.numTagCacheCounter.ReportCount(int64(r.tagCache.Len()))
}

func (r *reporter) timeLoop() {
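
The reporter change above follows a simple pattern: allocate the internal counter once at construction, then on every internal-metrics flush read the cache size via the new `Len` accessor and report it. Below is a minimal, self-contained sketch of that pattern; `cachedCount`, `sizer`, `printCount`, and `fakeCache` are stand-in names for this illustration, not tally types.

```go
package main

import "fmt"

// cachedCount mirrors the shape of tally.CachedCount used above:
// a counter that is allocated once and reported repeatedly.
type cachedCount interface {
	ReportCount(value int64)
}

// printCount is a stand-in implementation for illustration.
type printCount struct{ name string }

func (p printCount) ReportCount(v int64) { fmt.Printf("%s=%d\n", p.name, v) }

// sizer mirrors the new TagCache.Len accessor: anything that can
// report its current size.
type sizer interface{ Len() int }

// fakeCache is a hypothetical cache used only for this sketch.
type fakeCache struct{ entries map[uint64]struct{} }

func (c fakeCache) Len() int { return len(c.entries) }

// reportTagCacheSize shows the pattern the reporter follows on each
// internal-metrics flush: read the cache size and report it on the
// pre-allocated counter.
func reportTagCacheSize(counter cachedCount, cache sizer) {
	counter.ReportCount(int64(cache.Len()))
}

func main() {
	cache := fakeCache{entries: map[uint64]struct{}{1: {}, 2: {}}}
	counter := printCount{name: "tally.internal.num-tag-cache"}
	reportTagCacheSize(counter, cache) // tally.internal.num-tag-cache=2
}
```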
3 changes: 2 additions & 1 deletion m3/reporter_integration_test.go
@@ -109,7 +109,8 @@ func testProcessFlushOnExit(t *testing.T, i int) {

require.Equal(t, 1, len(server.Service.getBatches()))
require.NotNil(t, server.Service.getBatches()[0])
require.Equal(t, 7, len(server.Service.getBatches()[0].GetMetrics()))
// mainFileFmt emits 3 metrics; the reporter's internal metrics and the registry's cardinality metrics account for the rest.
require.Equal(t, internalMetrics+cardinalityMetrics+3, len(server.Service.getBatches()[0].GetMetrics()))
metrics := server.Service.getBatches()[0].GetMetrics()
fmt.Printf("Test %d emitted:\n%v\n", i, metrics)
}
15 changes: 9 additions & 6 deletions m3/reporter_test.go
@@ -56,6 +56,9 @@ var (

var protocols = []Protocol{Compact, Binary}

const internalMetrics = 5   // Number of internal metrics the reporter adds to each batch; use this constant rather than a magic number.
const cardinalityMetrics = 3 // Number of cardinality metrics emitted by the scope registry.

// TestReporter tests the reporter works as expected with both compact and binary protocols
func TestReporter(t *testing.T) {
for _, protocol := range protocols {
@@ -116,9 +119,9 @@ func TestReporter(t *testing.T) {

// Validate metrics
emittedCounters := batches[0].GetMetrics()
require.Equal(t, 5, len(emittedCounters))
require.Equal(t, internalMetrics+1, len(emittedCounters))
emittedTimers := batches[1].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, internalMetrics+1, len(emittedTimers))

emittedCounter, emittedTimer := emittedCounters[0], emittedTimers[0]
if emittedCounter.GetName() == "my-timer" {
@@ -512,13 +515,13 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
c1 := r.AllocateCounter("counterWithTags", tags)

// Report the counter with tags to take the last slot.
wg.Add(5)
wg.Add(internalMetrics + 1)
c1.ReportCount(1)
r.Flush()
wg.Wait()

// Empty flush to ensure the copied metric is released.
wg.Add(4)
wg.Add(internalMetrics)
r.Flush()
for {
rep := r.(*reporter)
@@ -533,15 +536,15 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
c2 := r.AllocateCounter("counterWithNoTags", nil)

// Report the counter with no tags.
wg.Add(5)
wg.Add(internalMetrics + 1)
c2.ReportCount(1)
r.Flush()
wg.Wait()

// Verify that first reported counter has tags and the second
// reported counter has no tags.
metrics := server.Service.getMetrics()
require.Equal(t, 14, len(metrics)) // 2 test metrics, 4x3 internal metrics
require.Equal(t, 2+3*internalMetrics, len(metrics)) // 2 test metrics, 3 rounds of internal metrics

var filtered []m3thrift.Metric
for _, metric := range metrics {
14 changes: 7 additions & 7 deletions m3/scope_test.go
@@ -83,7 +83,7 @@ func TestScope(t *testing.T) {
require.NotNil(t, server.Service.getBatches()[0])

emittedTimers := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, internalMetrics+cardinalityMetrics+1, len(emittedTimers))
require.Equal(t, "honk.dazzle", emittedTimers[0].GetName())
}

@@ -107,9 +107,9 @@ func TestScopeCounter(t *testing.T) {
require.Equal(t, 1, len(server.Service.getBatches()))
require.NotNil(t, server.Service.getBatches()[0])

emittedTimers := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, "honk.foobar", emittedTimers[0].GetName())
emittedMetrics := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, internalMetrics+cardinalityMetrics+1, len(emittedMetrics))
require.Equal(t, "honk.foobar", emittedMetrics[cardinalityMetrics].GetName())
}

// TestScopeGauge tests that scope works as expected
@@ -132,9 +132,9 @@ func TestScopeGauge(t *testing.T) {
require.Equal(t, 1, len(server.Service.getBatches()))
require.NotNil(t, server.Service.getBatches()[0])

emittedTimers := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, "honk.foobaz", emittedTimers[0].GetName())
emittedMetrics := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, internalMetrics+cardinalityMetrics+1, len(emittedMetrics))
require.Equal(t, "honk.foobaz", emittedMetrics[cardinalityMetrics].GetName())
}

func BenchmarkScopeReportTimer(b *testing.B) {
21 changes: 11 additions & 10 deletions scope.go
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Uber Technologies, Inc.
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -95,14 +95,15 @@ type scope struct {

// ScopeOptions is a set of options to construct a scope.
type ScopeOptions struct {
Tags map[string]string
Prefix string
Reporter StatsReporter
CachedReporter CachedStatsReporter
Separator string
DefaultBuckets Buckets
SanitizeOptions *SanitizeOptions
registryShardCount uint
Tags map[string]string
Prefix string
Reporter StatsReporter
CachedReporter CachedStatsReporter
Separator string
DefaultBuckets Buckets
SanitizeOptions *SanitizeOptions
registryShardCount uint
skipInternalMetrics bool
}

// NewRootScope creates a new root Scope with a set of options and
@@ -172,7 +173,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
s.tags = s.copyAndSanitizeMap(opts.Tags)

// Register the root scope
s.registry = newScopeRegistryWithShardCount(s, opts.registryShardCount)
s.registry = newScopeRegistryWithShardCount(s, opts.registryShardCount, opts.skipInternalMetrics)

if interval > 0 {
s.wg.Add(1)
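
For orientation, here is a sketch of how a root scope is built through the public `ScopeOptions` extended above. The `github.com/uber-go/tally/v4` import path and the no-op `tally.NullStatsReporter` are assumptions based on the repository's public API; since `skipInternalMetrics` is unexported, scopes created this way keep internal metrics reporting enabled.

```go
package main

import (
	"time"

	// Import path assumed for the v4 module line of this repository.
	tally "github.com/uber-go/tally/v4"
)

func main() {
	// A root scope wired up through ScopeOptions; the registry created
	// underneath it will report the cardinality metrics on each flush.
	scope, closer := tally.NewRootScope(tally.ScopeOptions{
		Prefix:   "honk",
		Tags:     map[string]string{"service": "example"}, // illustrative tag
		Reporter: tally.NullStatsReporter,                 // no-op reporter for the sketch
	}, time.Second)
	defer closer.Close()

	scope.Counter("foobar").Inc(1)
}
```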
72 changes: 64 additions & 8 deletions scope_registry.go
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Uber Technologies, Inc.
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -22,48 +22,61 @@ package tally

import (
"hash/maphash"
"log"
"runtime"
"sync"
"unsafe"

"go.uber.org/atomic"
)

var scopeRegistryKey = keyForPrefixedStringMaps
var (
scopeRegistryKey = keyForPrefixedStringMaps

// Metrics related.
internalTags = map[string]string{"version": Version}
counterCardinalityName = "tally.internal.counter-cardinality"
gaugeCardinalityName = "tally.internal.gauge-cardinality"
histogramCardinalityName = "tally.internal.histogram-cardinality"
)

type scopeRegistry struct {
seed maphash.Seed
root *scope
// We need a subscope per GOPROC so that we can take advantage of all the cpu available to the application.
subscopes []*scopeBucket
// Toggles internal metrics reporting.
skipInternalMetrics bool
}

type scopeBucket struct {
mu sync.RWMutex
s map[string]*scope
}

func newScopeRegistryWithShardCount(root *scope, shardCount uint) *scopeRegistry {
func newScopeRegistryWithShardCount(root *scope, shardCount uint, skipInternalMetrics bool) *scopeRegistry {
if shardCount == 0 {
shardCount = uint(runtime.GOMAXPROCS(-1))
}

r := &scopeRegistry{
root: root,
subscopes: make([]*scopeBucket, shardCount),
seed: maphash.MakeSeed(),
root: root,
subscopes: make([]*scopeBucket, shardCount),
seed: maphash.MakeSeed(),
skipInternalMetrics: skipInternalMetrics,
}

for i := uint(0); i < shardCount; i++ {
r.subscopes[i] = &scopeBucket{
s: make(map[string]*scope),
}
r.subscopes[i].s[scopeRegistryKey(root.prefix, root.tags)] = root
}

return r
}

func (r *scopeRegistry) Report(reporter StatsReporter) {
defer r.purgeIfRootClosed()
r.reportInternalMetrics()

for _, subscopeBucket := range r.subscopes {
subscopeBucket.mu.RLock()
@@ -83,6 +96,7 @@ func (r *scopeRegistry) Report(reporter StatsReporter) {

func (r *scopeRegistry) CachedReport() {
defer r.purgeIfRootClosed()
r.reportInternalMetrics()

for _, subscopeBucket := range r.subscopes {
subscopeBucket.mu.RLock()
@@ -211,3 +225,45 @@ func (r *scopeRegistry) removeWithRLock(subscopeBucket *scopeBucket, key string)
defer subscopeBucket.mu.Unlock()
delete(subscopeBucket.s, key)
}

// reportInternalMetrics reports the counter, gauge, and histogram cardinalities across all registered scopes.
func (r *scopeRegistry) reportInternalMetrics() {
if r.skipInternalMetrics {
return
}

counters, gauges, histograms := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}
rootCounters, rootGauges, rootHistograms := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}
r.ForEachScope(func(ss *scope) {
counterSliceLen, gaugeSliceLen, histogramSliceLen := int64(len(ss.countersSlice)), int64(len(ss.gaugesSlice)), int64(len(ss.histogramsSlice))
if ss.root { // Root scope is referenced across all buckets.
rootCounters.Store(counterSliceLen)
rootGauges.Store(gaugeSliceLen)
rootHistograms.Store(histogramSliceLen)
return
}
counters.Add(counterSliceLen)
gauges.Add(gaugeSliceLen)
histograms.Add(histogramSliceLen)
})

counters.Add(rootCounters.Load())
gauges.Add(rootGauges.Load())
histograms.Add(rootHistograms.Load())
log.Printf("counters: %v, gauges: %v, histograms: %v\n", counters.Load(), gauges.Load(), histograms.Load())

if r.root.reporter != nil {
r.root.reporter.ReportCounter(counterCardinalityName, internalTags, counters.Load())
r.root.reporter.ReportCounter(gaugeCardinalityName, internalTags, gauges.Load())
r.root.reporter.ReportCounter(histogramCardinalityName, internalTags, histograms.Load())
}

if r.root.cachedReporter != nil {
numCounters := r.root.cachedReporter.AllocateCounter(counterCardinalityName, internalTags)
numGauges := r.root.cachedReporter.AllocateCounter(gaugeCardinalityName, internalTags)
numHistograms := r.root.cachedReporter.AllocateCounter(histogramCardinalityName, internalTags)
numCounters.ReportCount(counters.Load())
numGauges.ReportCount(gauges.Load())
numHistograms.ReportCount(histograms.Load())
}
}
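
The root scope is registered in every shard bucket, so `ForEachScope` visits it once per shard; storing its counts (rather than adding them) and folding them in once at the end keeps the root's cardinality from being multiplied by the shard count. Below is a small standalone sketch of that aggregation rule; `scopeCounts` and `aggregate` are hypothetical names, and plain `int64` stands in for `atomic.Int64`.

```go
package main

import "fmt"

// scopeCounts reduces a visited scope to what the aggregation needs.
type scopeCounts struct {
	root     bool
	counters int64
}

// aggregate sums per-scope counter counts while counting the root scope
// exactly once, even though it is visited once per shard bucket.
func aggregate(visited []scopeCounts) int64 {
	var total, rootTotal int64
	for _, s := range visited {
		if s.root {
			rootTotal = s.counters // store, don't add: the root repeats per shard
			continue
		}
		total += s.counters
	}
	return total + rootTotal
}

func main() {
	// Root scope (2 counters) seen from three shards; one child scope with 4.
	visited := []scopeCounts{
		{root: true, counters: 2},
		{root: true, counters: 2},
		{root: true, counters: 2},
		{root: false, counters: 4},
	}
	fmt.Println(aggregate(visited)) // 6, not 10
}
```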