Skip to content

Commit

Permalink
[coordinator] Enable rule filtering on prom metric type (#3325)
Browse files Browse the repository at this point in the history
Today we only support filtering on m3 metric type.
  • Loading branch information
wesleyk authored Mar 4, 2021
1 parent c92b3bc commit 0068083
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 12 deletions.
6 changes: 3 additions & 3 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ type SamplesAppenderResult struct {
// SampleAppenderOptions defines the options being used when constructing
// the samples appender for a metric.
type SampleAppenderOptions struct {
Override bool
OverrideRules SamplesAppenderOverrideRules
MetricType ts.M3MetricType
Override bool
OverrideRules SamplesAppenderOverrideRules
SeriesAttributes ts.SeriesAttributes
}

// SamplesAppenderOverrideRules provides override rules to
Expand Down
105 changes: 103 additions & 2 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,64 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing.
},
},
sampleAppenderOpts: &SampleAppenderOptions{
MetricType: ts.M3MetricTypeCounter,
SeriesAttributes: ts.SeriesAttributes{M3Type: ts.M3MetricTypeCounter},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
{
tags: map[string]string{
"app": "nginx_edge",
"endpoint": "health",
},
values: []expectedValue{{value: 30}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: 1 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

//nolint:dupl
func TestDownsamplerAggregationWithRulesConfigMappingRulesTypePromFilter(t *testing.T) {
t.Parallel()

gaugeMetric := testGaugeMetric{
tags: map[string]string{
"app": "nginx_edge",
"endpoint": "health",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
identTag: "endpoint",
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "__m3_prom_type__:counter",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 1 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
},
sampleAppenderOpts: &SampleAppenderOptions{
SeriesAttributes: ts.SeriesAttributes{PromType: ts.PromMetricTypeCounter},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
Expand Down Expand Up @@ -733,7 +790,51 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilterNoMatch(t *t
},
},
sampleAppenderOpts: &SampleAppenderOptions{
MetricType: ts.M3MetricTypeGauge,
SeriesAttributes: ts.SeriesAttributes{M3Type: ts.M3MetricTypeGauge},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

//nolint:dupl
func TestDownsamplerAggregationWithRulesConfigMappingRulesPromTypeFilterNoMatch(t *testing.T) {
t.Parallel()

gaugeMetric := testGaugeMetric{
tags: map[string]string{
"app": "nginx_edge",
"endpoint": "health",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
identTag: "endpoint",
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "__m3_prom_type__:counter",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 1 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
},
sampleAppenderOpts: &SampleAppenderOptions{
SeriesAttributes: ts.SeriesAttributes{PromType: ts.PromMetricTypeGauge},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
Expand Down
20 changes: 19 additions & 1 deletion src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,32 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
switch opts.SeriesAttributes.M3Type {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}
switch opts.SeriesAttributes.PromType {
case ts.PromMetricTypeUnknown:
tags.append(metric.M3PromTypeTag, metric.PromUnknownValue)
case ts.PromMetricTypeCounter:
tags.append(metric.M3PromTypeTag, metric.PromCounterValue)
case ts.PromMetricTypeGauge:
tags.append(metric.M3PromTypeTag, metric.PromGaugeValue)
case ts.PromMetricTypeHistogram:
tags.append(metric.M3PromTypeTag, metric.PromHistogramValue)
case ts.PromMetricTypeGaugeHistogram:
tags.append(metric.M3PromTypeTag, metric.PromGaugeHistogramValue)
case ts.PromMetricTypeSummary:
tags.append(metric.M3PromTypeTag, metric.PromSummaryValue)
case ts.PromMetricTypeInfo:
tags.append(metric.M3PromTypeTag, metric.PromInfoValue)
case ts.PromMetricTypeStateSet:
tags.append(metric.M3PromTypeTag, metric.PromStateSetValue)
}

// Sort tags
sort.Sort(tags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestSamplesAppenderPoolResetsTagsAcrossSamples(t *testing.T) {
// NB: expected ID is generated into human-readable form
// from tags in ForwardMatch mock above. Also include the m3 type, which is included when matching.
// nolint:scopelint
expected := fmt.Sprintf("__m3_type__-gauge,foo%d-bar%d", i, i)
expected := fmt.Sprintf("__m3_prom_type__-unknown,__m3_type__-gauge,foo%d-bar%d", i, i)
if expected != u.ID.String() {
// NB: if this fails, appender is holding state after Finalize.
return fmt.Errorf("expected ID %s, got %s", expected, u.ID.String())
Expand Down
4 changes: 3 additions & 1 deletion src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (d *downsamplerAndWriter) writeToDownsampler(
downsample.GraphiteIDSchemeTagValue)
}

// NB: we don't set series attributes on the sample appender options here.
// In practice this isn't needed because only the carbon ingest path comes through here.
var appenderOpts downsample.SampleAppenderOptions
if downsampleMappingRuleOverrides, ok := d.downsampleOverrideRules(overrides); ok {
appenderOpts = downsample.SampleAppenderOptions{
Expand Down Expand Up @@ -478,7 +480,7 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
}

opts := downsample.SampleAppenderOptions{
MetricType: value.Attributes.M3Type,
SeriesAttributes: value.Attributes,
}
if downsampleMappingRuleOverrides, ok := d.downsampleOverrideRules(overrides); ok {
opts = downsample.SampleAppenderOptions{
Expand Down
12 changes: 8 additions & 4 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,16 @@ func TestDownsampleAndWriteBatchDifferentTypes(t *testing.T) {

mockMetricsAppender.
EXPECT().
SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.M3MetricTypeCounter}).
Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1)
SamplesAppender(downsample.SampleAppenderOptions{
SeriesAttributes: ts.SeriesAttributes{M3Type: ts.M3MetricTypeCounter},
}).Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).
Times(1)
mockMetricsAppender.
EXPECT().
SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.M3MetricTypeTimer}).
Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1)
SamplesAppender(downsample.SampleAppenderOptions{
SeriesAttributes: ts.SeriesAttributes{M3Type: ts.M3MetricTypeTimer},
}).Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).
Times(1)
for _, tag := range testTags1.Tags {
mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value)
}
Expand Down
10 changes: 10 additions & 0 deletions src/metrics/metric/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,23 @@ var (
M3GaugeValue = []byte("gauge")
M3TimerValue = []byte("timer")

PromUnknownValue = []byte("unknown")
PromCounterValue = []byte("counter")
PromGaugeValue = []byte("gauge")
PromHistogramValue = []byte("histogram")
PromGaugeHistogramValue = []byte("gauge_histogram")
PromSummaryValue = []byte("summary")
PromInfoValue = []byte("info")
PromStateSetValue = []byte("state_set")

M3MetricsPrefix = []byte("__m3")
M3MetricsPrefixString = string(M3MetricsPrefix)

M3TypeTag = []byte(M3MetricsPrefixString + "_type__")
M3MetricsGraphiteAggregation = []byte(M3MetricsPrefixString + "_graphite_aggregation__")
M3MetricsGraphitePrefix = []byte(M3MetricsPrefixString + "_graphite_prefix__")
M3MetricsDropTimestamp = []byte(M3MetricsPrefixString + "_drop_timestamp__")
M3PromTypeTag = []byte(M3MetricsPrefixString + "_prom_type__")
)

func (t Type) String() string {
Expand Down

0 comments on commit 0068083

Please sign in to comment.