Skip to content

Commit

Permalink
Fix num active partitions race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jul 28, 2022
1 parent 15a1095 commit ed67ab2
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,38 @@ impl ConsumerContext for RdKafkaContext {
}
}

/// Counter of the partitions that have not reached EOF, with an explicit
/// state for the period before any count has been set.
#[derive(Debug)]
pub enum NumActivePartitions {
/// No count has been set yet. This is the value produced by `Default`.
Initializing,
/// The current number of active partitions.
Some(usize),
}

impl NumActivePartitions {
    /// Decrements the number of active partitions by one.
    ///
    /// The counter saturates at zero: decrementing a counter that is already
    /// zero leaves it at zero instead of overflowing (which would panic in
    /// debug builds and wrap around to `usize::MAX` in release builds, making
    /// `is_zero` permanently false).
    ///
    /// # Panics
    ///
    /// Panics if the counter is still in the `Initializing` state.
    fn dec(&mut self) {
        match self {
            Self::Initializing => panic!("We should not decrement the number of active partitions while initializing the source. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."),
            Self::Some(counter) => *counter = counter.saturating_sub(1),
        }
    }

    /// Returns `true` only when the counter has been set and is equal to zero.
    ///
    /// An `Initializing` counter is never considered to be zero.
    fn is_zero(&self) -> bool {
        matches!(self, Self::Some(0))
    }

    /// Returns the JSON representation of the counter: `null` while
    /// initializing, the number of active partitions otherwise.
    fn json(&self) -> serde_json::Value {
        match self {
            Self::Initializing => json!(null),
            Self::Some(counter) => json!(counter),
        }
    }
}

impl Default for NumActivePartitions {
fn default() -> Self {
Self::Initializing
}
}

type RdKafkaConsumer = StreamConsumer<RdKafkaContext>;

#[derive(Default)]
Expand All @@ -206,7 +238,7 @@ pub struct KafkaSourceState {
/// Offset for each partition of the last message received.
pub current_positions: HashMap<i32, Position>,
/// Number of active partitions, i.e., that have not reached EOF.
pub num_active_partitions: usize,
pub num_active_partitions: NumActivePartitions,
/// Number of bytes processed by the source.
pub num_bytes_processed: u64,
/// Number of messages processed by the source (including invalid messages).
Expand Down Expand Up @@ -256,8 +288,6 @@ impl KafkaSource {
.context("Failed to resume from checkpoint.")?;

let state = KafkaSourceState {
assigned_partition_ids: HashMap::new(),
num_active_partitions: 0,
..Default::default()
};
Ok(KafkaSource {
Expand Down Expand Up @@ -296,7 +326,7 @@ impl Source for KafkaSource {
},
RebalanceEvent::Assignments(assignments) => {
debug!("Received partition assignments {:?}", assignments);
self.state.num_active_partitions = assignments.len();
self.state.num_active_partitions = NumActivePartitions::Some(assignments.len());
self.state.assigned_partition_ids = assignments;
}
}
Expand All @@ -308,7 +338,7 @@ impl Source for KafkaSource {
let message = match message_res {
Ok(message) => message,
Err(KafkaError::PartitionEOF(partition_id)) => {
self.state.num_active_partitions -= 1;
self.state.num_active_partitions.dec();
info!(
topic = %self.topic,
partition_id = ?partition_id,
Expand Down Expand Up @@ -370,14 +400,11 @@ impl Source for KafkaSource {
};
ctx.send_message(batch_sink, batch).await?;
}

// TODO: not entirely sure why this is here. Seems like this would prevent stream processing
if self.state.num_active_partitions == 0 {
if self.state.num_active_partitions.is_zero() {
info!(topic = %self.topic, "Reached end of topic.");
ctx.send_exit_with_success(batch_sink).await?;
return Err(ActorExitStatus::Success);
}

debug!("batch complete, waiting for next iteration");
Ok(Duration::default())
}
Expand Down Expand Up @@ -406,7 +433,7 @@ impl Source for KafkaSource {
"topic": self.topic,
"assigned_partition_ids": assigned_partition_ids,
"current_positions": current_positions,
"num_active_partitions": self.state.num_active_partitions,
"num_active_partitions": self.state.num_active_partitions.json(),
"num_bytes_processed": self.state.num_bytes_processed,
"num_messages_processed": self.state.num_messages_processed,
"num_invalid_messages": self.state.num_invalid_messages,
Expand Down

0 comments on commit ed67ab2

Please sign in to comment.