Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[dbnode] Emit aggregate usage metrics #3123

Merged
merged 12 commits into from
Jan 27, 2021
15 changes: 9 additions & 6 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,9 +1404,10 @@ func (i *nsIndex) AggregateQuery(
query index.Query,
opts index.AggregationOptions,
) (index.AggregateQueryResult, error) {
id := i.nsMetadata.ID()
logFields := []opentracinglog.Field{
opentracinglog.String("query", query.String()),
opentracinglog.String("namespace", i.nsMetadata.ID().String()),
opentracinglog.String("namespace", id.String()),
opentracinglog.Int("seriesLimit", opts.SeriesLimit),
opentracinglog.Int("docsLimit", opts.DocsLimit),
xopentracing.Time("queryStart", opts.StartInclusive),
Expand All @@ -1417,13 +1418,15 @@ func (i *nsIndex) AggregateQuery(
sp.LogFields(logFields...)
defer sp.Finish()

metrics := index.NewAggregateUsageMetrics(id, i.opts.InstrumentOptions())
// Get results and set the filters, namespace ID and size limit.
results := i.aggregateResultsPool.Get()
aopts := index.AggregateResultsOptions{
SizeLimit: opts.SeriesLimit,
DocsLimit: opts.DocsLimit,
FieldFilter: opts.FieldFilter,
Type: opts.Type,
SizeLimit: opts.SeriesLimit,
DocsLimit: opts.DocsLimit,
FieldFilter: opts.FieldFilter,
Type: opts.Type,
AggregateUsageMetrics: metrics,
}
ctx.RegisterFinalizer(results)
// use appropriate fn to query underlying blocks.
Expand All @@ -1442,7 +1445,7 @@ func (i *nsIndex) AggregateQuery(
}
}
aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
results.Reset(i.nsMetadata.ID(), aopts)
results.Reset(id, aopts)
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn, logFields)
if err != nil {
return index.AggregateQueryResult{}, err
Expand Down
107 changes: 102 additions & 5 deletions src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
"math"
"sync"

"github.com/uber-go/tally"

"github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
)

Expand All @@ -42,10 +45,80 @@ type aggregatedResults struct {
idPool ident.Pool
bytesPool pool.CheckedBytesPool

pool AggregateResultsPool
valuesPool AggregateValuesPool

pool AggregateResultsPool
valuesPool AggregateValuesPool
encodedDocReader docs.EncodedDocumentReader

iOpts instrument.Options
}

// Compile-time assertion that *usageMetrics satisfies AggregateUsageMetrics.
var _ AggregateUsageMetrics = (*usageMetrics)(nil)

// usageMetrics tracks aggregate query result usage counts. The zero value is
// a valid no-op implementation: every counter field may be nil, and each
// IncXxx method nil-checks its counter before incrementing.
type usageMetrics struct {
	// total counts all fields and terms seen, including duplicates.
	total tally.Counter

	// totalTerms counts every term seen, including duplicates.
	totalTerms tally.Counter
	// dedupedTerms counts only terms newly inserted into the result set.
	dedupedTerms tally.Counter

	// totalFields counts every field seen, including duplicates.
	totalFields tally.Counter
	// dedupedFields counts only fields newly inserted into the result set.
	dedupedFields tally.Counter
}

// IncTotal increments the total aggregated results counter by val.
// No-op when the counter is unset (zero-value usageMetrics).
func (m *usageMetrics) IncTotal(val int64) {
	if m.total == nil {
		// Metrics were never wired up; drop the increment.
		return
	}

	m.total.Inc(val)
}

// IncTotalTerms increments the total terms counter by val.
// No-op when the counter is unset (zero-value usageMetrics).
func (m *usageMetrics) IncTotalTerms(val int64) {
	if m.totalTerms == nil {
		// Metrics were never wired up; drop the increment.
		return
	}

	m.totalTerms.Inc(val)
}

// IncDedupedTerms increments the deduplicated terms counter by val.
// No-op when the counter is unset (zero-value usageMetrics).
func (m *usageMetrics) IncDedupedTerms(val int64) {
	if m.dedupedTerms == nil {
		// Metrics were never wired up; drop the increment.
		return
	}

	m.dedupedTerms.Inc(val)
}

// IncTotalFields increments the total fields counter by val.
// No-op when the counter is unset (zero-value usageMetrics).
func (m *usageMetrics) IncTotalFields(val int64) {
	if m.totalFields == nil {
		// Metrics were never wired up; drop the increment.
		return
	}

	m.totalFields.Inc(val)
}

// IncDedupedFields increments the deduplicated fields counter by val.
// No-op when the counter is unset (zero-value usageMetrics).
func (m *usageMetrics) IncDedupedFields(val int64) {
	if m.dedupedFields == nil {
		// Metrics were never wired up; drop the increment.
		return
	}

	m.dedupedFields.Inc(val)
}

// NewAggregateUsageMetrics builds a new aggregated usage metrics.
func NewAggregateUsageMetrics(ns ident.ID, iOpts instrument.Options) AggregateUsageMetrics {
if ns == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm can we have a catch-all metric tag for these instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that, but figured that potentially having a bunch of metrics under namespace="undefined" would end up kinda confusing and not add much additional information; can add it if you think it would be worth having though 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, when is ns not set? Mostly curious if we want to account for these or not

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly when we clear out or finalize the results, or if incorrectly initializing when taking an AggregateResults out of the pool, also happened a lot in tests so had to have some sensible defaults for it without updating all the tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd rather not drop metrics. I don't think an undefined namespace is that confusing. it would allow you to sum across the metrics without namespace, which is probably the common case.

return &usageMetrics{}
}

scope := iOpts.MetricsScope()
buildCounter := func(val string) tally.Counter {
return scope.
Tagged(map[string]string{"type": val, "namespace": ns.String()}).
Counter("aggregated-results")
}

return &usageMetrics{
total: buildCounter("total"),
totalTerms: buildCounter("total-terms"),
dedupedTerms: buildCounter("deduped-terms"),
totalFields: buildCounter("total-fields"),
dedupedFields: buildCounter("deduped-fields"),
}
}

