Skip to content

Commit

Permalink
Merge pull request #1807 from quickwit-oss/guilload--scalable-kafka
Browse files Browse the repository at this point in the history
[Kafka source] Use `consumer.recv()` directly rather than `consumer.stream()`
  • Loading branch information
kstaken committed Jul 28, 2022
2 parents edbb96a + a2e5a2f commit 7b36eed
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"params": {
"topic": "cloudera-cluster-logs",
"client_params": {
"bootstrap.servers": "host:9092"
"bootstrap.servers": "localhost:9092"
}
}
}
146 changes: 108 additions & 38 deletions quickwit-config/src/source_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ pub struct KafkaSourceParams {
#[serde(default = "serde_json::Value::default")]
#[serde(skip_serializing_if = "serde_json::Value::is_null")]
pub client_params: serde_json::Value,
/// When backfill mode is enabled, the source exits after reaching the end of the topic.
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub enable_backfill_mode: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -229,9 +233,9 @@ pub struct KinesisSourceParams {
pub stream_name: String,
#[serde(flatten)]
pub region_or_endpoint: Option<RegionOrEndpoint>,
#[doc(hidden)]
/// When backfill mode is enabled, the source exits after reaching the end of the stream.
#[serde(skip_serializing_if = "is_false")]
pub shutdown_at_stream_eof: bool,
pub enable_backfill_mode: bool,
}

#[derive(Clone, Debug, PartialEq, Deserialize)]
Expand All @@ -240,9 +244,8 @@ struct KinesisSourceParamsInner {
pub stream_name: String,
pub region: Option<String>,
pub endpoint: Option<String>,
#[doc(hidden)]
#[serde(default)]
pub shutdown_at_stream_eof: bool,
pub enable_backfill_mode: bool,
}

impl TryFrom<KinesisSourceParamsInner> for KinesisSourceParams {
Expand All @@ -261,7 +264,7 @@ impl TryFrom<KinesisSourceParamsInner> for KinesisSourceParams {
Ok(KinesisSourceParams {
stream_name: value.stream_name,
region_or_endpoint,
shutdown_at_stream_eof: value.shutdown_at_stream_eof,
enable_backfill_mode: value.enable_backfill_mode,
})
}
}
Expand Down Expand Up @@ -317,12 +320,81 @@ mod tests {
source_params: SourceParams::Kafka(KafkaSourceParams {
topic: "cloudera-cluster-logs".to_string(),
client_log_level: None,
client_params: json! {{"bootstrap.servers": "host:9092"}},
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
enable_backfill_mode: false,
}),
};
assert_eq!(source_config, expected_source_config);
}

#[test]
fn test_kafka_source_params_serialization() {
{
let params = KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: None,
client_params: json!(null),
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(&params_yaml).unwrap(),
params,
)
}
{
let params = KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: Some("info".to_string()),
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(&params_yaml).unwrap(),
params,
)
}
}

#[test]
fn test_kafka_source_params_deserialization() {
{
let yaml = r#"
topic: my-topic
"#;
assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(yaml).unwrap(),
KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: None,
client_params: json!(null),
enable_backfill_mode: false,
}
);
}
{
let yaml = r#"
topic: my-topic
client_log_level: info
client_params:
bootstrap.servers: localhost:9092
enable_backfill_mode: true
"#;
assert_eq!(
serde_yaml::from_str::<KafkaSourceParams>(yaml).unwrap(),
KafkaSourceParams {
topic: "my-topic".to_string(),
client_log_level: Some("info".to_string()),
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
enable_backfill_mode: true,
}
);
}
}

#[tokio::test]
async fn test_load_kinesis_source_config() {
let source_config_filepath = get_source_config_filepath("kinesis-source.yaml");
Expand All @@ -336,7 +408,7 @@ mod tests {
source_params: SourceParams::Kinesis(KinesisSourceParams {
stream_name: "emr-cluster-logs".to_string(),
region_or_endpoint: None,
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
}),
};
assert_eq!(source_config, expected_source_config);
Expand All @@ -363,7 +435,7 @@ mod tests {
let params = KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: None,
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

Expand All @@ -376,7 +448,7 @@ mod tests {
let params = KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

Expand All @@ -391,7 +463,7 @@ mod tests {
region_or_endpoint: Some(RegionOrEndpoint::Endpoint(
"https://localhost:4566".to_string(),
)),
shutdown_at_stream_eof: false,
enable_backfill_mode: false,
};
let params_yaml = serde_yaml::to_string(&params).unwrap();

Expand All @@ -405,43 +477,41 @@ mod tests {
#[test]
fn test_kinesis_source_params_deserialization() {
{
{
let yaml = r#"
let yaml = r#"
stream_name: my-stream
"#;
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: None,
shutdown_at_stream_eof: false,
}
);
}
{
let yaml = r#"
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: None,
enable_backfill_mode: false,
}
);
}
{
let yaml = r#"
stream_name: my-stream
region: us-west-1
shutdown_at_stream_eof: true
enable_backfill_mode: true
"#;
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
shutdown_at_stream_eof: true,
}
);
}
{
let yaml = r#"
assert_eq!(
serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap(),
KinesisSourceParams {
stream_name: "my-stream".to_string(),
region_or_endpoint: Some(RegionOrEndpoint::Region("us-west-1".to_string())),
enable_backfill_mode: true,
}
);
}
{
let yaml = r#"
stream_name: my-stream
region: us-west-1
endpoint: https://localhost:4566
"#;
let error = serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap_err();
assert!(error.to_string().starts_with("Kinesis source parameters "));
}
let error = serde_yaml::from_str::<KinesisSourceParams>(yaml).unwrap_err();
assert!(error.to_string().starts_with("Kinesis source parameters "));
}
}

Expand Down
Loading

0 comments on commit 7b36eed

Please sign in to comment.