Skip to content

Commit

Permalink
Implement backfill mode
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jul 28, 2022
1 parent ed67ab2 commit a2e5a2f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"params": {
"topic": "cloudera-cluster-logs",
"client_params": {
"bootstrap.servers": "host:9092"
"bootstrap.servers": "localhost:9092"
}
}
}
146 changes: 108 additions & 38 deletions quickwit-config/src/source_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ pub struct KafkaSourceParams {
#[serde(default = "serde_json::Value::default")]
#[serde(skip_serializing_if = "serde_json::Value::is_null")]
pub client_params: serde_json::Value,
/// When backfill mode is enabled, the source exits after reaching the end of the topic.
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub enable_backfill_mode: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -229,9 +233,9 @@ pub struct KinesisSourceParams {
pub stream_name: String,
#[serde(flatten)]
pub region_or_endpoint: Option<RegionOrEndpoint>,
#[doc(hidden)]
/// When backfill mode is enabled, the source exits after reaching the end of the stream.
#[serde(skip_serializing_if = "is_false")]
pub shutdown_at_stream_eof: bool,
pub enable_backfill_mode: bool,
}

#[derive(Clone, Debug, PartialEq, Deserialize)]
Expand All @@ -240,9 +244,8 @@ struct KinesisSourceParamsInner {
pub stream_name: String,
pub region: Option<String>,
pub endpoint: Option<String>,
#[doc(hidden)]
#[serde(default)]
pub shutdown_at_stream_eof: bool,
pub enable_backfill_mode: bool,
}

impl TryFrom<KinesisSourceParamsInner> for KinesisSourceParams {
Expand All @@ -261,7 +264,7 @@ impl TryFrom<KinesisSourceParamsInner> for KinesisSourceParams {
Ok(KinesisSourceParams {
stream_name: value.stream_name,
region_or_endpoint,
shutdown_at_stream_eof: value.shutdown_at_stream_eof,
enable_backfill_mode: value.enable_backfill_mode,
})
}
}
Expand Down Expand Up @@ -317,12 +320,81 @@ mod tests {
source_params: SourceParams::Kafka(KafkaSourceParams {
topic: "cloudera-cluster-logs".to_string(),
client_log_level: None,
client_params: json! {{"bootstrap.servers": "host:9092"}},
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
enable_backfill_mode: false,
}),
};
assert_eq!(source_config, expected_source_config);
}

#[test]
// Round-trip test: each case serializes a `KafkaSourceParams` value to YAML with
// `serde_yaml::to_string`, parses the resulting text back with `serde_yaml::from_str`,
// and asserts that the parsed value equals the original.
fn test_kafka_source_params_serialization() {
// Case 1: only `topic` carries a value; the log level is `None`, the client
// parameters are JSON `null`, and backfill mode is disabled.
{
let params = KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: None,
client_params: json!(null),
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

// Parsing the emitted YAML must yield a value equal to `params`.
assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(&params_yaml).unwrap(),
params,
)
}
// Case 2: the log level and client parameters are populated; backfill mode
// stays disabled.
{
let params = KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: Some("info".to_string()),
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

// Parsing the emitted YAML must yield a value equal to `params`.
assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(&params_yaml).unwrap(),
params,
)
}
}

#[test]
// Parses hand-written YAML documents into `KafkaSourceParams` and compares the
// result against the expected struct value.
fn test_kafka_source_params_deserialization() {
// Case 1: the document sets only `topic`. The expected value has
// `client_log_level: None`, JSON `null` client parameters, and backfill mode off,
// i.e. what the omitted keys must deserialize to.
{
let yaml = r#"
topic: my-topic
"#;
assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(yaml).unwrap(),
KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: None,
client_params: json!(null),
enable_backfill_mode: false,
}
);
}
// Case 2: every field is set explicitly, including `enable_backfill_mode: true`.
// NOTE(review): the expected value nests `bootstrap.servers` inside `client_params`,
// so the YAML key must be indented under `client_params:` — confirm the indentation
// of the raw string below.
{
let yaml = r#"
topic: my-topic
client_log_level: info
client_params:
bootstrap.servers: localhost:9092
enable_backfill_mode: true
"#;
assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(yaml).unwrap(),
KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: Some("info".to_string()),
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
enable_backfill_mode: true,
}
);
}
}

#[tokio::test]
async fn test_load_kinesis_source_config() {
let source_config_filepath = get_source_config_filepath("kinesis-source.yaml");
Expand All @@ -336,7 +408,7 @@ mod tests {
source_params: SourceParams::Kinesis(KinesisSourceParams {
stream_name: "emr-cluster-logs".to_string(),
region_or_endpoint: None,
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
}),
};
assert_eq!(source_config, expected_source_config);
Expand All @@ -363,7 +435,7 @@ mod tests {
let params = KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: None,
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

Expand All @@ -376,7 +448,7 @@ mod tests {
let params = KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

Expand All @@ -391,7 +463,7 @@ mod tests {
region_or_endpoint: Some(RegionOrEndpoint::Endpoint(
"https://localhost:4566".to_string(),
)),
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

Expand All @@ -405,43 +477,41 @@ mod tests {
#[test]
fn test_kinesis_source_params_deserialization() {
{
{
let yaml = r#"
let yaml = r#"
stream_name: my-stream
"#;
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: None,
shutdown_at_stream_eof: false,
}
);
}
{
let yaml = r#"
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: None,
enable_backfill_mode: false,
}
);
}
{
let yaml = r#"
stream_name: my-stream
region: us-west-1
shutdown_at_stream_eof: true
enable_backfill_mode: true
"#;
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
shutdown_at_stream_eof: true,
}
);
}
{
let yaml = r#"
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
enable_backfill_mode: true,
}
);
}
{
let yaml = r#"
stream_name: my-stream
region: us-west-1
endpoint: https://localhost:4566
"#;
let error = serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap_err();
assert!(error.to_string().starts_with("Kinesis source parameters "));
}
let error = serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap_err();
assert!(error.to_string().starts_with("Kinesis source parameters "));
}
}

