Skip to content

Commit

Permalink
chore: Call shardstreams.Config by value instead of by reference (#12915)
Browse files Browse the repository at this point in the history

Use `shardstreams.Config` by value instead of by reference to fix docs generation.
Our `docs-generator` tool relies on struct references to determine whether flags are present. Passing this config by value fixes the generated documentation.

(cherry picked from commit afd9e36)
  • Loading branch information
DylanGuedes committed May 8, 2024
1 parent 167b468 commit 8c50692
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 19 deletions.
16 changes: 13 additions & 3 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3281,12 +3281,22 @@ ruler_remote_write_sigv4_config:
# Deprecated: Use deletion_mode per tenant configuration instead.
[allow_deletes: <boolean>]

# Define streams sharding behavior.
shard_streams:
[enabled: <boolean>]
# Automatically shard streams to keep them under the per-stream rate limit.
# Sharding is dictated by the desired rate.
# CLI flag: -shard-streams.enabled
[enabled: <boolean> | default = true]

[logging_enabled: <boolean>]
# Whether to log sharding streams behavior or not. Not recommended for
# production environments.
# CLI flag: -shard-streams.logging-enabled
[logging_enabled: <boolean> | default = false]

[desired_rate: <int>]
# Threshold used to cut a new shard. Default (1536KB) means if a rate is above
# 1536KB/s, it will be sharded into two streams.
# CLI flag: -shard-streams.desired-rate
[desired_rate: <int> | default = 1536KB]

[blocked_queries: <blocked_query...>]

Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream)
}

func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg *shardstreams.Config, stream logproto.Stream) []KeyedStream {
func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []KeyedStream {
derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg)

for i := 0; i < len(stream.Entries); i++ {
Expand All @@ -571,7 +571,7 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in
return derivedStreams
}

func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg *shardstreams.Config) []KeyedStream {
func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []KeyedStream {
var (
streamLabels = labelTemplate(stream.Labels, d.logger)
streamPattern = streamLabels.String()
Expand Down Expand Up @@ -779,7 +779,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
// based on the rate stored in the rate store and will store the new evaluated number of shards.
//
// desiredRate is expected to be given in bytes.
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, pushSize int, tenantID string, streamShardcfg *shardstreams.Config) int {
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, pushSize int, tenantID string, streamShardcfg shardstreams.Config) int {
if streamShardcfg.DesiredRate.Val() <= 0 {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Limits interface {
DiscoverServiceName(userID string) []string
DiscoverLogLevels(userID string) bool

ShardStreams(userID string) *shardstreams.Config
ShardStreams(userID string) shardstreams.Config
IngestionRateStrategy() string
IngestionRateBytes(userID string) float64
IngestionBurstSizeBytes(userID string) int
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/ratestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,15 @@ type fakeOverrides struct {
func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits {
return map[string]*validation.Limits{
"ingester0": {
ShardStreams: &shardstreams.Config{
ShardStreams: shardstreams.Config{
Enabled: c.enabled,
},
},
}
}

func (c *fakeOverrides) ShardStreams(_ string) *shardstreams.Config {
return &shardstreams.Config{
func (c *fakeOverrides) ShardStreams(_ string) shardstreams.Config {
return shardstreams.Config{
Enabled: c.enabled,
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/distributor/shardstreams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
)

type Config struct {
Enabled bool `yaml:"enabled" json:"enabled"`
LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled"`
Enabled bool `yaml:"enabled" json:"enabled" doc:"description=Automatically shard streams to keep them under the per-stream rate limit. Sharding is dictated by the desired rate."`

LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled" doc:"description=Whether to log sharding streams behavior or not. Not recommended for production environments."`

// DesiredRate is the threshold used to shard the stream into smaller pieces.
// Expected to be in bytes.
DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate"`
DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate" doc:"description=Threshold used to cut a new shard. Default (1536KB) means if a rate is above 1536KB/s, it will be sharded into two streams."`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ func (f fakeLimits) AllByUserID() map[string]*validation.Limits {

func TestStreamShardingUsage(t *testing.T) {
setupCustomTenantLimit := func(perStreamLimit string) *validation.Limits {
shardStreamsCfg := &shardstreams.Config{Enabled: true, LoggingEnabled: true}
shardStreamsCfg := shardstreams.Config{Enabled: true, LoggingEnabled: true}
shardStreamsCfg.DesiredRate.Set("6MB") //nolint:errcheck

customTenantLimits := &validation.Limits{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Limits interface {
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
ShardStreams(userID string) *shardstreams.Config
ShardStreams(userID string) shardstreams.Config
}

// Limiter implements primitives to get the maximum number of streams
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type Config struct {
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
CompactorHTTPClient compactorclient.HTTPConfig `yaml:"compactor_client,omitempty" doc:"hidden"`
CompactorGRPCClient compactorclient.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Expand Down
13 changes: 10 additions & 3 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type Limits struct {
// Deprecated
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes" doc:"deprecated|description=Use deletion_mode per tenant configuration instead."`

ShardStreams *shardstreams.Config `yaml:"shard_streams" json:"shard_streams"`
ShardStreams shardstreams.Config `yaml:"shard_streams" json:"shard_streams" doc:"description=Define streams sharding behavior."`

BlockedQueries []*validation.BlockedQuery `yaml:"blocked_queries,omitempty" json:"blocked_queries,omitempty"`

Expand Down Expand Up @@ -376,7 +376,14 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
),
)

l.ShardStreams = &shardstreams.Config{}
_ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize)
f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size",
fmt.Sprintf(
"Experimental. The maximum bloom size per log stream. A log stream whose generated bloom filter exceeds this size will be discarded. A value of 0 sets an unlimited size. Default is %s.",
defaultBloomCompactorMaxBloomSize,
),
)

l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)

f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint")
Expand Down Expand Up @@ -879,7 +886,7 @@ func (o *Overrides) DeletionMode(userID string) string {
return o.getOverridesForUser(userID).DeletionMode
}

func (o *Overrides) ShardStreams(userID string) *shardstreams.Config {
func (o *Overrides) ShardStreams(userID string) shardstreams.Config {
return o.getOverridesForUser(userID).ShardStreams
}

Expand Down

0 comments on commit 8c50692

Please sign in to comment.