// NewAggregateResults returns a new AggregateResults object.
Expand All @@ -54,9 +127,14 @@ func NewAggregateResults(
aggregateOpts AggregateResultsOptions,
opts Options,
) AggregateResults {
if aggregateOpts.AggregateUsageMetrics == nil {
aggregateOpts.AggregateUsageMetrics = &usageMetrics{}
}

return &aggregatedResults{
nsID: namespaceID,
aggregateOpts: aggregateOpts,
iOpts: opts.InstrumentOptions(),
resultsMap: newAggregateResultsMap(opts.IdentifierPool()),
idPool: opts.IdentifierPool(),
bytesPool: opts.CheckedBytesPool(),
Expand All @@ -73,8 +151,11 @@ func (r *aggregatedResults) Reset(
) {
r.Lock()

r.aggregateOpts = aggregateOpts
if aggregateOpts.AggregateUsageMetrics == nil {
aggregateOpts.AggregateUsageMetrics = NewAggregateUsageMetrics(nsID, r.iOpts)
}

r.aggregateOpts = aggregateOpts
// finalize existing held nsID
if r.nsID != nil {
r.nsID.Finalize()
Expand All @@ -91,7 +172,6 @@ func (r *aggregatedResults) Reset(
valueMap := entry.Value()
valueMap.finalize()
}

// reset all keys in the map next
r.resultsMap.Reset()
r.totalDocsCount = 0
Expand All @@ -110,6 +190,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
r.Lock()
defer r.Unlock()

// NB: init total count with batch length, since each aggregated entry
// will have one field.
totalCount := len(batch)
for idx := 0; idx < len(batch); idx++ {
totalCount += len(batch[idx].Terms)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is total different than total terms?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this

}

r.aggregateOpts.AggregateUsageMetrics.IncTotal(int64(totalCount))
remainingDocs := math.MaxInt64
if r.aggregateOpts.DocsLimit != 0 {
remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount
Expand All @@ -119,7 +207,9 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
if remainingDocs <= 0 {
for idx := 0; idx < len(batch); idx++ {
batch[idx].Field.Finalize()
r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1)
for _, term := range batch[idx].Terms {
r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1)
term.Finalize()
}
}
Expand All @@ -138,9 +228,12 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
docs := 0
numInserts := 0
for _, entry := range batch {
r.aggregateOpts.AggregateUsageMetrics.IncTotalFields(1)

if docs >= remainingDocs || numInserts >= remainingInserts {
entry.Field.Finalize()
for _, term := range entry.Terms {
r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1)
term.Finalize()
}

Expand All @@ -154,6 +247,8 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)
aggValues, ok := r.resultsMap.Get(f)
if !ok {
if remainingInserts > numInserts {
r.aggregateOpts.AggregateUsageMetrics.IncDedupedFields(1)

numInserts++
aggValues = r.valuesPool.Get()
// we can avoid the copy because we assume ownership of the passed ident.ID,
Expand All @@ -175,12 +270,14 @@ func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int)

valuesMap := aggValues.Map()
for _, t := range entry.Terms {
r.aggregateOpts.AggregateUsageMetrics.IncTotalTerms(1)
if remainingDocs > docs {
docs++
if !valuesMap.Contains(t) {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
if remainingInserts > numInserts {
r.aggregateOpts.AggregateUsageMetrics.IncDedupedTerms(1)
valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
Expand Down
Loading