[Kafka source] Use consumer.recv() directly rather than consumer.stream() #1807

Merged
merged 3 commits into from
Jul 28, 2022

guilload
Member

Description

Simplify the tokio select loop by calling consumer.recv() directly rather than going through consumer.stream().

How was this PR tested?

Ran the Kafka source on an empty topic.

guilload requested a review from kstaken July 28, 2022 00:14
@@ -89,6 +89,7 @@ impl ClientContext for RdKafkaContext {}

impl ConsumerContext for RdKafkaContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        info!("Pre rebalance {:?}", rebalance);
Member Author

Temporarily adding these log statements to understand what happens when consumers or partitions are added or removed.

Collaborator

Behind the scenes there's actually only one event in the Kafka protocol; the Rust lib then adds these additional callbacks at convenient points in its handler.

Member Author

I see. Good to know. Thanks for the explanation. One thing that I observed is that sometimes the same partitions are first revoked and then added again. Makes me question whether we should handle Revoke events.

Collaborator

I was wondering the same but I'm pretty sure now that simple rebalances like this won't need it.

There are more advanced rebalancing strategies that try to avoid stopping the entire consumer group while partitions move, but those are only really needed when you're trying to minimize latency, and I don't think the Rust lib has good support for those yet. This use case also won't really need that, since there's inherent latency in the ingest process no matter what.
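
To make the revoke-then-reassign observation in this thread concrete, here is a minimal sketch in plain Rust. It assumes a simplified model in which a rebalance shows up as a `Revoke` followed by an `Assign`. The names `RebalanceEvent` and `AssignmentTracker` are hypothetical and are not rdkafka types or code from this PR; partitions are represented as bare `i32` ids.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for the rebalance notifications discussed above.
/// These are NOT the rdkafka types; partitions are plain `i32` ids.
#[derive(Debug, Clone, PartialEq)]
enum RebalanceEvent {
    Assign(Vec<i32>),
    Revoke(Vec<i32>),
}

/// Tracks which partitions this consumer currently owns.
#[derive(Debug, Default)]
struct AssignmentTracker {
    /// Partitions currently assigned to this consumer.
    owned: BTreeSet<i32>,
    /// Partitions taken away by the most recent revoke.
    last_revoked: BTreeSet<i32>,
}

impl AssignmentTracker {
    /// Applies one event and returns the partitions that were revoked
    /// and then handed straight back by this event (sorted ascending).
    fn apply(&mut self, event: &RebalanceEvent) -> Vec<i32> {
        match event {
            RebalanceEvent::Revoke(partitions) => {
                self.last_revoked.clear();
                for p in partitions {
                    // Only remember partitions we actually owned.
                    if self.owned.remove(p) {
                        self.last_revoked.insert(*p);
                    }
                }
                Vec::new()
            }
            RebalanceEvent::Assign(partitions) => {
                let mut returned = Vec::new();
                for p in partitions {
                    if self.last_revoked.contains(p) {
                        returned.push(*p);
                    }
                    self.owned.insert(*p);
                }
                self.last_revoked.clear();
                returned.sort();
                returned
            }
        }
    }
}
```

Feeding the tracker `Assign([0, 1, 2])`, then `Revoke([0, 1, 2])`, then `Assign([0, 1])` makes the last call report partitions 0 and 1 as having been revoked and immediately re-assigned. In this simplified model, a source that only acts on `Assign` events would still end up with the correct set of owned partitions, which is in line with the suggestion above that simple rebalances may not need explicit `Revoke` handling.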

quickwit-indexing/src/source/kafka_source.rs
guilload force-pushed the guilload--scalable-kafka branch 2 times, most recently from 2556415 to aaf2ed1 July 28, 2022 15:04
Collaborator

kstaken left a comment

This looks good.

kstaken merged commit 7b36eed into scalable-kafka Jul 28, 2022
kstaken deleted the guilload--scalable-kafka branch July 28, 2022 22:30