Skip to content

Commit

Permalink
chore: Add relabeling rules flags. (#3462)
Browse files Browse the repository at this point in the history
This adds flags for the relabeling rules and also makes sure they will
show up in the documentation properly.
  • Loading branch information
simonswine authored Aug 5, 2024
1 parent 0e62d09 commit 80e8ad3
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 40 deletions.
4 changes: 4 additions & 0 deletions cmd/pyroscope/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ Usage of ./pyroscope:
Per-tenant allowed ingestion burst size (in sample size). Units in MB. The burst size refers to the per-distributor local rate limiter, and should be set at least to the maximum profile size expected in a single push request. (default 2)
-distributor.ingestion-rate-limit-mb float
Per-tenant ingestion rate limit in sample size per second. Units in MB. (default 4)
-distributor.ingestion-relabeling-default-rules-position value
Position of the default ingestion relabeling rules in relation to relabel rules from overrides. Valid values are 'first', 'last' or 'disabled'. (default "first")
-distributor.ingestion-relabeling-rules value
List of ingestion relabel configurations. The relabeling rules work the same way, as those of [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). All rules are applied in the order they are specified. Note: In most situations, it is more effective to use relabeling directly in Grafana Alloy.
-distributor.ingestion-tenant-shard-size int
The tenant's shard size used by shuffle-sharding. Must be set both on ingesters and distributors. 0 disables shuffle sharding.
-distributor.push.timeout duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,8 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -validation.max-profile-symbol-value-length
[max_profile_symbol_value_length: <int> | default = 65535]
distributor_usage_groups:
# Duration of the distributor aggregation window. Requires aggregation period to
# be specified. 0 to disable.
# CLI flag: -distributor.aggregation-window
Expand All @@ -1853,9 +1855,30 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -distributor.aggregation-period
[distributor_aggregation_period: <duration> | default = 0s]
[ingestion_relabeling_rules: <relabel_config...> | default = ]
[ingestion_relabeling_default_rules_position: <string> | default = ""]
# List of ingestion relabel configurations. The relabeling rules work the same
# way, as those of
# [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config).
# All rules are applied in the order they are specified. Note: In most
# situations, it is more effective to use relabeling directly in Grafana Alloy.
# Example:
# This example consists of two rules, the first one will drop all profiles
# received with an label 'environment="secrets"' and the second rule will add
# a label 'powered_by="Grafana Labs"' to all profile series.
# ingestion_relabeling_rules:
# - action: drop
# regex: secret
# source_labels:
# - environment
# - action: replace
# replacement: grafana-labs
# target_label: powered_by
# CLI flag: -distributor.ingestion-relabeling-rules
[ingestion_relabeling_rules: <list of Configs> | default = []]
# Position of the default ingestion relabeling rules in relation to relabel
# rules from overrides. Valid values are 'first', 'last' or 'disabled'.
# CLI flag: -distributor.ingestion-relabeling-default-rules-position
[ingestion_relabeling_default_rules_position: <string> | default = "first"]
# The tenant's shard size used by shuffle-sharding. Must be set both on
# ingesters and distributors. 0 disables shuffle sharding.
Expand Down
8 changes: 4 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.Pu

// Next we split profiles by labels and apply relabel rules.
profileSeries, bytesRelabelDropped, profilesRelabelDropped := extractSampleSeries(req, tenantID, usageGroups, d.limits.IngestionRelabelingRules(tenantID))
validation.DiscardedBytes.WithLabelValues(string(validation.RelabelRules), tenantID).Add(bytesRelabelDropped)
validation.DiscardedProfiles.WithLabelValues(string(validation.RelabelRules), tenantID).Add(profilesRelabelDropped)
validation.DiscardedBytes.WithLabelValues(string(validation.DroppedByRelabelRules), tenantID).Add(bytesRelabelDropped)
validation.DiscardedProfiles.WithLabelValues(string(validation.DroppedByRelabelRules), tenantID).Add(profilesRelabelDropped)

// Filter our series and profiles without samples.
for _, series := range profileSeries {
Expand Down Expand Up @@ -786,7 +786,7 @@ func extractSampleSeries(req *distributormodel.PushRequest, tenantID string, usa
if !keep {
bytesRelabelDropped += float64(raw.Profile.SizeVT())
profilesRelabelDropped++ // in this case we dropped a whole profile
usageGroups.CountDiscardedBytes(string(validation.RelabelRules), int64(raw.Profile.SizeVT()))
usageGroups.CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(raw.Profile.SizeVT()))
continue
}
}
Expand All @@ -810,7 +810,7 @@ func extractSampleSeries(req *distributormodel.PushRequest, tenantID string, usa
if !keep {
droppedBytes := sampleSize(raw.Profile.Profile.StringTable, group.Samples)
bytesRelabelDropped += float64(droppedBytes)
usageGroups.CountDiscardedBytes(string(validation.RelabelRules), droppedBytes)
usageGroups.CountDiscardedBytes(string(validation.DroppedByRelabelRules), droppedBytes)
continue
}
}
Expand Down
28 changes: 12 additions & 16 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"gopkg.in/yaml.v3"

"github.com/grafana/pyroscope/pkg/phlaredb/block"
Expand All @@ -22,14 +21,6 @@ const (
MinCompactorPartialBlockDeletionDelay = 4 * time.Hour
)

type RulesPosition string

const (
RulePositionFirst RulesPosition = "first"
RulePositionDisabled RulesPosition = "disabled"
RulePositionLast RulesPosition = "last"
)

// Limits describe all the limits for tenants; can be used to describe global default
// limits via flags, or per-tenant limits via yaml config.
// NOTE: we use custom `model.Duration` instead of standard `time.Duration` because,
Expand Down Expand Up @@ -58,8 +49,8 @@ type Limits struct {
DistributorAggregationPeriod model.Duration `yaml:"distributor_aggregation_period" json:"distributor_aggregation_period"`

// IngestionRelabelingRules allow to specify additional relabeling rules that get applied before a profile gets ingested. There are some default relabeling rules, which ensure consistency of profiling series. The position of the default rules can be contolled by IngestionRelabelingDefaultRulesPosition
IngestionRelabelingRules []*relabel.Config `yaml:"ingestion_relabeling_rules" json:"ingestion_relabeling_rules"`
IngestionRelabelingDefaultRulesPosition RulesPosition `yaml:"ingestion_relabeling_default_rules_position" json:"ingestion_relabeling_default_rules_position"`
IngestionRelabelingRules RelabelRules `yaml:"ingestion_relabeling_rules" json:"ingestion_relabeling_rules" category:"advanced"`
IngestionRelabelingDefaultRulesPosition RelabelRulesPosition `yaml:"ingestion_relabeling_default_rules_position" json:"ingestion_relabeling_default_rules_position" category:"advanced"`

// The tenant shard size determines the how many ingesters a particular
// tenant will be sharded to. Needs to be specified on distributors for
Expand Down Expand Up @@ -173,6 +164,12 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

_ = l.RejectOlderThan.Set("1h")
f.Var(&l.RejectOlderThan, "validation.reject-older-than", "This limits how far into the past profiling data can be ingested. This limit is enforced in the distributor. 0 to disable, defaults to 1h.")

_ = l.IngestionRelabelingDefaultRulesPosition.Set("first")
f.Var(&l.IngestionRelabelingDefaultRulesPosition, "distributor.ingestion-relabeling-default-rules-position", "Position of the default ingestion relabeling rules in relation to relabel rules from overrides. Valid values are 'first', 'last' or 'disabled'.")
_ = l.IngestionRelabelingRules.Set("[]")
f.Var(&l.IngestionRelabelingRules, "distributor.ingestion-relabeling-rules", "List of ingestion relabel configurations. The relabeling rules work the same way, as those of [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). All rules are applied in the order they are specified. Note: In most situations, it is more effective to use relabeling directly in Grafana Alloy.")

}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand All @@ -198,11 +195,10 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
// Validate validates that this limits config is valid.
func (l *Limits) Validate() error {

switch l.IngestionRelabelingDefaultRulesPosition {
case "", RulePositionFirst, RulePositionLast, RulePositionDisabled:
break
default:
return fmt.Errorf("invalid ingestion_relabeling_default_rules_position: %s", l.IngestionRelabelingDefaultRulesPosition)
if l.IngestionRelabelingDefaultRulesPosition != "" {
if err := l.IngestionRelabelingDefaultRulesPosition.Set(string(l.IngestionRelabelingDefaultRulesPosition)); err != nil {
return err
}
}

return nil
Expand Down
75 changes: 73 additions & 2 deletions pkg/validation/relabeling.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package validation

import (
"encoding/json"
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"gopkg.in/yaml.v3"
)

var (
Expand Down Expand Up @@ -50,11 +54,78 @@ var (
}
)

type RelabelRulesPosition string

func (p *RelabelRulesPosition) Set(s string) error {
switch sp := RelabelRulesPosition(s); sp {
case RelabelRulePositionFirst, RelabelRulePositionLast, RelabelRulePositionDisabled:
*p = sp
return nil
}
return fmt.Errorf("invalid ingestion_relabeling_default_rules_position: %s", s)
}

func (p *RelabelRulesPosition) String() string {
return string(*p)
}

const (
RelabelRulePositionFirst RelabelRulesPosition = "first"
RelabelRulePositionDisabled RelabelRulesPosition = "disabled"
RelabelRulePositionLast RelabelRulesPosition = "last"
)

type RelabelRules []*relabel.Config

func (p *RelabelRules) Set(s string) error {

v := []*relabel.Config{}
if err := yaml.Unmarshal([]byte(s), &v); err != nil {
return err
}

for idx, rule := range v {
if err := rule.Validate(); err != nil {
return fmt.Errorf("rule at pos %d is not valid: %w", idx, err)
}
}
*p = v
return nil
}

func (p RelabelRules) String() string {
yamlBytes, err := yaml.Marshal(p)
if err != nil {
panic(fmt.Errorf("error marshal yaml: %w", err))
}

temp := make([]interface{}, 0, len(p))
err = yaml.Unmarshal(yamlBytes, &temp)
if err != nil {
panic(fmt.Errorf("error unmarshal yaml: %w", err))
}

jsonBytes, err := json.Marshal(temp)
if err != nil {
panic(fmt.Errorf("error marshal json: %w", err))
}
return string(jsonBytes)
}

// ExampleDoc provides an example doc for this config, especially valuable since it's custom-unmarshaled.
func (r RelabelRules) ExampleDoc() (comment string, yaml interface{}) {
return `This example consists of two rules, the first one will drop all profiles received with an label 'environment="secrets"' and the second rule will add a label 'powered_by="Grafana Labs"' to all profile series.`,
[]map[string]interface{}{
{"action": "drop", "source_labels": []interface{}{"environment"}, "regex": "secret"},
{"action": "replace", "replacement": "grafana-labs", "target_label": "powered_by"},
}
}

func (o *Overrides) IngestionRelabelingRules(tenantID string) []*relabel.Config {
l := o.getOverridesForTenant(tenantID)

// return only custom rules when default rules are disabled
if l.IngestionRelabelingDefaultRulesPosition == RulePositionDisabled {
if l.IngestionRelabelingDefaultRulesPosition == RelabelRulePositionDisabled {
return l.IngestionRelabelingRules
}

Expand All @@ -65,7 +136,7 @@ func (o *Overrides) IngestionRelabelingRules(tenantID string) []*relabel.Config

rules := make([]*relabel.Config, 0, len(l.IngestionRelabelingRules)+len(defaultRelabelRules))

if l.IngestionRelabelingDefaultRulesPosition == "" || l.IngestionRelabelingDefaultRulesPosition == RulePositionFirst {
if l.IngestionRelabelingDefaultRulesPosition == "" || l.IngestionRelabelingDefaultRulesPosition == RelabelRulePositionFirst {
rules = append(rules, defaultRelabelRules...)
return append(rules, l.IngestionRelabelingRules...)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
QueryMissingTimeRange Reason = "missing_time_range"

// Those profiles were dropped because of relabeling rules
RelabelRules Reason = "dropped_by_relabel_rules"
DroppedByRelabelRules Reason = "dropped_by_relabel_rules"

SeriesLimitErrorMsg = "Maximum active series limit exceeded (%d/%d), reduce the number of active streams (reduce labels or reduce label values), or contact your administrator to see if the limit can be increased"
MissingLabelsErrorMsg = "error at least one label pair is required per profile"
Expand Down
19 changes: 5 additions & 14 deletions tools/doc-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func generateBlocksMarkdown(blocks []*parse.ConfigBlock) string {
return md.string()
}

//nolint:unused
func generateBlockMarkdown(blocks []*parse.ConfigBlock, blockName, fieldName string) string {
// Look for the requested block.
for _, block := range blocks {
Expand Down Expand Up @@ -159,21 +160,11 @@ func main() {

// Generate documentation markdown.
data := struct {
ConfigFile string
BlocksStorageConfigBlock string
StoreGatewayConfigBlock string
CompactorConfigBlock string
QuerierConfigBlock string
S3SSEConfigBlock string
GeneratedFileWarning string
ConfigFile string
GeneratedFileWarning string
}{
ConfigFile: generateBlocksMarkdown(blocks),
BlocksStorageConfigBlock: generateBlockMarkdown(blocks, "blocks_storage_config", "blocks_storage"),
StoreGatewayConfigBlock: generateBlockMarkdown(blocks, "store_gateway_config", "store_gateway"),
CompactorConfigBlock: generateBlockMarkdown(blocks, "compactor_config", "compactor"),
QuerierConfigBlock: generateBlockMarkdown(blocks, "querier_config", "querier"),
S3SSEConfigBlock: generateBlockMarkdown(blocks, "s3_sse_config", "sse"),
GeneratedFileWarning: "<!-- DO NOT EDIT THIS FILE - This file has been automatically generated from its .template -->",
ConfigFile: generateBlocksMarkdown(blocks),
GeneratedFileWarning: "<!-- DO NOT EDIT THIS FILE - This file has been automatically generated from its .template -->",
}

// Load the template file.
Expand Down

0 comments on commit 80e8ad3

Please sign in to comment.