From c0d740cb13669d15d44d73016d6388b4675b879d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 20 Sep 2024 15:10:42 +0200 Subject: [PATCH] kafka replay speed: rename CLI flags (#9345) * kafka replay speed: rename CLI flags Make them a bit more consistent on what they mean and add better descriptions. Signed-off-by: Dimitar Dimitrov * Clarify metrics Signed-off-by: Dimitar Dimitrov * Rename flags Co-authored-by: gotjosh * Update docs Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov Co-authored-by: gotjosh --- cmd/mimir/config-descriptor.json | 42 +++++++++---------- cmd/mimir/help-all.txt.tmpl | 12 +++--- cmd/mimir/help.txt.tmpl | 12 +++--- .../mimir-ingest-storage/config/mimir.yaml | 4 +- .../configuration-parameters/index.md | 23 +++++----- pkg/storage/ingest/config.go | 19 +++++---- pkg/storage/ingest/pusher.go | 8 ++-- pkg/storage/ingest/reader.go | 4 +- pkg/storage/ingest/writer_test.go | 2 +- 9 files changed, 64 insertions(+), 62 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index d03d56b6c3..1878fef324 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6650,53 +6650,53 @@ }, { "kind": "field", - "name": "replay_concurrency", + "name": "fetch_concurrency", "required": false, - "desc": "The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup.", + "desc": "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.", "fieldValue": null, "fieldDefaultValue": 1, - "fieldFlag": "ingest-storage.kafka.replay-concurrency", + "fieldFlag": "ingest-storage.kafka.fetch-concurrency", "fieldType": "int" }, { "kind": "field", - "name": "replay_shards", + "name": "records_per_fetch", "required": false, - "desc": "The number of concurrent appends to the TSDB head. 0 to disable.", + "desc": "The number of records to fetch from Kafka in a single request.", "fieldValue": null, - "fieldDefaultValue": 0, - "fieldFlag": "ingest-storage.kafka.replay-shards", + "fieldDefaultValue": 128, + "fieldFlag": "ingest-storage.kafka.records-per-fetch", "fieldType": "int" }, { "kind": "field", - "name": "batch_size", + "name": "use_compressed_bytes_as_fetch_max_bytes", "required": false, - "desc": "The number of timeseries to batch together before ingesting into TSDB.", + "desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.", "fieldValue": null, - "fieldDefaultValue": 128, - "fieldFlag": "ingest-storage.kafka.batch-size", - "fieldType": "int" + "fieldDefaultValue": true, + "fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes", + "fieldType": "boolean" }, { "kind": "field", - "name": "records_per_fetch", + "name": "ingestion_concurrency", "required": false, - "desc": "The number of records to fetch from Kafka in a single request.", + "desc": "The number of concurrent ingestion streams to the TSDB head. 0 to disable.", "fieldValue": null, - "fieldDefaultValue": 128, - "fieldFlag": "ingest-storage.kafka.records-per-fetch", + "fieldDefaultValue": 0, + "fieldFlag": "ingest-storage.kafka.ingestion-concurrency", "fieldType": "int" }, { "kind": "field", - "name": "use_compressed_bytes_as_fetch_max_bytes", + "name": "ingestion_concurrency_batch_size", "required": false, - "desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.", + "desc": "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.", "fieldValue": null, - "fieldDefaultValue": true, - "fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes", - "fieldType": "boolean" + "fieldDefaultValue": 128, + "fieldFlag": "ingest-storage.kafka.ingestion-concurrency-batch-size", + "fieldType": "int" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9ea0f0e006..7063199555 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1351,8 +1351,6 @@ Usage of ./cmd/mimir/mimir: When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions. -ingest-storage.kafka.auto-create-topic-enabled Enable auto-creation of Kafka topic if it doesn't exist. (default true) - -ingest-storage.kafka.batch-size int - The number of timeseries to batch together before ingesting into TSDB. (default 128) -ingest-storage.kafka.client-id string The Kafka client ID. -ingest-storage.kafka.consume-from-position-at-startup string @@ -1365,6 +1363,12 @@ Usage of ./cmd/mimir/mimir: How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s) -ingest-storage.kafka.dial-timeout duration The maximum time allowed to open a connection to a Kafka broker. (default 2s) + -ingest-storage.kafka.fetch-concurrency int + The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1) + -ingest-storage.kafka.ingestion-concurrency int + The number of concurrent ingestion streams to the TSDB head. 0 to disable. + -ingest-storage.kafka.ingestion-concurrency-batch-size int + The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0. (default 128) -ingest-storage.kafka.last-produced-offset-poll-interval duration How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s) -ingest-storage.kafka.last-produced-offset-retry-timeout duration @@ -1377,10 +1381,6 @@ Usage of ./cmd/mimir/mimir: The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) -ingest-storage.kafka.records-per-fetch int The number of records to fetch from Kafka in a single request. (default 128) - -ingest-storage.kafka.replay-concurrency int - The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup. (default 1) - -ingest-storage.kafka.replay-shards int - The number of concurrent appends to the TSDB head. 0 to disable. -ingest-storage.kafka.target-consumer-lag-at-startup duration The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index b5de7dd4a4..ece584254d 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -421,8 +421,6 @@ Usage of ./cmd/mimir/mimir: When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions. -ingest-storage.kafka.auto-create-topic-enabled Enable auto-creation of Kafka topic if it doesn't exist. (default true) - -ingest-storage.kafka.batch-size int - The number of timeseries to batch together before ingesting into TSDB. (default 128) -ingest-storage.kafka.client-id string The Kafka client ID. -ingest-storage.kafka.consume-from-position-at-startup string @@ -435,6 +433,12 @@ Usage of ./cmd/mimir/mimir: How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s) -ingest-storage.kafka.dial-timeout duration The maximum time allowed to open a connection to a Kafka broker. (default 2s) + -ingest-storage.kafka.fetch-concurrency int + The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1) + -ingest-storage.kafka.ingestion-concurrency int + The number of concurrent ingestion streams to the TSDB head. 0 to disable. + -ingest-storage.kafka.ingestion-concurrency-batch-size int + The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0. (default 128) -ingest-storage.kafka.last-produced-offset-poll-interval duration How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s) -ingest-storage.kafka.last-produced-offset-retry-timeout duration @@ -447,10 +451,6 @@ Usage of ./cmd/mimir/mimir: The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) -ingest-storage.kafka.records-per-fetch int The number of records to fetch from Kafka in a single request. (default 128) - -ingest-storage.kafka.replay-concurrency int - The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup. (default 1) - -ingest-storage.kafka.replay-shards int - The number of concurrent appends to the TSDB head. 0 to disable. -ingest-storage.kafka.target-consumer-lag-at-startup duration The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml index a8806dc382..e17247366d 100644 --- a/development/mimir-ingest-storage/config/mimir.yaml +++ b/development/mimir-ingest-storage/config/mimir.yaml @@ -15,8 +15,8 @@ ingest_storage: address: kafka_1:9092 topic: mimir-ingest last_produced_offset_poll_interval: 500ms - replay_concurrency: 3 - replay_shards: 8 + fetch_concurrency: 3 + ingestion_concurrency: 8 ingester: track_ingester_owned_series: false # suppress log messages in c-61 about empty ring; doesn't affect testing diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index e84ac8ce20..40b122b6c9 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3851,18 +3851,10 @@ kafka: # CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout [wait_strong_read_consistency_timeout: | default = 20s] - # The number of concurrent fetch requests that the ingester sends to kafka + # The number of concurrent fetch requests that the ingester sends to Kafka # when catching up during startup. - # CLI flag: -ingest-storage.kafka.replay-concurrency - [replay_concurrency: | default = 1] - - # The number of concurrent appends to the TSDB head. 0 to disable. - # CLI flag: -ingest-storage.kafka.replay-shards - [replay_shards: | default = 0] - - # The number of timeseries to batch together before ingesting into TSDB. - # CLI flag: -ingest-storage.kafka.batch-size - [batch_size: | default = 128] + # CLI flag: -ingest-storage.kafka.fetch-concurrency + [fetch_concurrency: | default = 1] # The number of records to fetch from Kafka in a single request. # CLI flag: -ingest-storage.kafka.records-per-fetch @@ -3875,6 +3867,15 @@ kafka: # CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes [use_compressed_bytes_as_fetch_max_bytes: | default = true] + # The number of concurrent ingestion streams to the TSDB head. 0 to disable. + # CLI flag: -ingest-storage.kafka.ingestion-concurrency + [ingestion_concurrency: | default = 0] + + # The number of timeseries to batch together before ingesting into TSDB. This + # is only used when ingestion-concurrency is greater than 0. + # CLI flag: -ingest-storage.kafka.ingestion-concurrency-batch-size + [ingestion_concurrency_batch_size: | default = 128] + migration: # When both this option and ingest storage are enabled, distributors write to # both Kafka and ingesters. A write request is considered successful only when diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index 38f81eb6e6..4fe14e8031 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -91,12 +91,13 @@ type KafkaConfig struct { WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"` // Used when logging unsampled client errors. Set from ingester's ErrorSampleRate. - FallbackClientErrorSampleRate int64 `yaml:"-"` - ReplayConcurrency int `yaml:"replay_concurrency"` - ReplayShards int `yaml:"replay_shards"` - BatchSize int `yaml:"batch_size"` - RecordsPerFetch int `yaml:"records_per_fetch"` - UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"` + FallbackClientErrorSampleRate int64 `yaml:"-"` + + FetchConcurrency int `yaml:"fetch_concurrency"` + RecordsPerFetch int `yaml:"records_per_fetch"` + UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"` + IngestionConcurrency int `yaml:"ingestion_concurrency"` + IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"` } func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) { @@ -131,11 +132,11 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.") - f.IntVar(&cfg.ReplayConcurrency, prefix+".replay-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to kafka when catching up during startup.") - f.IntVar(&cfg.ReplayShards, prefix+".replay-shards", 0, "The number of concurrent appends to the TSDB head. 0 to disable.") - f.IntVar(&cfg.BatchSize, prefix+".batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB.") + f.IntVar(&cfg.FetchConcurrency, prefix+".fetch-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.") f.IntVar(&cfg.RecordsPerFetch, prefix+".records-per-fetch", 128, "The number of records to fetch from Kafka in a single request.") f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.") + f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. 0 to disable.") + f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.") } func (cfg *KafkaConfig) Validate() error { diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index 1a78ab2544..ec03778712 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -70,8 +70,8 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics return &pusherConsumerMetrics{ numTimeSeriesPerFlush: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_ingester_pusher_num_timeseries_per_flush", - Help: "Number of time series per flush", + Name: "cortex_ingester_pusher_num_timeseries_per_shard_flush", + Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than ingestion-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.", NativeHistogramBucketFactor: 1.1, }), processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ @@ -189,11 +189,11 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error { } func (c pusherConsumer) newStorageWriter() PusherCloser { - if c.kafkaConfig.ReplayShards == 0 { + if c.kafkaConfig.IngestionConcurrency == 0 { return newSequentialStoragePusher(c.metrics, c.pusher) } - return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.ReplayShards, c.kafkaConfig.BatchSize, c.logger) + return newParallelStoragePusher(c.metrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger) } func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error { diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 329022785a..8d3cdca3fa 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -179,8 +179,8 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { return errors.Wrap(err, "starting service manager") } - if r.kafkaCfg.ReplayConcurrency > 1 { - r.fetcher, err = newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.ReplayConcurrency, r.kafkaCfg.RecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) + if r.kafkaCfg.FetchConcurrency > 1 { + r.fetcher, err = newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.FetchConcurrency, r.kafkaCfg.RecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) if err != nil { return errors.Wrap(err, "creating concurrent fetchers") } diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go index f36dec7814..4472af76e2 100644 --- a/pkg/storage/ingest/writer_test.go +++ b/pkg/storage/ingest/writer_test.go @@ -1078,7 +1078,7 @@ func createTestKafkaConfig(clusterAddr, topicName string) KafkaConfig { cfg.Address = clusterAddr cfg.Topic = topicName cfg.WriteTimeout = 2 * time.Second - cfg.ReplayConcurrency = 2 + cfg.FetchConcurrency = 2 cfg.RecordsPerFetch = 2 return cfg