-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MetadataManager init #1202
MetadataManager init #1202
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @AhmedSoliman. I like it a lot :-) +1 for merging.
crates/network/src/utils.rs
Outdated
let channel = match address { | ||
AdvertisedAddress::Uds(uds_path) => { | ||
// dummy endpoint required to specify an uds connector, it is not used anywhere | ||
Endpoint::try_from("/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to put a valid uri with scheme and authority here. Otherwise tonic complains. So maybe http://127.0.0.1
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix. I think I just moved this code over, but it's a good catch.
crates/node/src/metadata.rs
Outdated
Ok(*v) | ||
} | ||
|
||
// Returns when the metadata kind is at the provided version (or newer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment seems not correct.
// Returns when the metadata kind is at the provided version (or newer) | ||
pub async fn wait_for_version( | ||
&self, | ||
metadata_kind: MetadataKind, | ||
min_version: Version, | ||
) -> Result<Version, ShutdownError> { | ||
let mut recv = self.inner.write_watches[metadata_kind].receive.clone(); | ||
let v = recv | ||
.wait_for(|v| *v >= min_version) | ||
.await | ||
.map_err(|_| ShutdownError)?; | ||
Ok(*v) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice :-)
crates/node/src/metadata.rs
Outdated
biased; | ||
_ = cancellation_watcher() => { | ||
info!("Metadata manager stopped"); | ||
break; | ||
} | ||
Some(cmd) = self.inbound.recv() => { | ||
self.handle_command(cmd) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indentation looks a bit off. Might be GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Macros and rustfmt don't like each other :)
Will indent.
} | ||
|
||
/// Start and wait for shutdown signal. | ||
pub async fn run(mut self /*, network_sender: NetworkSender*/) -> anyhow::Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the network_sender
be used to actively fetch metadata updates from the metadata store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a possible future, yes.
info!("Metadata manager started"); | ||
|
||
loop { | ||
tokio::select! { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐱💀 ;-)
This introduces `MetadataManager`, a node-level service that provides uniform access to different metadata kinds. This PR focuses on the nodes configuration only. `Metadata` is a cheaply cloneable handle that components can use to access metadata. `MetadataWriter` is only shared with components that can propose updates to the cached metadata. Additionally, this changes the node startup procedure to be one step closer to the original design. Worker attachment is only concerned with partition assignment and node IDs are directly fetched from NodesConfiguration. In this, nodes configuration will be statically created on startup if the bootstrap_cluster option is set (set by default at the moment). The server will fail to start if this is unset until we implement fetching nodes configuration from metadata store. The admin_address is not used anymore on startup but will be replaced with metadata address later for remote nodes config fetching.
MetadataManager init
This introduces
MetadataManager
, a node-level service that provides uniform access to different metadata kinds. This PR focuses on the nodes configuration only.Metadata
is a cheaply cloneable handle that components can use to access metadata.MetadataWriter
is only shared with components that can propose updates to the cached metadata.Additionally, this changes the node startup procedure to be one step closer to the original design. Worker attachment is only concerned with partition assignment and node IDs are directly fetched from NodesConfiguration.
In this, nodes configuration will be statically created on startup if the bootstrap_cluster option is set (set by default at the moment). The server will fail to start if this is unset until we implement fetching nodes configuration from metadata store.
The admin_address is not used anymore on startup but will be replaced with metadata address later for remote nodes config fetching.
Stack created with Sapling. Best reviewed with ReviewStack.