Skip to content

Commit

Permalink
kafka replay speed: rename CLI flags (#9345)
Browse files Browse the repository at this point in the history
* kafka replay speed: rename CLI flags

Make them a bit more consistent on what they mean and add better descriptions.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Clarify metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Rename flags

Co-authored-by: gotjosh <[email protected]>

* Update docs

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: gotjosh <[email protected]>
  • Loading branch information
dimitarvdimitrov and gotjosh committed Sep 20, 2024
1 parent fead2ce commit fb30f39
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 62 deletions.
42 changes: 21 additions & 21 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -6650,53 +6650,53 @@
},
{
"kind": "field",
"name": "replay_concurrency",
"name": "fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup.",
"desc": "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.",
"fieldValue": null,
"fieldDefaultValue": 1,
"fieldFlag": "ingest-storage.kafka.replay-concurrency",
"fieldFlag": "ingest-storage.kafka.fetch-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "replay_shards",
"name": "records_per_fetch",
"required": false,
"desc": "The number of concurrent appends to the TSDB head. 0 to disable.",
"desc": "The number of records to fetch from Kafka in a single request.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.replay-shards",
"fieldDefaultValue": 128,
"fieldFlag": "ingest-storage.kafka.records-per-fetch",
"fieldType": "int"
},
{
"kind": "field",
"name": "batch_size",
"name": "use_compressed_bytes_as_fetch_max_bytes",
"required": false,
"desc": "The number of timeseries to batch together before ingesting into TSDB.",
"desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.",
"fieldValue": null,
"fieldDefaultValue": 128,
"fieldFlag": "ingest-storage.kafka.batch-size",
"fieldType": "int"
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes",
"fieldType": "boolean"
},
{
"kind": "field",
"name": "records_per_fetch",
"name": "ingestion_concurrency",
"required": false,
"desc": "The number of records to fetch from Kafka in a single request.",
"desc": "The number of concurrent ingestion streams to the TSDB head. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 128,
"fieldFlag": "ingest-storage.kafka.records-per-fetch",
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.ingestion-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "use_compressed_bytes_as_fetch_max_bytes",
"name": "ingestion_concurrency_batch_size",
"required": false,
"desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.",
"desc": "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes",
"fieldType": "boolean"
"fieldDefaultValue": 128,
"fieldFlag": "ingest-storage.kafka.ingestion-concurrency-batch-size",
"fieldType": "int"
}
],
"fieldValue": null,
Expand Down
12 changes: 6 additions & 6 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1351,8 +1351,6 @@ Usage of ./cmd/mimir/mimir:
When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.
-ingest-storage.kafka.auto-create-topic-enabled
Enable auto-creation of Kafka topic if it doesn't exist. (default true)
-ingest-storage.kafka.batch-size int
The number of timeseries to batch together before ingesting into TSDB. (default 128)
-ingest-storage.kafka.client-id string
The Kafka client ID.
-ingest-storage.kafka.consume-from-position-at-startup string
Expand All @@ -1365,6 +1363,12 @@ Usage of ./cmd/mimir/mimir:
How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
-ingest-storage.kafka.dial-timeout duration
The maximum time allowed to open a connection to a Kafka broker. (default 2s)
-ingest-storage.kafka.fetch-concurrency int
The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
-ingest-storage.kafka.ingestion-concurrency int
The number of concurrent ingestion streams to the TSDB head. 0 to disable.
-ingest-storage.kafka.ingestion-concurrency-batch-size int
The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0. (default 128)
-ingest-storage.kafka.last-produced-offset-poll-interval duration
How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s)
-ingest-storage.kafka.last-produced-offset-retry-timeout duration
Expand All @@ -1377,10 +1381,6 @@ Usage of ./cmd/mimir/mimir:
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.records-per-fetch int
The number of records to fetch from Kafka in a single request. (default 128)
-ingest-storage.kafka.replay-concurrency int
The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup. (default 1)
-ingest-storage.kafka.replay-shards int
The number of concurrent appends to the TSDB head. 0 to disable.
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
Expand Down
12 changes: 6 additions & 6 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,6 @@ Usage of ./cmd/mimir/mimir:
When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.
-ingest-storage.kafka.auto-create-topic-enabled
Enable auto-creation of Kafka topic if it doesn't exist. (default true)
-ingest-storage.kafka.batch-size int
The number of timeseries to batch together before ingesting into TSDB. (default 128)
-ingest-storage.kafka.client-id string
The Kafka client ID.
-ingest-storage.kafka.consume-from-position-at-startup string
Expand All @@ -435,6 +433,12 @@ Usage of ./cmd/mimir/mimir:
How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
-ingest-storage.kafka.dial-timeout duration
The maximum time allowed to open a connection to a Kafka broker. (default 2s)
-ingest-storage.kafka.fetch-concurrency int
The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
-ingest-storage.kafka.ingestion-concurrency int
The number of concurrent ingestion streams to the TSDB head. 0 to disable.
-ingest-storage.kafka.ingestion-concurrency-batch-size int
The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0. (default 128)
-ingest-storage.kafka.last-produced-offset-poll-interval duration
How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s)
-ingest-storage.kafka.last-produced-offset-retry-timeout duration
Expand All @@ -447,10 +451,6 @@ Usage of ./cmd/mimir/mimir:
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.records-per-fetch int
The number of records to fetch from Kafka in a single request. (default 128)
-ingest-storage.kafka.replay-concurrency int
The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup. (default 1)
-ingest-storage.kafka.replay-shards int
The number of concurrent appends to the TSDB head. 0 to disable.
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
Expand Down
4 changes: 2 additions & 2 deletions development/mimir-ingest-storage/config/mimir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ ingest_storage:
address: kafka_1:9092
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
replay_concurrency: 3
replay_shards: 8
fetch_concurrency: 3
ingestion_concurrency: 8