Expand Down
9 changes: 8 additions & 1 deletion quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub struct KafkaSource {
consumer: Arc<RdKafkaConsumer>,
state: KafkaSourceState,
rebalance_events: mpsc::Receiver<RebalanceEvent>,
backfill_mode_enabled: bool,
}

impl fmt::Debug for KafkaSource {
Expand All @@ -274,6 +275,7 @@ impl KafkaSource {
_checkpoint: SourceCheckpoint,
) -> anyhow::Result<Self> {
let topic = params.topic.clone();
let backfill_mode_enabled = params.enable_backfill_mode;

let (rebalance_sender, rebalance_receiver) = mpsc::channel(32);

Expand All @@ -296,6 +298,7 @@ impl KafkaSource {
consumer,
state,
rebalance_events: rebalance_receiver,
backfill_mode_enabled,
})
}
}
Expand Down Expand Up @@ -400,7 +403,7 @@ impl Source for KafkaSource {
};
ctx.send_message(batch_sink, batch).await?;
}
if self.state.num_active_partitions.is_zero() {
if self.backfill_mode_enabled && self.state.num_active_partitions.is_zero() {
info!(topic = %self.topic, "Reached end of topic.");
ctx.send_exit_with_success(batch_sink).await?;
return Err(ActorExitStatus::Success);
Expand Down Expand Up @@ -753,6 +756,7 @@ mod kafka_broker_tests {
"bootstrap.servers": bootstrap_servers,
"enable.partition.eof": true,
}),
enable_backfill_mode: true,
}),
};

Expand Down Expand Up @@ -981,6 +985,7 @@ mod kafka_broker_tests {
topic: topic.clone(),
client_log_level: None,
client_params: json!({ "bootstrap.servers": bootstrap_servers }),
enable_backfill_mode: true,
})
.await?;

Expand All @@ -992,6 +997,7 @@ mod kafka_broker_tests {
topic: "non-existent-topic".to_string(),
client_log_level: None,
client_params: json!({ "bootstrap.servers": bootstrap_servers }),
enable_backfill_mode: true,
})
.await;

Expand All @@ -1004,6 +1010,7 @@ mod kafka_broker_tests {
client_params: json!({
"bootstrap.servers": "192.0.2.10:9092"
}),
enable_backfill_mode: true,
})
.await;

Expand Down
10 changes: 5 additions & 5 deletions quickwit-indexing/src/source/kinesis/kinesis_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct KinesisSource {
// Receiver for the communication channel between the source and the shard consumers.
shard_consumers_rx: mpsc::Receiver<ShardConsumerMessage>,
state: KinesisSourceState,
shutdown_at_stream_eof: bool,
backfill_mode_enabled: bool,
}

impl fmt::Debug for KinesisSource {
Expand All @@ -118,7 +118,7 @@ impl KinesisSource {
checkpoint: SourceCheckpoint,
) -> anyhow::Result<Self> {
let stream_name = params.stream_name;
let shutdown_at_stream_eof = params.shutdown_at_stream_eof;
let backfill_mode_enabled = params.enable_backfill_mode;
let region = get_region(params.region_or_endpoint)?;
let kinesis_client = get_kinesis_client(region)?;
let (shard_consumers_tx, shard_consumers_rx) = mpsc::channel(1_000);
Expand All @@ -132,7 +132,7 @@ impl KinesisSource {
shard_consumers_tx,
shard_consumers_rx,
state,
shutdown_at_stream_eof,
backfill_mode_enabled,
retry_params,
})
}
Expand All @@ -154,7 +154,7 @@ impl KinesisSource {
self.stream_name.clone(),
shard_id.clone(),
from_sequence_number_exclusive,
self.shutdown_at_stream_eof,
self.backfill_mode_enabled,
self.kinesis_client.clone(),
self.shard_consumers_tx.clone(),
self.retry_params.clone(),
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
region_or_endpoint: Some(RegionOrEndpoint::Endpoint(
"http://localhost:4566".to_string(),
)),
shutdown_at_stream_eof: true,
enable_backfill_mode: true,
};
{
let checkpoint = SourceCheckpoint::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub(crate) fn sample_index_metadata_for_regression() -> IndexMetadata {
topic: "kafka-topic".to_string(),
client_log_level: None,
client_params: serde_json::json!({}),
enable_backfill_mode: false,
}),
};
let mut sources = HashMap::default();
Expand Down

0 comments on commit a2e5a2f

Please sign in to comment.