Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Emit aggregate usage metrics #3123

Merged
merged 12 commits into from
Jan 27, 2021
76 changes: 73 additions & 3 deletions src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
"math"
"sync"

"github.com/uber-go/tally"

"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
)

Expand All @@ -42,10 +45,56 @@ type aggregatedResults struct {
idPool ident.Pool
bytesPool pool.CheckedBytesPool

pool AggregateResultsPool
valuesPool AggregateValuesPool

pool AggregateResultsPool
valuesPool AggregateValuesPool
encodedDocReader docs.EncodedDocumentReader

iOpts instrument.Options
metrics usageMetrics
}

// usageMetrics tracks aggregate query result usage counters.
// The total* counters are incremented for every field/term seen in an
// incoming AddFields batch, while the deduped* counters are incremented
// only for entries actually inserted into the results map (i.e. after
// de-duplication and limit enforcement), so total - deduped gives the
// volume of duplicate/over-limit work per namespace.
type usageMetrics struct {
	// total counts every field and term in each incoming batch
	// (one per entry plus one per term of each entry).
	total tally.Counter

	totalTerms tally.Counter
	dedupedTerms tally.Counter

	totalFields tally.Counter
	dedupedFields tally.Counter
}

// noopCounter is a tally.Counter implementation that discards all
// increments; it serves as a zero-cost fallback wherever real counters
// cannot be built.
type noopCounter struct{}

// Compile-time assertion that *noopCounter satisfies tally.Counter.
var _ tally.Counter = (*noopCounter)(nil)

// Inc is a no-op.
func (*noopCounter) Inc(_ int64) {}

func newUsageMetrics(ns ident.ID, iOpts instrument.Options) usageMetrics {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to reset this with every aggregate result? Is the namespace important enough?

if ns == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm can we have a catch-all metric tag for these instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that, but figured that potentially having a bunch of metrics under namespace="undefined" would end up kinda confusing and not add much additional information; can add it if you think it would be worth having though 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, when is ns not set? Mostly curious if we want to account for these or not

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly when we clear out or finalize the results, or if incorrectly initializing when taking an AggregateResults out of the pool, also happened a lot in tests so had to have some sensible defaults for it without updating all the tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd rather not drop metrics. I don't think an undefined namespace is that confusing. it would allow you to sum across the metrics without namespace, which is probably the common case.

noop := &noopCounter{}
return usageMetrics{
total: noop,
totalTerms: noop,
dedupedTerms: noop,
totalFields: noop,
dedupedFields: noop,
}
}

scope := iOpts.MetricsScope()
buildCounter := func(val string) tally.Counter {
return scope.
Tagged(map[string]string{"type": val, "namespace": ns.String()}).
Counter("aggregated-results")
}

return usageMetrics{
total: buildCounter("total"),
totalTerms: buildCounter("total-terms"),
dedupedTerms: buildCounter("deduped-terms"),
totalFields: buildCounter("total-fields"),
dedupedFields: buildCounter("deduped-fields"),
}
}

// NewAggregateResults returns a new AggregateResults object.
Expand All @@ -54,14 +103,17 @@ func NewAggregateResults(
aggregateOpts AggregateResultsOptions,
opts Options,
) AggregateResults {
iOpts := opts.InstrumentOptions()
return &aggregatedResults{
nsID: namespaceID,
aggregateOpts: aggregateOpts,
iOpts: iOpts,
resultsMap: newAggregateResultsMap(opts.IdentifierPool()),
idPool: opts.IdentifierPool(),
bytesPool: opts.CheckedBytesPool(),
pool: opts.AggregateResultsPool(),
valuesPool: opts.AggregateValuesPool(),
metrics: newUsageMetrics(namespaceID, iOpts),
}
}

Expand Down Expand Up @@ -97,6 +149,7 @@ func (r *aggregatedResults) Reset(
r.totalDocsCount = 0
r.size = 0

r.metrics = newUsageMetrics(nsID, r.iOpts)
// NB: could do keys+value in one step but I'm trying to avoid
// using an internal method of a code-gen'd type.
r.Unlock()
Expand All @@ -110,6 +163,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
r.Lock()
defer r.Unlock()

// NB: init total count with batch length, since each aggregated entry
// will have one field.
totalCount := len(batch)
for _, entry := range batch {
totalCount += len(entry.Terms)
}

r.metrics.total.Inc(int64(totalCount))
remainingDocs := int(math.MaxInt64)
if r.aggregateOpts.DocsLimit != 0 {
remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount
Expand All @@ -119,7 +180,9 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
if remainingDocs <= 0 {
for idx := 0; idx < len(batch); idx++ {
batch[idx].Field.Finalize()
r.metrics.totalFields.Inc(1)
for _, term := range batch[idx].Terms {
r.metrics.totalTerms.Inc(1)
term.Finalize()
}
}
Expand All @@ -138,9 +201,12 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
docs := 0
numInserts := 0
for _, entry := range batch {
r.metrics.totalFields.Inc(1)

if docs >= remainingDocs || numInserts >= remainingInserts {
entry.Field.Finalize()
for _, term := range entry.Terms {
r.metrics.totalTerms.Inc(1)
term.Finalize()
}

Expand All @@ -154,6 +220,8 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
aggValues, ok := r.resultsMap.Get(f)
if !ok {
if remainingInserts > numInserts {
r.metrics.dedupedFields.Inc(1)

numInserts++
aggValues = r.valuesPool.Get()
// we can avoid the copy because we assume ownership of the passed ident.ID,
Expand All @@ -175,12 +243,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)

valuesMap := aggValues.Map()
for _, t := range entry.Terms {
r.metrics.totalTerms.Inc(1)
if remainingDocs > docs {
docs++
if !valuesMap.Contains(t) {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
if remainingInserts > numInserts {
r.metrics.dedupedTerms.Inc(1)
valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
Expand Down
Loading