-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Externa metadata store #1838
Externa metadata store #1838
Conversation
The implementation uses an external etcd client to implement the basic Metadata store functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for creating this PR @muhamadazmy. Having an etcd integration is really cool :-) The changes look good to me.
I think that I have given you bad advice on the delete
call specification. I think it is fine if we delete the entry and don't leave a tombstone behind. If users need it, then they can do it themselves.
Once you have updated this bit, then let me know and I'll give the PR another pass.
Cargo.toml
Outdated
@@ -194,6 +194,7 @@ tracing-test = { version = "0.2.5" } | |||
ulid = { version = "1.1.0" } | |||
url = { version = "2.5" } | |||
uuid = { version = "1.3.0", features = ["v7", "serde"] } | |||
etcd-client = { version = "0.14" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We try to order dependencies alphabetically. Unfortunately, there is no good tooling for it.
crates/core/Cargo.toml
Outdated
tower = { workspace = true } | ||
tracing = { workspace = true } | ||
etcd-client = { workspace = true } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here with the alphabetical sorting.
|
||
impl From<etcd_client::Error> for ReadError { | ||
fn from(value: etcd_client::Error) -> Self { | ||
Self::Network(value.into()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are all etcd_client::Error
network related or could there be an invalid argument or something different? Maybe we also need to extend ReadError
if it is not expressive enough to return different kind of errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, i had to made a choice either to assume it's all network, or extend ReadError.
Maybe we can a variant StoreError(GenericError) for any possible store error that can't be categorized to a specific Read/WriteError ?
// | ||
// The correct solution is of course to make Version u128. | ||
// What we do here is ONLY relying on the "version" of the key combined with the fact that we never actually delete the key | ||
// but instead put a "tombstone" in place so version doesn't reset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// but instead put a "tombstone" in place so version doesn't reset | |
// but instead put a "tombstone" in place so version doesn't reset. |
Maybe also add that we are only using the lower 32 bits of the Etcd version and fail once the version is larger than u32::MAX
.
impl ToVersion for &KeyValue { | ||
fn to_version(self) -> Result<Version, ReadError> { | ||
if self.version() > u32::MAX as i64 { | ||
return Err(ReadError::Codec( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this is more an internal error than a codec error?
/// Gets the value and its current version for the given key. If key-value pair is not present, | ||
/// then return [`None`]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least with Rust rover, you don't need to copy the RustDoc on the implementation, since the IDE will still display it. Then we only have one place where we need to update the RustDocs.
let mut client = self.client.kv_client(); | ||
let mut response = client.get(key.into_bytes(), None).await?; | ||
|
||
// return first value because this suppose to be an exact match |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// return first value because this suppose to be an exact match | |
// return first value because this is supposed to be an exact match |
/// Puts the versioned value under the given key following the provided precondition. If the | ||
/// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`]. | ||
/// | ||
/// NOTE: this implementation disregard that version attached on the value and depends only on the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// NOTE: this implementation disregard that version attached on the value and depends only on the | |
/// NOTE: This implementation disregards the version attached to the value and depends only on the |
.put_with_version( | ||
&mut client, | ||
key.into_bytes(), | ||
Vec::default().into(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for giving you bad advice. I think it is ok to implement the delete
operation by deleting the complete entry. If users of the MetadataStore
want to keep the version around, then they need to leave a tombstone behind.
crates/types/src/config/common.rs
Outdated
/// Connects to another node that is running Metadata Store | ||
/// over gRPC. | ||
/// The remote node must run with MetadataStore Role |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
/// Connects to another node that is running Metadata Store | |
/// over gRPC. | |
/// The remote node must run with MetadataStore Role | |
/// Connects to an embedded metadata store that is run by nodes that run with the MetadataStore role. |
@tillrohrmann thank you so much for your review. I applied All the comments. I also changed how we delete keys from the etcd store which reduced the code drastically and hence some comments became irrelevant. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, nice work @muhamadazmy. The changes look good to me. I left a few minor comments. +1 for merging after resolving them.
|
||
/// deletes a key only if and only if the current value in store | ||
/// has the exact given version | ||
async fn delete_if_has_version( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn delete_if_has_version( | |
async fn delete_if_version_matches( |
|
||
/// puts a key/value if and only if the current value in store | ||
/// has the exact given version | ||
async fn put_if_has_version( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn put_if_has_version( | |
async fn put_if_version_matches( |
// we keep an empty value in place of | ||
// deleted keys to work around the version limitation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the comment is now outdated.
// we keep an empty value in place of deleted | ||
// keys to work around the version limitation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outdated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, definitely! The entire implementation can be improved now to not relay on the value being empty. Thanks for pointing this out
let metadata_store_address = if let config::MetadataStore::Embedded { address } = | ||
metadata_store_client_options.metadata_store_client | ||
{ | ||
address.clone() | ||
} else { | ||
unimplemented!() | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For cases where you expect a single variant, you can use let_assert!(config::MetadataStore::Embedded { address } = metadata_store_client_options.metadata_store_client)
to destructure the type.
crates/types/src/config/common.rs
Outdated
#[cfg_attr(feature = "schemars", schemars(with = "String"))] | ||
pub metadata_store_address: AdvertisedAddress, | ||
/// Metadata store server to bootstrap the node from. | ||
pub metadata_store_client: MetadataStore, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename MetadataStore
into MetadataStoreClient
.
This PR adds support to external metadata store. Right now we only implement the etcd store.
Unit testsNote:
The Testing above is manual testing that involves an etcd cluster. The etcd nodes are randomly stopped or killed and looks like restate operations resumes normally with no issue. Unless the number of down etcd node is bigger than the quorum then of course it fails to write changes. This is restored once enough nodes are back
Fixes #1829