Introduce ControlProcessors command #1698
Conversation
Force-pushed the branch from 15d7756 to 6a79b43.
Looks great. My comments are non-blocking; feel free to discard them if it's too much hassle (perhaps a TODO/comment for a follow-up is sufficient).
    async fn run_for_leader(&self, leader_epoch: LeaderEpoch) -> Result<(), ShutdownError> {
        self.control_tx
            .send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))
            .await
I'm not sure we should await in those cases. We don't want to block the control loop if a single processor is too busy, right?
Perhaps we can set a timeout at the call-site or switch to try_send?
Yes, this is indeed a problem. I am also not really happy with it right now. Switching it to try_send could be a first band-aid. I'll change it that way and open an issue to revisit the PartitionProcessor, because there are also some other places where we await.
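A minimal sketch of the try_send band-aid being discussed, assuming the control channel is a bounded tokio::sync::mpsc channel; the ControlCommand enum and dispatch helper are hypothetical stand-ins, not the PR's actual types:

    use tokio::sync::mpsc::{self, error::TrySendError};

    // Hypothetical stand-in for PartitionProcessorControlCommand.
    enum ControlCommand {
        RunForLeader(u64),
    }

    // Non-blocking dispatch: if the processor's queue is full, surface the
    // error to the caller instead of awaiting and stalling the control loop.
    fn dispatch(tx: &mpsc::Sender<ControlCommand>, epoch: u64) -> Result<(), TrySendError<ControlCommand>> {
        tx.try_send(ControlCommand::RunForLeader(epoch))
    }

    fn main() {
        let (tx, _rx) = mpsc::channel::<ControlCommand>(1);
        // Fill the bounded channel, then observe that the second try_send
        // fails fast instead of blocking the caller.
        assert!(dispatch(&tx, 1).is_ok());
        assert!(matches!(dispatch(&tx, 2), Err(TrySendError::Full(_))));
    }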
    name_cache: BTreeMap<PartitionId, &'static str>,

    metadata: Metadata,
    metadata_store_client: MetadataStoreClient,
    partition_store_manager: PartitionStoreManager,
    attach_router: RpcRouter<AttachRequest, Networking>,
-   incoming_get_state: BoxStream<'static, MessageEnvelope<GetProcessorsState>>,
+   incoming_get_state:
+       Pin<Box<dyn Stream<Item = MessageEnvelope<GetProcessorsState>> + Send + Sync + 'static>>,
Curious, why did we need to remove BoxStream here?
I needed to add the Sync trait to the Stream trait object. That's why I had to spell things out more verbosely :-(
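For context, futures' BoxStream<'a, T> is an alias for Pin<Box<dyn Stream<Item = T> + Send + 'a>> without a Sync bound, which is why the Sync requirement forces the spelled-out type. A possible way to keep the field readable is a local alias; SyncBoxStream below is a hypothetical name, not something the PR introduces:

    use std::pin::Pin;
    use futures::stream::{self, Stream, StreamExt};

    // Hypothetical local alias: like futures::stream::BoxStream, but with an
    // additional Sync bound on the trait object.
    type SyncBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

    fn numbers() -> SyncBoxStream<'static, u32> {
        // `stream::iter` streams are Send + Sync, so boxing into the alias works.
        Box::pin(stream::iter(0..3))
    }

    #[tokio::main]
    async fn main() {
        let mut s = numbers();
        while let Some(n) = s.next().await {
            println!("{n}");
        }
    }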
    for control_processor in control_processors.commands {
        self.on_control_processor(control_processor, &partition_table)
            .await?;
    }
I'm a little concerned about critical control loops with inner awaits. It would be best if we could execute these concurrently, but that might be too much of a hassle given how everything is structured. Alternatives could be to fail early on potential blocking points, or to add a timeout to put an upper bound on the waiting time.
Yes, this is not good. As a temporary band-aid, I will replace the sends with try_send, as you've suggested. It still does not make the control loop completely await-free; I will take a look at this in a separate step.
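A sketch of the other alternative mentioned above, bounding each send with a timeout at the call-site; this is illustrative only (not the change made in this PR), and ControlProcessor plus send_with_deadline are hypothetical names:

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    // Hypothetical stand-in for a single control command sent to a processor.
    #[derive(Debug)]
    struct ControlProcessor(u64);

    // Bound the time spent awaiting a single busy processor so the control
    // loop as a whole cannot stall indefinitely on one slow receiver.
    async fn send_with_deadline(
        tx: &mpsc::Sender<ControlProcessor>,
        cmd: ControlProcessor,
    ) -> Result<(), &'static str> {
        match timeout(Duration::from_millis(100), tx.send(cmd)).await {
            Ok(Ok(())) => Ok(()),
            Ok(Err(_)) => Err("processor channel closed"),
            Err(_) => Err("processor too busy; command dropped"),
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(8);
        for id in 0..3 {
            if let Err(reason) = send_with_deadline(&tx, ControlProcessor(id)).await {
                eprintln!("skipping processor {id}: {reason}");
            }
        }
        drop(tx);
        while let Some(cmd) = rx.recv().await {
            println!("received {cmd:?}");
        }
    }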
This commit introduces the ControlProcessors command which allows an external entity to instruct the PartitionProcessorManager to start and stop PartitionProcessors. This fixes restatedev#1695.
Force-pushed the branch from 6a79b43 to dbae1a5.
This commit introduces the ControlProcessors command which allows an external entity to instruct the PartitionProcessorManager to start and stop PartitionProcessors.
This fixes #1695.
This PR is based on #1694.