ingester:
track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing
Expand Down
23 changes: 12 additions & 11 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3860,18 +3860,10 @@ kafka:
# CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
[wait_strong_read_consistency_timeout: <duration> | default = 20s]
# The number of concurrent fetch requests that the ingester sends to kafka
# The number of concurrent fetch requests that the ingester sends to Kafka
# when catching up during startup.
# CLI flag: -ingest-storage.kafka.replay-concurrency
[replay_concurrency: <int> | default = 1]
# The number of concurrent appends to the TSDB head. 0 to disable.
# CLI flag: -ingest-storage.kafka.replay-shards
[replay_shards: <int> | default = 0]
# The number of timeseries to batch together before ingesting into TSDB.
# CLI flag: -ingest-storage.kafka.batch-size
[batch_size: <int> | default = 128]
# CLI flag: -ingest-storage.kafka.fetch-concurrency
[fetch_concurrency: <int> | default = 1]
# The number of records to fetch from Kafka in a single request.
# CLI flag: -ingest-storage.kafka.records-per-fetch
Expand All @@ -3884,6 +3876,15 @@ kafka:
# CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
[use_compressed_bytes_as_fetch_max_bytes: <boolean> | default = true]
# The number of concurrent ingestion streams to the TSDB head. 0 to disable.
# CLI flag: -ingest-storage.kafka.ingestion-concurrency
[ingestion_concurrency: <int> | default = 0]
# The number of timeseries to batch together before ingesting into TSDB. This
# is only used when ingestion-concurrency is greater than 0.
# CLI flag: -ingest-storage.kafka.ingestion-concurrency-batch-size
[ingestion_concurrency_batch_size: <int> | default = 128]
migration:
# When both this option and ingest storage are enabled, distributors write to
# both Kafka and ingesters. A write request is considered successful only when
Expand Down
19 changes: 10 additions & 9 deletions pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ type KafkaConfig struct {
WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"`

// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
FallbackClientErrorSampleRate int64 `yaml:"-"`
ReplayConcurrency int `yaml:"replay_concurrency"`
ReplayShards int `yaml:"replay_shards"`
BatchSize int `yaml:"batch_size"`
RecordsPerFetch int `yaml:"records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
FallbackClientErrorSampleRate int64 `yaml:"-"`

FetchConcurrency int `yaml:"fetch_concurrency"`
RecordsPerFetch int `yaml:"records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
IngestionConcurrency int `yaml:"ingestion_concurrency"`
IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
Expand Down Expand Up @@ -131,11 +132,11 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")

f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
f.IntVar(&cfg.ReplayConcurrency, prefix+".replay-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup.")
f.IntVar(&cfg.ReplayShards, prefix+".replay-shards", 0, "The number of concurrent appends to the TSDB head. 0 to disable.")
f.IntVar(&cfg.BatchSize, prefix+".batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB.")
f.IntVar(&cfg.FetchConcurrency, prefix+".fetch-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.")
f.IntVar(&cfg.RecordsPerFetch, prefix+".records-per-fetch", 128, "The number of records to fetch from Kafka in a single request.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. 0 to disable.")
f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.")
}

func (cfg *KafkaConfig) Validate() error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/ingest/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics

return &pusherConsumerMetrics{
numTimeSeriesPerFlush: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_pusher_num_timeseries_per_flush",
Help: "Number of time series per flush",
Name: "cortex_ingester_pusher_num_timeseries_per_shard_flush",
Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than ingestion-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.",
NativeHistogramBucketFactor: 1.1,
}),
processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Expand Down Expand Up @@ -189,11 +189,11 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
}

func (c pusherConsumer) newStorageWriter() PusherCloser {
if c.kafkaConfig.ReplayShards == 0 {
if c.kafkaConfig.IngestionConcurrency == 0 {
return newSequentialStoragePusher(c.metrics, c.pusher)
}

return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.ReplayShards, c.kafkaConfig.BatchSize, c.logger)
return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger)
}

func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
return errors.Wrap(err, "starting service manager")
}

if r.kafkaCfg.ReplayConcurrency > 1 {
r.fetcher, err = newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.ReplayConcurrency, r.kafkaCfg.RecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
if r.kafkaCfg.FetchConcurrency > 1 {
r.fetcher, err = newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.FetchConcurrency, r.kafkaCfg.RecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
if err != nil {
return errors.Wrap(err, "creating concurrent fetchers")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/ingest/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ func createTestKafkaConfig(clusterAddr, topicName string) KafkaConfig {
cfg.Address = clusterAddr
cfg.Topic = topicName
cfg.WriteTimeout = 2 * time.Second
cfg.ReplayConcurrency = 2
cfg.FetchConcurrency = 2
cfg.RecordsPerFetch = 2

return cfg
Expand Down

0 comments on commit fb30f39

Please sign in to comment.