diff --git a/quickwit-config/resources/tests/source_config/kafka-source.json b/quickwit-config/resources/tests/source_config/kafka-source.json
index 4a0db5a22e4..19b3c9b486b 100644
--- a/quickwit-config/resources/tests/source_config/kafka-source.json
+++ b/quickwit-config/resources/tests/source_config/kafka-source.json
@@ -4,7 +4,7 @@
     "params": {
         "topic": "cloudera-cluster-logs",
         "client_params": {
-            "bootstrap.servers": "host:9092"
+            "bootstrap.servers": "localhost:9092"
         }
     }
 }
diff --git a/quickwit-config/src/source_config.rs b/quickwit-config/src/source_config.rs
index aef134eb713..062258e06ce 100644
--- a/quickwit-config/src/source_config.rs
+++ b/quickwit-config/src/source_config.rs
@@ -214,6 +214,10 @@ pub struct KafkaSourceParams {
     #[serde(default = "serde_json::Value::default")]
     #[serde(skip_serializing_if = "serde_json::Value::is_null")]
     pub client_params: serde_json::Value,
+    /// When backfill mode is enabled, the source exits after reaching the end of the topic.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "is_false")]
+    pub enable_backfill_mode: bool,
 }
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -229,9 +233,9 @@ pub struct KinesisSourceParams {
     pub stream_name: String,
     #[serde(flatten)]
     pub region_or_endpoint: Option<RegionOrEndpoint>,
-    #[doc(hidden)]
+    /// When backfill mode is enabled, the source exits after reaching the end of the stream.
     #[serde(skip_serializing_if = "is_false")]
-    pub shutdown_at_stream_eof: bool,
+    pub enable_backfill_mode: bool,
 }
 
 #[derive(Clone, Debug, PartialEq, Deserialize)]
@@ -240,9 +244,8 @@ struct KinesisSourceParamsInner {
     pub stream_name: String,
     pub region: Option<String>,
     pub endpoint: Option<String>,
-    #[doc(hidden)]
     #[serde(default)]
-    pub shutdown_at_stream_eof: bool,
+    pub enable_backfill_mode: bool,
 }
 
 impl TryFrom<KinesisSourceParamsInner> for KinesisSourceParams {
@@ -261,7 +264,7 @@ impl TryFrom<KinesisSourceParamsInner> for KinesisSourceParams {
         Ok(KinesisSourceParams {
             stream_name: value.stream_name,
             region_or_endpoint,
-            shutdown_at_stream_eof: value.shutdown_at_stream_eof,
+            enable_backfill_mode: value.enable_backfill_mode,
         })
     }
 }
@@ -317,12 +320,81 @@ mod tests {
             source_params: SourceParams::Kafka(KafkaSourceParams {
                 topic: "cloudera-cluster-logs".to_string(),
                 client_log_level: None,
-                client_params: json! {{"bootstrap.servers": "host:9092"}},
+                client_params: json! {{"bootstrap.servers": "localhost:9092"}},
+                enable_backfill_mode: false,
             }),
         };
         assert_eq!(source_config, expected_source_config);
     }
 
+    #[test]
+    fn test_kafka_source_params_serialization() {
+        {
+            let params = KafkaSourceParams {
+                topic: "my-topic".to_string(),
+                client_log_level: None,
+                client_params: json!(null),
+                enable_backfill_mode: false,
+            };
+            let params_yaml = serde_yaml::to_string(&params).unwrap();
+
+            assert_eq!(
+                serde_yaml::from_str::<KafkaSourceParams>(&params_yaml).unwrap(),
+                params,
+            )
+        }
+        {
+            let params = KafkaSourceParams {
+                topic: "my-topic".to_string(),
+                client_log_level: Some("info".to_string()),
+                client_params: json! {{"bootstrap.servers": "localhost:9092"}},
+                enable_backfill_mode: false,
+            };
+            let params_yaml = serde_yaml::to_string(&params).unwrap();
+
+            assert_eq!(
+                serde_yaml::from_str::<KafkaSourceParams>(&params_yaml).unwrap(),
+                params,
+            )
+        }
+    }
+
+    #[test]
+    fn test_kafka_source_params_deserialization() {
+        {
+            let yaml = r#"
+                topic: my-topic
+            "#;
+            assert_eq!(
+                serde_yaml::from_str::<KafkaSourceParams>(yaml).unwrap(),
+                KafkaSourceParams {
+                    topic: "my-topic".to_string(),
+                    client_log_level: None,
+                    client_params: json!(null),
+                    enable_backfill_mode: false,
+                }
+            );
+        }
+        {
+            let yaml = r#"
+                topic: my-topic
+                client_log_level: info
+                client_params:
+                    bootstrap.servers: localhost:9092
+                enable_backfill_mode: true
+            "#;
+            assert_eq!(
+                serde_yaml::from_str::<KafkaSourceParams>(yaml).unwrap(),
+                KafkaSourceParams {
+                    topic: "my-topic".to_string(),
+                    client_log_level: Some("info".to_string()),
+                    client_params: json! {{"bootstrap.servers": "localhost:9092"}},
+                    enable_backfill_mode: true,
+                }
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_load_kinesis_source_config() {
         let source_config_filepath = get_source_config_filepath("kinesis-source.yaml");
@@ -336,7 +408,7 @@ mod tests {
             source_params: SourceParams::Kinesis(KinesisSourceParams {
                 stream_name: "emr-cluster-logs".to_string(),
                 region_or_endpoint: None,
-                shutdown_at_stream_eof: false,
+                enable_backfill_mode: false,
             }),
         };
         assert_eq!(source_config, expected_source_config);
@@ -363,7 +435,7 @@ mod tests {
         let params = KinesisSourceParams {
             stream_name: "my-stream".to_string(),
             region_or_endpoint: None,
-            shutdown_at_stream_eof: false,
+            enable_backfill_mode: false,
         };
         let params_yaml = serde_yaml::to_string(&params).unwrap();
 
@@ -376,7 +448,7 @@ mod tests {
         let params = KinesisSourceParams {
             stream_name: "my-stream".to_string(),
             region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
-            shutdown_at_stream_eof: false,
+            enable_backfill_mode: false,
         };
         let params_yaml = serde_yaml::to_string(&params).unwrap();
 
@@ -391,7 +463,7 @@ mod tests {
             region_or_endpoint: Some(RegionOrEndpoint::Endpoint(
                 "https://localhost:4566".to_string(),
             )),
-            shutdown_at_stream_eof: false,
+            enable_backfill_mode: false,
         };
         let params_yaml = serde_yaml::to_string(&params).unwrap();
 
@@ -405,43 +477,41 @@ mod tests {
     #[test]
     fn test_kinesis_source_params_deserialization() {
         {
-            {
-                let yaml = r#"
+            let yaml = r#"
                 stream_name: my-stream
             "#;
-                assert_eq!(
-                    serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
-                    KinesisSourceParams {
-                        stream_name: "my-stream".to_string(),
-                        region_or_endpoint: None,
-                        shutdown_at_stream_eof: false,
-                    }
-                );
-            }
-            {
-                let yaml = r#"
+            assert_eq!(
+                serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
+                KinesisSourceParams {
+                    stream_name: "my-stream".to_string(),
+                    region_or_endpoint: None,
+                    enable_backfill_mode: false,
+                }
+            );
+        }
+        {
+            let yaml = r#"
                 stream_name: my-stream
                 region: us-west-1
-                shutdown_at_stream_eof: true
+                enable_backfill_mode: true
             "#;
-                assert_eq!(
-                    serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
-                    KinesisSourceParams {
-                        stream_name: "my-stream".to_string(),
-                        region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
-                        shutdown_at_stream_eof: true,
-                    }
-                );
-            }
-            {
-                let yaml = r#"
+            assert_eq!(
+                serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
+                KinesisSourceParams {
+                    stream_name: "my-stream".to_string(),
+                    region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
+                    enable_backfill_mode: true,
+                }
+            );
+        }
+        {
+            let yaml = r#"
                 stream_name: my-stream
                 region: us-west-1
                 endpoint: https://localhost:4566
             "#;
-                let error = serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap_err();
-                assert!(error.to_string().starts_with("Kinesis source parameters "));
-            }
+            let error = serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap_err();
+            assert!(error.to_string().starts_with("Kinesis source parameters "));
         }
     }
diff --git a/quickwit-indexing/src/source/kafka_source.rs b/quickwit-indexing/src/source/kafka_source.rs
index 26a4cfc19ac..08276faaf01 100644
--- a/quickwit-indexing/src/source/kafka_source.rs
+++ b/quickwit-indexing/src/source/kafka_source.rs
@@ -24,7 +24,6 @@ use std::time::Duration;
 
 use anyhow::{bail, Context};
 use async_trait::async_trait;
-use futures::StreamExt;
 use itertools::Itertools;
 use quickwit_actors::{ActorExitStatus, Mailbox};
 use quickwit_config::KafkaSourceParams;
@@ -34,14 +33,15 @@ use rdkafka::consumer::stream_consumer::StreamConsumer;
 use rdkafka::consumer::{
     BaseConsumer, Consumer, ConsumerContext, DefaultConsumerContext, Rebalance,
 };
-use rdkafka::error::KafkaError;
+use rdkafka::error::{KafkaError, KafkaResult};
 use rdkafka::message::BorrowedMessage;
 use rdkafka::util::Timeout;
-use rdkafka::{ClientContext, Message, Offset};
+use rdkafka::{ClientContext, Message, Offset, TopicPartitionList};
 use serde_json::json;
 use tokio::runtime::Handle;
 use tokio::sync::mpsc;
 use tokio::task::spawn_blocking;
+use tokio::time;
 use tracing::{debug, info, warn};
 
 use crate::actors::Indexer;
@@ -89,6 +89,7 @@ impl ClientContext for RdKafkaContext {}
 
 impl ConsumerContext for RdKafkaContext {
     fn pre_rebalance(&self, rebalance: &Rebalance) {
+        info!("Pre rebalance {:?}", rebalance);
         let source_id = self.ctx.config.source_id.clone();
 
         let mut assignments = HashMap::new();
@@ -186,6 +187,46 @@ impl ConsumerContext for RdKafkaContext {
         );
         // TODO: handle the sending error.
     }
+
+    fn post_rebalance(&self, rebalance: &Rebalance) {
+        info!("Post rebalance {:?}", rebalance);
+    }
+
+    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
+        info!("Committing offsets: {:?}", result);
+    }
+}
+
+#[derive(Debug)]
+pub enum NumActivePartitions {
+    Initializing,
+    Some(usize),
+}
+
+impl NumActivePartitions {
+    fn dec(&mut self) {
+        match self {
+            Self::Initializing => panic!("We should not decrement the number of active partitions while initializing the source. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."),
+            Self::Some(ref mut counter) => *counter -= 1,
+        }
+    }
+
+    fn is_zero(&self) -> bool {
+        matches!(self, Self::Some(0))
+    }
+
+    fn json(&self) -> serde_json::Value {
+        match self {
+            Self::Initializing => json!(null),
+            Self::Some(ref counter) => json!(counter),
+        }
+    }
+}
+
+impl Default for NumActivePartitions {
+    fn default() -> Self {
+        Self::Initializing
+    }
 }
 
 type RdKafkaConsumer = StreamConsumer<RdKafkaContext>;
@@ -197,7 +238,7 @@ pub struct KafkaSourceState {
     /// Offset for each partition of the last message received.
     pub current_positions: HashMap<i32, Position>,
     /// Number of active partitions, i.e., that have not reached EOF.
-    pub num_active_partitions: usize,
+    pub num_active_partitions: NumActivePartitions,
     /// Number of bytes processed by the source.
     pub num_bytes_processed: u64,
     /// Number of messages processed by the source (including invalid messages).
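Note on the `NumActivePartitions` state machine introduced above: it replaces the bare `usize` counter so that "zero active partitions" cannot be observed before the first rebalance has reported how many partitions are actually assigned. A minimal illustrative sketch of the intended life cycle (not part of the patch; it assumes it sits in the same module as the enum so the private `dec`/`is_zero` helpers are in scope):

    fn example_num_active_partitions_life_cycle() {
        // Before the first rebalance, the number of assigned partitions is unknown.
        let mut num_active_partitions = NumActivePartitions::default();
        assert!(!num_active_partitions.is_zero()); // `Initializing` never counts as zero.

        // A rebalance assigns, say, three partitions to this consumer.
        num_active_partitions = NumActivePartitions::Some(3);

        // Each `KafkaError::PartitionEOF` observed in the consume loop decrements the counter.
        num_active_partitions.dec();
        num_active_partitions.dec();
        num_active_partitions.dec();

        // Only when backfill mode is enabled does reaching zero terminate the source.
        assert!(num_active_partitions.is_zero());
    }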
@@ -213,6 +254,7 @@ pub struct KafkaSource {
     consumer: Arc<RdKafkaConsumer>,
     state: KafkaSourceState,
     rebalance_events: mpsc::Receiver<RebalanceEvent>,
+    backfill_mode_enabled: bool,
 }
 
 impl fmt::Debug for KafkaSource {
@@ -233,6 +275,7 @@ impl KafkaSource {
         _checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<KafkaSource> {
         let topic = params.topic.clone();
+        let backfill_mode_enabled = params.enable_backfill_mode;
 
         let (rebalance_sender, rebalance_receiver) = mpsc::channel(32);
 
@@ -247,8 +290,6 @@ impl KafkaSource {
             .context("Failed to resume from checkpoint.")?;
 
         let state = KafkaSourceState {
-            assigned_partition_ids: HashMap::new(),
-            num_active_partitions: 0,
             ..Default::default()
         };
         Ok(KafkaSource {
@@ -257,6 +298,7 @@ impl KafkaSource {
             consumer,
             state,
             rebalance_events: rebalance_receiver,
+            backfill_mode_enabled,
         })
     }
 }
@@ -268,13 +310,12 @@ impl Source for KafkaSource {
         batch_sink: &Mailbox<Indexer>,
         ctx: &SourceContext,
     ) -> Result<Duration, ActorExitStatus> {
+        let mut batch_num_bytes = 0;
         let mut docs = Vec::new();
         let mut checkpoint_delta = CheckpointDelta::default();
 
-        let deadline = tokio::time::sleep(quickwit_actors::HEARTBEAT / 2);
-        let mut message_stream = Box::pin(self.consumer.stream().take_until(deadline));
-
-        let mut batch_num_bytes = 0;
+        let deadline = time::sleep(quickwit_actors::HEARTBEAT / 2);
+        tokio::pin!(deadline);
 
         loop {
             tokio::select! {
@@ -288,7 +329,7 @@ impl Source for KafkaSource {
                                 },
                                 RebalanceEvent::Assignments(assignments) => {
                                     debug!("Received partition assignments {:?}", assignments);
-                                    self.state.num_active_partitions = assignments.len();
+                                    self.state.num_active_partitions = NumActivePartitions::Some(assignments.len());
                                     self.state.assigned_partition_ids = assignments;
                                 }
                             }
@@ -296,71 +337,65 @@ impl Source for KafkaSource {
                         None => {}
                     }
                 },
-                kafka_res = message_stream.next() => {
-                    if let Some(message_res) = kafka_res {
-                        let message = match message_res {
-                            Ok(message) => message,
-                            Err(KafkaError::PartitionEOF(partition_id)) => {
-                                self.state.num_active_partitions -= 1;
-                                info!(
-                                    topic = %self.topic,
-                                    partition_id = ?partition_id,
-                                    num_active_partitions = ?self.state.num_active_partitions,
-                                    "Reached end of partition."
-                                );
-                                continue;
-                            }
-                            // FIXME: This is assuming that Kafka errors are not recoverable, it may not be the
-                            // case.
-                            Err(err) => return Err(ActorExitStatus::from(anyhow::anyhow!(err))),
-                        };
-                        if let Some(doc) = parse_message_payload(&message) {
-                            docs.push(doc);
-                        } else {
-                            self.state.num_invalid_messages += 1;
-                        }
-                        batch_num_bytes += message.payload_len() as u64;
-                        self.state.num_bytes_processed += message.payload_len() as u64;
-                        self.state.num_messages_processed += 1;
-
-                        let partition_id = self
-                            .state
-                            .assigned_partition_ids
-                            .get(&message.partition())
-                            .ok_or_else(|| {
-                                anyhow::anyhow!(
-                                    "Received message from unassigned partition `{}`. Assigned partitions: \
-                                     `{{{}}}`.",
-                                    message.partition(),
-                                    self.state.assigned_partition_ids.keys().join(", "),
-                                )
-                            })?
-                            .clone();
-                        let current_position = Position::from(message.offset());
-                        let previous_position = self
-                            .state
-                            .current_positions
-                            .insert(message.partition(), current_position.clone())
-                            .unwrap_or_else(|| previous_position_for_offset(message.offset()));
-                        checkpoint_delta
-                            .record_partition_delta(partition_id, previous_position, current_position)
-                            .context("Failed to record partition delta.")?;
-
-                        if batch_num_bytes >= TARGET_BATCH_NUM_BYTES {
-                            break;
+                message_res = self.consumer.recv() => {
+                    let message = match message_res {
+                        Ok(message) => message,
+                        Err(KafkaError::PartitionEOF(partition_id)) => {
+                            self.state.num_active_partitions.dec();
+                            info!(
+                                topic = %self.topic,
+                                partition_id = ?partition_id,
+                                num_active_partitions = ?self.state.num_active_partitions,
+                                "Reached end of partition."
+                            );
+                            continue;
                         }
-                        ctx.record_progress();
+                        // FIXME: This is assuming that Kafka errors are not recoverable, it may not be the
+                        // case.
+                        Err(err) => return Err(ActorExitStatus::from(anyhow::anyhow!(err))),
+                    };
+                    if let Some(doc) = parse_message_payload(&message) {
+                        docs.push(doc);
                     } else {
-                        // TODO: this is probably not what should happen here.
-                        // In this case there is no current data in the topic but on a live stream
-                        // more data may come. This should ideally wait for a period of time for new
-                        // data to arrive before returning.
+                        self.state.num_invalid_messages += 1;
+                    }
+                    batch_num_bytes += message.payload_len() as u64;
+                    self.state.num_bytes_processed += message.payload_len() as u64;
+                    self.state.num_messages_processed += 1;
+
+                    let partition_id = self
+                        .state
+                        .assigned_partition_ids
+                        .get(&message.partition())
+                        .ok_or_else(|| {
+                            anyhow::anyhow!(
+                                "Received message from unassigned partition `{}`. Assigned partitions: \
+                                 `{{{}}}`.",
+                                message.partition(),
+                                self.state.assigned_partition_ids.keys().join(", "),
+                            )
+                        })?
+                        .clone();
+                    let current_position = Position::from(message.offset());
+                    let previous_position = self
+                        .state
+                        .current_positions
+                        .insert(message.partition(), current_position.clone())
+                        .unwrap_or_else(|| previous_position_for_offset(message.offset()));
+                    checkpoint_delta
+                        .record_partition_delta(partition_id, previous_position, current_position)
+                        .context("Failed to record partition delta.")?;
+
+                    if batch_num_bytes >= TARGET_BATCH_NUM_BYTES {
                         break;
                     }
+                    ctx.record_progress();
+                }
+                _ = &mut deadline => {
+                    break;
                 }
             }
         }
-
         if !checkpoint_delta.is_empty() {
             let batch = RawDocBatch {
                 docs,
@@ -368,14 +403,11 @@ impl Source for KafkaSource {
             };
             ctx.send_message(batch_sink, batch).await?;
         }
-
-        // TODO: not entirely sure why this is here. Seems like this would prevent stream processing
-        if self.state.num_active_partitions == 0 {
+        if self.backfill_mode_enabled && self.state.num_active_partitions.is_zero() {
             info!(topic = %self.topic, "Reached end of topic.");
             ctx.send_exit_with_success(batch_sink).await?;
             return Err(ActorExitStatus::Success);
         }
-        debug!("batch complete, waiting for next iteration");
         Ok(Duration::default())
     }
 
@@ -404,7 +436,7 @@ impl Source for KafkaSource {
             "topic": self.topic,
             "assigned_partition_ids": assigned_partition_ids,
             "current_positions": current_positions,
-            "num_active_partitions": self.state.num_active_partitions,
+            "num_active_partitions": self.state.num_active_partitions.json(),
             "num_bytes_processed": self.state.num_bytes_processed,
             "num_messages_processed": self.state.num_messages_processed,
             "num_invalid_messages": self.state.num_invalid_messages,
@@ -724,6 +756,7 @@ mod kafka_broker_tests {
                 "bootstrap.servers": bootstrap_servers,
                 "enable.partition.eof": true,
             }),
+            enable_backfill_mode: true,
         }),
     };
 
@@ -952,6 +985,7 @@ mod kafka_broker_tests {
         topic: topic.clone(),
         client_log_level: None,
         client_params: json!({ "bootstrap.servers": bootstrap_servers }),
+        enable_backfill_mode: true,
     })
     .await?;
 
@@ -963,6 +997,7 @@ mod kafka_broker_tests {
         topic: "non-existent-topic".to_string(),
         client_log_level: None,
         client_params: json!({ "bootstrap.servers": bootstrap_servers }),
+        enable_backfill_mode: true,
    })
     .await;
 
@@ -975,6 +1010,7 @@ mod kafka_broker_tests {
         client_params: json!({
             "bootstrap.servers": "192.0.2.10:9092"
         }),
+        enable_backfill_mode: true,
     })
     .await;
 
diff --git a/quickwit-indexing/src/source/kinesis/kinesis_source.rs b/quickwit-indexing/src/source/kinesis/kinesis_source.rs
index 264bb323b87..d11eb5aa14b 100644
--- a/quickwit-indexing/src/source/kinesis/kinesis_source.rs
+++ b/quickwit-indexing/src/source/kinesis/kinesis_source.rs
@@ -97,7 +97,7 @@ pub struct KinesisSource {
     // Receiver for the communication channel between the source and the shard consumers.
     shard_consumers_rx: mpsc::Receiver<ShardConsumerMessage>,
     state: KinesisSourceState,
-    shutdown_at_stream_eof: bool,
+    backfill_mode_enabled: bool,
 }
 
 impl fmt::Debug for KinesisSource {
@@ -118,7 +118,7 @@ impl KinesisSource {
         checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<KinesisSource> {
         let stream_name = params.stream_name;
-        let shutdown_at_stream_eof = params.shutdown_at_stream_eof;
+        let backfill_mode_enabled = params.enable_backfill_mode;
         let region = get_region(params.region_or_endpoint)?;
         let kinesis_client = get_kinesis_client(region)?;
         let (shard_consumers_tx, shard_consumers_rx) = mpsc::channel(1_000);
@@ -132,7 +132,7 @@ impl KinesisSource {
             shard_consumers_tx,
             shard_consumers_rx,
             state,
-            shutdown_at_stream_eof,
+            backfill_mode_enabled,
             retry_params,
         })
     }
@@ -154,7 +154,7 @@ impl KinesisSource {
             self.stream_name.clone(),
             shard_id.clone(),
             from_sequence_number_exclusive,
-            self.shutdown_at_stream_eof,
+            self.backfill_mode_enabled,
             self.kinesis_client.clone(),
             self.shard_consumers_tx.clone(),
             self.retry_params.clone(),
@@ -386,7 +386,7 @@ mod tests {
             region_or_endpoint: Some(RegionOrEndpoint::Endpoint(
                 "http://localhost:4566".to_string(),
             )),
-            shutdown_at_stream_eof: true,
+            enable_backfill_mode: true,
         };
         {
             let checkpoint = SourceCheckpoint::default();
diff --git a/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs b/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs
index f2a1d5d5d7a..f6aef93d0a1 100644
--- a/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs
+++ b/quickwit-metastore/src/backward_compatibility_tests/index_metadata.rs
@@ -175,6 +175,7 @@ pub(crate) fn sample_index_metadata_for_regression() -> IndexMetadata {
             topic: "kafka-topic".to_string(),
             client_log_level: None,
             client_params: serde_json::json!({}),
+            enable_backfill_mode: false,
         }),
     };
     let mut sources = HashMap::default();
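Note: after this change, backfill mode is opt-in and off by default for both the Kafka and Kinesis sources. For illustration, a hypothetical Kafka source config using the renamed flag (the topic and broker values are placeholders; the top-level source_id/source_type fields are assumed to mirror the existing kafka-source.json fixture, which the hunk above truncates, and the broker tests above also set the rdkafka enable.partition.eof client parameter so that end-of-partition events are actually delivered to the source):

    source_id: my-kafka-source
    source_type: kafka
    params:
      topic: my-topic
      client_params:
        bootstrap.servers: localhost:9092
        enable.partition.eof: true
      enable_backfill_mode: true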