Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report Tag cache size as part of internal metrics #189

Merged
merged 25 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/cache/tag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (c *TagCache) Set(key uint64, tslice []m3thrift.MetricTag) []m3thrift.Metri
return tslice
}

// Len returns the number of entries currently held in the cache.
// It takes a read lock, so it is safe to call concurrently with Get/Set.
func (c *TagCache) Len() int {
c.mtx.RLock()
defer c.mtx.RUnlock()
return len(c.entries)
}

// TagMapKey generates a new key based on tags.
func TagMapKey(tags map[string]string) uint64 {
return identity.StringStringMap(tags)
Expand Down
121 changes: 75 additions & 46 deletions m3/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"time"

"github.com/pkg/errors"
tally "github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/internal/cache"
customtransport "github.com/uber-go/tally/v4/m3/customtransports"
m3thrift "github.com/uber-go/tally/v4/m3/thrift/v2"
Expand Down Expand Up @@ -126,13 +126,14 @@ type reporter struct {
tagCache *cache.TagCache
wg sync.WaitGroup

batchSizeHistogram tally.CachedHistogram
numBatches atomic.Int64
numBatchesCounter tally.CachedCount
numMetrics atomic.Int64
numMetricsCounter tally.CachedCount
numWriteErrors atomic.Int64
numWriteErrorsCounter tally.CachedCount
batchSizeHistogram tally.CachedHistogram
numBatches atomic.Int64
numBatchesCounter tally.CachedCount
numMetrics atomic.Int64
numMetricsCounter tally.CachedCount
numWriteErrors atomic.Int64
numWriteErrorsCounter tally.CachedCount
numTagCardinalityCounter tally.CachedCount
}

// Options is a set of options for the M3 reporter.
Expand Down Expand Up @@ -226,10 +227,12 @@ func NewReporter(opts Options) (Reporter, error) {
}

for k, v := range tagm {
tags = append(tags, m3thrift.MetricTag{
Name: k,
Value: v,
})
tags = append(
tags, m3thrift.MetricTag{
Name: k,
Value: v,
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
}

// Calculate size of common tags
Expand Down Expand Up @@ -261,10 +264,12 @@ func NewReporter(opts Options) (Reporter, error) {
return nil, errCommonTagSize
}

buckets := tally.ValueBuckets(append(
[]float64{0.0},
tally.MustMakeExponentialValueBuckets(2.0, 2.0, 11)...,
))
buckets := tally.ValueBuckets(
append(
[]float64{0.0},
tally.MustMakeExponentialValueBuckets(2.0, 2.0, 11)...,
),
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved

r := &reporter{
buckets: tally.BucketPairs(buckets),
Expand All @@ -291,6 +296,7 @@ func NewReporter(opts Options) (Reporter, error) {
r.numBatchesCounter = r.AllocateCounter("tally.internal.num-batches", internalTags)
r.numMetricsCounter = r.AllocateCounter("tally.internal.num-metrics", internalTags)
r.numWriteErrorsCounter = r.AllocateCounter("tally.internal.num-write-errors", internalTags)
r.numTagCardinalityCounter = r.AllocateCounter("tally.internal.tag-cardinality", internalTags)

r.wg.Add(1)
go func() {
Expand Down Expand Up @@ -373,10 +379,12 @@ func (r *reporter) AllocateHistogram(
) tally.CachedHistogram {
var (
_, isDuration = buckets.(tally.DurationBuckets)
bucketIDLen = int(math.Max(
float64(ndigits(buckets.Len())),
float64(_minMetricBucketIDTagLength),
))
bucketIDLen = int(
math.Max(
float64(ndigits(buckets.Len())),
float64(_minMetricBucketIDTagLength),
),
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
bucketIDFmt = "%0" + strconv.Itoa(bucketIDLen) + "d"
cachedValueBuckets []cachedHistogramBucket
cachedDurationBuckets []cachedHistogramBucket
Expand Down Expand Up @@ -631,10 +639,12 @@ func (r *reporter) flush(mets []m3thrift.Metric) []m3thrift.Metric {

r.numBatches.Inc()

err := r.client.EmitMetricBatchV2(m3thrift.MetricBatch{
Metrics: mets,
CommonTags: r.commonTags,
})
err := r.client.EmitMetricBatchV2(
m3thrift.MetricBatch{
Metrics: mets,
CommonTags: r.commonTags,
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
r.numWriteErrors.Inc()
}
Expand All @@ -655,10 +665,12 @@ func (r *reporter) convertTags(tags map[string]string) []m3thrift.MetricTag {
if !ok {
mtags = r.resourcePool.getMetricTagSlice()
for k, v := range tags {
mtags = append(mtags, m3thrift.MetricTag{
Name: r.stringInterner.Intern(k),
Value: r.stringInterner.Intern(v),
})
mtags = append(
mtags, m3thrift.MetricTag{
Name: r.stringInterner.Intern(k),
Value: r.stringInterner.Intern(v),
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
}
mtags = r.tagCache.Set(key, mtags)
}
Expand All @@ -674,9 +686,11 @@ func (r *reporter) reportInternalMetrics() {
batchSize = float64(metrics) / float64(batches)
)

bucket := sort.Search(len(r.buckets), func(i int) bool {
return r.buckets[i].UpperBoundValue() >= batchSize
})
bucket := sort.Search(
len(r.buckets), func(i int) bool {
return r.buckets[i].UpperBoundValue() >= batchSize
},
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above: revert


var value float64
if bucket < len(r.buckets) {
Expand All @@ -689,12 +703,19 @@ func (r *reporter) reportInternalMetrics() {
r.numBatchesCounter.ReportCount(batches)
r.numMetricsCounter.ReportCount(metrics)
r.numWriteErrorsCounter.ReportCount(writeErrors)
r.numTagCardinalityCounter.ReportCount(int64(r.tagCache.Len()))
}

// timeLoop caches the current wall-clock time (as Unix nanos) at
// _timeResolution granularity so hot metric paths can read r.now
// instead of calling time.Now() per sample. It runs until the
// reporter is closed, exiting either when done is flagged or when
// donech is closed.
func (r *reporter) timeLoop() {
	t := time.NewTicker(_timeResolution)
	defer t.Stop()

	for !r.done.Load() {
		r.now.Store(time.Now().UnixNano())
		// Wait for the next tick or shutdown. The leftover
		// time.Sleep(_timeResolution) from the pre-ticker version is
		// removed: sleeping here in addition to waiting on t.C would
		// double the effective update interval.
		select {
		case <-t.C:
		case <-r.donech:
			return
		}
	}
}

Expand Down Expand Up @@ -739,9 +760,11 @@ func (h cachedHistogram) ValueBucket(
) tally.CachedHistogramBucket {
var (
n = len(h.cachedValueBuckets)
idx = sort.Search(n, func(i int) bool {
return h.cachedValueBuckets[i].valueUpperBound >= bucketUpperBound
})
idx = sort.Search(
n, func(i int) bool {
return h.cachedValueBuckets[i].valueUpperBound >= bucketUpperBound
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
)

if idx == n {
Expand All @@ -758,10 +781,12 @@ func (h cachedHistogram) ValueBucket(
rep = cm.reporter
)

return reportSamplesFunc(func(value int64) {
m.Value.Count = value
rep.reportCopyMetric(m, size, bucket, bucketID)
})
return reportSamplesFunc(
func(value int64) {
m.Value.Count = value
rep.reportCopyMetric(m, size, bucket, bucketID)
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
}

func (h cachedHistogram) DurationBucket(
Expand All @@ -770,9 +795,11 @@ func (h cachedHistogram) DurationBucket(
) tally.CachedHistogramBucket {
var (
n = len(h.cachedDurationBuckets)
idx = sort.Search(n, func(i int) bool {
return h.cachedDurationBuckets[i].durationUpperBound >= bucketUpperBound
})
idx = sort.Search(
n, func(i int) bool {
return h.cachedDurationBuckets[i].durationUpperBound >= bucketUpperBound
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
)

if idx == n {
Expand All @@ -789,10 +816,12 @@ func (h cachedHistogram) DurationBucket(
rep = cm.reporter
)

return reportSamplesFunc(func(value int64) {
m.Value.Count = value
rep.reportCopyMetric(m, size, bucket, bucketID)
})
return reportSamplesFunc(
func(value int64) {
m.Value.Count = value
rep.reportCopyMetric(m, size, bucket, bucketID)
},
)
brawndou marked this conversation as resolved.
Show resolved Hide resolved
}

type cachedHistogramBucket struct {
Expand Down
2 changes: 1 addition & 1 deletion m3/reporter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func testProcessFlushOnExit(t *testing.T, i int) {

require.Equal(t, 1, len(server.Service.getBatches()))
require.NotNil(t, server.Service.getBatches()[0])
require.Equal(t, 7, len(server.Service.getBatches()[0].GetMetrics()))
require.Equal(t, 8, len(server.Service.getBatches()[0].GetMetrics()))
metrics := server.Service.getBatches()[0].GetMetrics()
fmt.Printf("Test %d emitted:\n%v\n", i, metrics)
}
13 changes: 7 additions & 6 deletions m3/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
var (
localListenAddr = &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)}
defaultCommonTags = map[string]string{"env": "test", "host": "test"}
internalMetrics = 5 // Additional metrics the reporter sends in a batch - USE THIS IN TESTS, NOT A MAGIC NUMBER.
brawndou marked this conversation as resolved.
Show resolved Hide resolved
)

var protocols = []Protocol{Compact, Binary}
Expand Down Expand Up @@ -116,9 +117,9 @@ func TestReporter(t *testing.T) {

// Validate metrics
emittedCounters := batches[0].GetMetrics()
require.Equal(t, 5, len(emittedCounters))
require.Equal(t, 6, len(emittedCounters))
brawndou marked this conversation as resolved.
Show resolved Hide resolved
emittedTimers := batches[1].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, internalMetrics+1, len(emittedTimers))

emittedCounter, emittedTimer := emittedCounters[0], emittedTimers[0]
if emittedCounter.GetName() == "my-timer" {
Expand Down Expand Up @@ -512,13 +513,13 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
c1 := r.AllocateCounter("counterWithTags", tags)

// Report the counter with tags to take the last slot.
wg.Add(5)
wg.Add(internalMetrics + 1)
c1.ReportCount(1)
r.Flush()
wg.Wait()

// Empty flush to ensure the copied metric is released.
wg.Add(4)
wg.Add(internalMetrics)
r.Flush()
for {
rep := r.(*reporter)
Expand All @@ -533,15 +534,15 @@ func TestReporterResetTagsAfterReturnToPool(t *testing.T) {
c2 := r.AllocateCounter("counterWithNoTags", nil)

// Report the counter with no tags.
wg.Add(5)
wg.Add(internalMetrics + 1)
c2.ReportCount(1)
r.Flush()
wg.Wait()

// Verify that first reported counter has tags and the second
// reported counter has no tags.
metrics := server.Service.getMetrics()
require.Equal(t, 14, len(metrics)) // 2 test metrics, 4x3 internal metrics
require.Equal(t, 2+3*internalMetrics, len(metrics)) // 2 test metrics, 3 rounds of internal metrics

var filtered []m3thrift.Metric
for _, metric := range metrics {
Expand Down
6 changes: 3 additions & 3 deletions m3/scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestScope(t *testing.T) {
require.NotNil(t, server.Service.getBatches()[0])

emittedTimers := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, internalMetrics+1, len(emittedTimers))
require.Equal(t, "honk.dazzle", emittedTimers[0].GetName())
}

Expand All @@ -108,7 +108,7 @@ func TestScopeCounter(t *testing.T) {
require.NotNil(t, server.Service.getBatches()[0])

emittedTimers := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, internalMetrics+1, len(emittedTimers))
require.Equal(t, "honk.foobar", emittedTimers[0].GetName())
}

Expand All @@ -133,7 +133,7 @@ func TestScopeGauge(t *testing.T) {
require.NotNil(t, server.Service.getBatches()[0])

emittedTimers := server.Service.getBatches()[0].GetMetrics()
require.Equal(t, 5, len(emittedTimers))
require.Equal(t, internalMetrics+1, len(emittedTimers))
require.Equal(t, "honk.foobaz", emittedTimers[0].GetName())
}

Expand Down
56 changes: 55 additions & 1 deletion scope_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Uber Technologies, Inc.
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -578,6 +578,60 @@ func TestHistogramSharedBucketMetrics(t *testing.T) {
require.Equal(t, 0, len(unseen), fmt.Sprintf("%v", unseen))
}

// TestConcurrentUpdates hammers a set of counters spread across a root
// scope and several subscopes from many goroutines, then verifies every
// counter converged to exactly workerCount*counterIncrs. Run with -race
// to exercise the registry's concurrency safety.
func TestConcurrentUpdates(t *testing.T) {
	var (
		r                = newTestStatsReporter()
		wg               = &sync.WaitGroup{}
		workerCount      = 20
		scopeCount       = 4
		countersPerScope = 4
		counterIncrs     = 5000
		rs               = newRootScope(
			ScopeOptions{
				Prefix:         "",
				Tags:           nil,
				CachedReporter: r,
			}, 0,
		)
		scopes   = []Scope{rs}
		counters []Counter
	)

	// Instantiate subscopes (the root scope is the first entry).
	for i := 1; i < scopeCount; i++ {
		scopes = append(scopes, rs.SubScope(fmt.Sprintf("subscope_%d", i)))
	}

	// Instantiate counters across every scope.
	for sNum, s := range scopes {
		for cNum := 0; cNum < countersPerScope; cNum++ {
			counters = append(counters, s.Counter(fmt.Sprintf("scope_%d_counter_%d", sNum, cNum)))
		}
	}

	// Instantiate workers. Each worker performs counterIncrs increments
	// per counter, round-robin, so every counter ends up with exactly
	// counterIncrs * workerCount.
	r.cg.Add(scopeCount * countersPerScope)
	for worker := 0; worker < workerCount; worker++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < counterIncrs*len(counters); i++ {
				counters[i%len(counters)].Inc(1)
			}
		}()
	}

	wg.Wait()
	rs.reportRegistry()
	r.WaitAll()

	// testify's assert.Equal takes (expected, actual); the previous order
	// was reversed, which produced misleading failure messages.
	wantVal := int64(workerCount * counterIncrs)
	for _, gotCounter := range r.getCounters() {
		assert.Equal(t, wantVal, gotCounter.val)
	}
}

func TestCounterSanitized(t *testing.T) {
r := newTestStatsReporter()

Expand Down