diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index df40267e9ae8..9e933b87d4de 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3305,12 +3305,22 @@ ruler_remote_write_sigv4_config: # Deprecated: Use deletion_mode per tenant configuration instead. [allow_deletes: <boolean>] +# Define streams sharding behavior. shard_streams: - [enabled: <boolean>] + # Automatically shard streams to keep them under the per-stream rate limit. + # Sharding is dictated by the desired rate. + # CLI flag: -shard-streams.enabled + [enabled: <boolean> | default = true] - [logging_enabled: <boolean>] + # Whether to log sharding streams behavior or not. Not recommended for + # production environments. + # CLI flag: -shard-streams.logging-enabled + [logging_enabled: <boolean> | default = false] - [desired_rate: <int>] + # Threshold used to cut a new shard. Default (1536KB) means if a rate is above + # 1536KB/s, it will be sharded into two streams. + # CLI flag: -shard-streams.desired-rate + [desired_rate: <int> | default = 1536KB] [blocked_queries: <blocked_query...>] diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 268db96e897a..87036b5e23c3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -589,7 +589,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream) } -func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg *shardstreams.Config, stream logproto.Stream) []KeyedStream { +func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []KeyedStream { derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg) for i := 0; i < len(stream.Entries); i++ { @@ -601,7 +601,7 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in 
return derivedStreams } -func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg *shardstreams.Config) []KeyedStream { +func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []KeyedStream { var ( streamLabels = labelTemplate(stream.Labels, d.logger) streamPattern = streamLabels.String() @@ -809,7 +809,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, // based on the rate stored in the rate store and will store the new evaluated number of shards. // // desiredRate is expected to be given in bytes. -func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, pushSize int, tenantID string, streamShardcfg *shardstreams.Config) int { +func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, pushSize int, tenantID string, streamShardcfg shardstreams.Config) int { if streamShardcfg.DesiredRate.Val() <= 0 { if streamShardcfg.LoggingEnabled { level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String()) diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 05734db4184f..a207570c25d5 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -25,7 +25,7 @@ type Limits interface { DiscoverServiceName(userID string) []string DiscoverLogLevels(userID string) bool - ShardStreams(userID string) *shardstreams.Config + ShardStreams(userID string) shardstreams.Config IngestionRateStrategy() string IngestionRateBytes(userID string) float64 IngestionBurstSizeBytes(userID string) int diff --git a/pkg/distributor/ratestore_test.go b/pkg/distributor/ratestore_test.go index af9fa9f0adb7..5bfacf96ebd4 100644 --- a/pkg/distributor/ratestore_test.go +++ b/pkg/distributor/ratestore_test.go @@ -341,15 +341,15 @@ type fakeOverrides struct { func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits { 
return map[string]*validation.Limits{ "ingester0": { - ShardStreams: &shardstreams.Config{ + ShardStreams: shardstreams.Config{ Enabled: c.enabled, }, }, } } -func (c *fakeOverrides) ShardStreams(_ string) *shardstreams.Config { - return &shardstreams.Config{ +func (c *fakeOverrides) ShardStreams(_ string) shardstreams.Config { + return shardstreams.Config{ Enabled: c.enabled, } } diff --git a/pkg/distributor/shardstreams/config.go b/pkg/distributor/shardstreams/config.go index 1bf1f89f961c..5c39fcc28d6c 100644 --- a/pkg/distributor/shardstreams/config.go +++ b/pkg/distributor/shardstreams/config.go @@ -7,12 +7,13 @@ import ( ) type Config struct { - Enabled bool `yaml:"enabled" json:"enabled"` - LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled"` + Enabled bool `yaml:"enabled" json:"enabled" doc:"description=Automatically shard streams to keep them under the per-stream rate limit. Sharding is dictated by the desired rate."` + + LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled" doc:"description=Whether to log sharding streams behavior or not. Not recommended for production environments."` // DesiredRate is the threshold used to shard the stream into smaller pieces. // Expected to be in bytes. - DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate"` + DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate" doc:"description=Threshold used to cut a new shard. 
Default (1536KB) means if a rate is above 1536KB/s, it will be sharded into two streams."` } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) { diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index acc5864fc557..88b613aa8db2 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1049,7 +1049,7 @@ func (f fakeLimits) AllByUserID() map[string]*validation.Limits { func TestStreamShardingUsage(t *testing.T) { setupCustomTenantLimit := func(perStreamLimit string) *validation.Limits { - shardStreamsCfg := &shardstreams.Config{Enabled: true, LoggingEnabled: true} + shardStreamsCfg := shardstreams.Config{Enabled: true, LoggingEnabled: true} shardStreamsCfg.DesiredRate.Set("6MB") //nolint:errcheck customTenantLimits := &validation.Limits{} diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 193209a54f6b..94c77a30be7e 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -27,7 +27,7 @@ type Limits interface { MaxLocalStreamsPerUser(userID string) int MaxGlobalStreamsPerUser(userID string) int PerStreamRateLimit(userID string) validation.RateLimit - ShardStreams(userID string) *shardstreams.Config + ShardStreams(userID string) shardstreams.Config } // Limiter implements primitives to get the maximum number of streams diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index aef0bd440d56..ff9c00e0ed59 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -97,7 +97,7 @@ type Config struct { CompactorConfig compactor.Config `yaml:"compactor,omitempty"` CompactorHTTPClient compactorclient.HTTPConfig `yaml:"compactor_client,omitempty" doc:"hidden"` CompactorGRPCClient compactorclient.GRPCConfig `yaml:"compactor_grpc_client,omitempty"` - LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + LimitsConfig validation.Limits `yaml:"limits_config"` Worker worker.Config `yaml:"frontend_worker,omitempty"` TableManager index.TableManagerConfig 
`yaml:"table_manager,omitempty"` MemberlistKV memberlist.KVConfig `yaml:"memberlist"` diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 27e6702dc988..a4280a2ee9e3 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -187,7 +187,7 @@ type Limits struct { // Deprecated CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes" doc:"deprecated|description=Use deletion_mode per tenant configuration instead."` - ShardStreams *shardstreams.Config `yaml:"shard_streams" json:"shard_streams"` + ShardStreams shardstreams.Config `yaml:"shard_streams" json:"shard_streams" doc:"description=Define streams sharding behavior."` BlockedQueries []*validation.BlockedQuery `yaml:"blocked_queries,omitempty" json:"blocked_queries,omitempty"` @@ -388,7 +388,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { ), ) - l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint") @@ -900,7 +899,7 @@ func (o *Overrides) DeletionMode(userID string) string { return o.getOverridesForUser(userID).DeletionMode } -func (o *Overrides) ShardStreams(userID string) *shardstreams.Config { +func (o *Overrides) ShardStreams(userID string) shardstreams.Config { return o.getOverridesForUser(userID).ShardStreams }