Add single node MetadataStore implementation #1291
Very impressive! I feel bad about how slow I'm moving on the local loglet in comparison 😆
I've left a few minor nits, but in general it's good to go.
 /// Puts the versioned value under the given key following the provided precondition. If the
 /// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`].
 async fn put(
     &self,
-    key: &str,
+    key: ByteString,
Curious to know the benefits of using ByteString here, I'm under the impression that pretty much all keys are static strings.
The reason why I switched to `ByteString` is that I needed an owned type for the `restate_node_protocol::metadata_store::Put` message and I wanted to avoid an extra clone operation for `&'static str` keys. With `ByteString::from_static(&'static str)` this should be possible. Moreover, I didn't want to change the trait's type to `&'static str`, which seemed too limiting for users. This is probably premature optimization at its best 😅
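To illustrate the trade-off with the standard library only, here is a minimal sketch that uses `Cow<'static, str>` as a stand-in for `ByteString`. It is an analogy, not the PR's code: `PutMessage`, `put_static`, `put_dynamic`, and `is_borrowed` are hypothetical names. The point is that an owned message type can carry a static key without copying it, while still accepting keys built at runtime.

```rust
use std::borrow::Cow;

/// Hypothetical owned message type, standing in for the `Put` message.
/// `Cow<'static, str>` plays the role that `ByteString` plays in the PR.
struct PutMessage {
    key: Cow<'static, str>,
}

/// Static keys are borrowed: no allocation and no copy of the string data.
fn put_static(key: &'static str) -> PutMessage {
    PutMessage {
        key: Cow::Borrowed(key),
    }
}

/// Keys built at runtime are moved in, so callers are not limited to statics.
fn put_dynamic(key: String) -> PutMessage {
    PutMessage {
        key: Cow::Owned(key),
    }
}

/// Reports whether the message still points at the caller's static data.
fn is_borrowed(message: &PutMessage) -> bool {
    matches!(message.key, Cow::Borrowed(_))
}
```

With this shape, `put_static("partition_table")` keeps a borrowed key, whereas `put_dynamic(format!("node-{}", 7))` owns its key.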
Thanks for explaining. I think at some point we should explore using `smol_str` or `smartstring` for most of our internal strings.
async fn request(&self, kind: RequestKind) -> Result<ResponseKind, RequestError> {
    let request_id = self.next_request_id();

    // eventual clean up of closed response senders
    if request_id >= self.state.last_clean_up.load(Ordering::Relaxed) + 1000 {
        self.retain_open_response_senders()
    }
It sounds like pretty soon we will need to write an abstraction for in-flight request/response tracking. This will be a common pattern as we build more networked components.
Yes indeed. This would be great to solve generically :-)
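As a rough idea of what such a generic abstraction could look like, here is a std-only sketch. Everything in it is an assumption for illustration: `InFlightTracker`, `ResponseHandle`, and the `Arc`/`Weak` response slot are hypothetical and replace the async response senders used in the PR. A dropped handle is treated as a closed receiver, and stale entries are pruned every `clean_up_interval` requests, mirroring the periodic clean-up in the snippet above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

type Slot<T> = Mutex<Option<T>>;

/// Held by the caller that waits for a response. Dropping it marks the
/// request as abandoned.
pub struct ResponseHandle<T> {
    slot: Arc<Slot<T>>,
}

impl<T> ResponseHandle<T> {
    /// Takes the response if it has arrived.
    pub fn try_take(&self) -> Option<T> {
        self.slot.lock().unwrap().take()
    }
}

/// Tracks in-flight requests by id (hypothetical abstraction).
pub struct InFlightTracker<T> {
    next_request_id: u64,
    last_clean_up: u64,
    clean_up_interval: u64,
    pending: HashMap<u64, Weak<Slot<T>>>,
}

impl<T> InFlightTracker<T> {
    pub fn new(clean_up_interval: u64) -> Self {
        Self {
            next_request_id: 0,
            last_clean_up: 0,
            clean_up_interval,
            pending: HashMap::new(),
        }
    }

    /// Registers a new request and returns its id plus the caller's handle.
    pub fn register(&mut self) -> (u64, ResponseHandle<T>) {
        let request_id = self.next_request_id;
        self.next_request_id += 1;

        // Eventual clean-up of entries whose handle has been dropped.
        if request_id >= self.last_clean_up + self.clean_up_interval {
            self.pending.retain(|_, slot| slot.strong_count() > 0);
            self.last_clean_up = request_id;
        }

        let slot = Arc::new(Mutex::new(None));
        self.pending.insert(request_id, Arc::downgrade(&slot));
        (request_id, ResponseHandle { slot })
    }

    /// Delivers a response. Returns false if the request is unknown or the
    /// caller has already dropped its handle.
    pub fn complete(&mut self, request_id: u64, response: T) -> bool {
        match self.pending.remove(&request_id).and_then(|slot| slot.upgrade()) {
            Some(slot) => {
                *slot.lock().unwrap() = Some(response);
                true
            }
            None => false,
        }
    }

    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }
}
```

A networked component would call `register` before sending a request, and `complete` when the matching response arrives. An async variant would swap the slot for a oneshot channel.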
fn delete(&self, key: &ByteString, precondition: &Precondition) -> Result<()> {
    match precondition {
        Precondition::None => self.delete_kv_pair(key),
        // this condition does not really make sense for the delete operation
        Precondition::DoesNotExist => {
            let current_version = self.get_version(key)?;

            if current_version.is_none() {
                // nothing to do
                Ok(())
            } else {
                Err(Error::kv_pair_exists())
            }
        }
        Precondition::MatchesVersion(version) => {
            let current_version = self.get_version(key)?;

            if current_version.as_ref() == Some(version) {
                self.delete_kv_pair(key)
            } else {
                Err(Error::version_mismatch(version, current_version.as_ref()))
            }
        }
    }
}
Disclaimer: It's unlikely that we will need delete(), so nothing I write here is strictly necessary.
A note on deleting versioned data: removing versioned data requires that we leave a tombstone with the latest version that was used. This ensures that if we ever re-put the data, the new value picks up a higher version number.
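To make the tombstone idea concrete, here is a minimal in-memory sketch. It is not the PR's RocksDB implementation: `TombstoneStore` and `Entry` are hypothetical, and for brevity the store assigns versions itself instead of accepting a versioned value from the caller. Deleting a key keeps the last used version around, so a later put continues from it rather than restarting at 1.

```rust
use std::collections::HashMap;

/// A key is either live or deleted-but-remembered (hypothetical types).
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    Live { version: u32, value: String },
    Tombstone { last_version: u32 },
}

#[derive(Default)]
struct TombstoneStore {
    entries: HashMap<String, Entry>,
}

impl TombstoneStore {
    /// Stores the value and returns the version assigned to it. The new
    /// version is always higher than any version the key has had before.
    fn put(&mut self, key: &str, value: &str) -> u32 {
        let next_version = match self.entries.get(key) {
            Some(Entry::Live { version, .. }) => version + 1,
            Some(Entry::Tombstone { last_version }) => last_version + 1,
            None => 1,
        };
        self.entries.insert(
            key.to_owned(),
            Entry::Live {
                version: next_version,
                value: value.to_owned(),
            },
        );
        next_version
    }

    /// Replaces a live entry with a tombstone that remembers its version.
    fn delete(&mut self, key: &str) {
        if let Some(Entry::Live { version, .. }) = self.entries.get(key) {
            let last_version = *version;
            self.entries
                .insert(key.to_owned(), Entry::Tombstone { last_version });
        }
    }

    /// Tombstones are invisible to readers.
    fn get(&self, key: &str) -> Option<(u32, &str)> {
        match self.entries.get(key) {
            Some(Entry::Live { version, value }) => Some((*version, value.as_str())),
            _ => None,
        }
    }
}
```

The cost of this design is that deleted keys still occupy space until the tombstones are garbage-collected, which this sketch does not attempt.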
This is true. Your comment actually touches upon another aspect that I would like to bring up for discussion: at the moment, the metadata store implementation does not enforce that newly inserted values have a strictly larger version than a previously stored value. As long as the precondition is fulfilled, users can put a value with an arbitrary version. This puts a stronger burden on the user to set the versions correctly. If we enforce this invariant, then we would additionally need the admin command `set` to set an arbitrary versioned value, which is currently possible via `put`.
I wouldn't worry so much about versions going backwards. In fact, it might come in handy in some emergency tooling, because if this happens, it's a bug.
cb83864 to e08286f
@AhmedSoliman I've pushed a new commit (the last one) which changes the MetadataStore trait and implementation to require strictly monotonic versions when putting new items. It restricts the flexibility of the metadata store a bit, while enforcing that versions are always increasing. I'm not entirely sure whether this isn't actually a disimprovement.
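For readers following along, the combination of precondition check and strictly monotonic versions could look roughly like the sketch below. It is an in-memory approximation under stated assumptions: `InMemoryStore`, `Version`, and `PutError` are hypothetical, and only the `Precondition` variant names are taken from the snippets in this PR.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Version(u32);

/// Variant names follow the snippets in this PR.
#[derive(Debug, Clone, Copy)]
enum Precondition {
    None,
    DoesNotExist,
    MatchesVersion(Version),
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
enum PutError {
    PreconditionViolation,
    VersionNotIncreasing { current: Version, proposed: Version },
}

#[derive(Default)]
struct InMemoryStore {
    kv: HashMap<String, (Version, Vec<u8>)>,
}

impl InMemoryStore {
    fn put(
        &mut self,
        key: &str,
        version: Version,
        value: Vec<u8>,
        precondition: Precondition,
    ) -> Result<(), PutError> {
        let current = self.kv.get(key).map(|(stored, _)| *stored);

        // Step 1: the caller's precondition must hold.
        let precondition_holds = match precondition {
            Precondition::None => true,
            Precondition::DoesNotExist => current.is_none(),
            Precondition::MatchesVersion(expected) => current == Some(expected),
        };
        if !precondition_holds {
            return Err(PutError::PreconditionViolation);
        }

        // Step 2: the new version must be strictly larger than the stored one.
        if let Some(current) = current {
            if version <= current {
                return Err(PutError::VersionNotIncreasing {
                    current,
                    proposed: version,
                });
            }
        }

        self.kv.insert(key.to_owned(), (version, value));
        Ok(())
    }
}
```

Note that with this invariant in place, even `Precondition::None` can no longer overwrite a value with an older or equal version, which is the loss of flexibility mentioned above.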
8960c9c to 2dcc529
2dcc529 to 3d1d3c3
@@ -0,0 +1,17 @@
[package]
name = "restate-grpc-util"
Could that be part of restate-network or would that be needed in crates that don't depend on restate-network already?
Currently, the `metadata-store` crate needs this functionality and does not depend on `restate-network`.
///
/// In order to avoid issues arising from concurrency, we run the metadata
/// store in a single thread.
pub struct RocksDBMetadataStore<N> {
Good call. Let's refine the naming as we move forward. Eventually, single-node and multi-node will be merged, I suppose.
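The single-thread design from the doc comment can be sketched with the standard library alone: one thread owns the state, and clients talk to it through a channel, so no locking is needed around the map. This is only an illustration of the pattern. `Request`, `StoreClient`, and `spawn_store` are hypothetical names, and a plain `HashMap` stands in for RocksDB.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Requests carry their own reply channel (hypothetical message type).
enum Request {
    Put {
        key: String,
        value: String,
        reply: mpsc::Sender<()>,
    },
    Get {
        key: String,
        reply: mpsc::Sender<Option<String>>,
    },
}

/// Cheaply cloneable handle that forwards requests to the store thread.
#[derive(Clone)]
struct StoreClient {
    requests: mpsc::Sender<Request>,
}

/// Spawns the thread that exclusively owns the key-value state.
fn spawn_store() -> StoreClient {
    let (requests, inbox) = mpsc::channel::<Request>();
    thread::spawn(move || {
        let mut kv: HashMap<String, String> = HashMap::new();
        // Requests are handled strictly one at a time, in arrival order.
        // The loop ends once every client has been dropped.
        for request in inbox {
            match request {
                Request::Put { key, value, reply } => {
                    kv.insert(key, value);
                    let _ = reply.send(());
                }
                Request::Get { key, reply } => {
                    let _ = reply.send(kv.get(&key).cloned());
                }
            }
        }
    });
    StoreClient { requests }
}

impl StoreClient {
    fn put(&self, key: &str, value: &str) {
        let (reply, response) = mpsc::channel();
        self.requests
            .send(Request::Put {
                key: key.to_owned(),
                value: value.to_owned(),
                reply,
            })
            .expect("store thread is running");
        response.recv().expect("store thread replied");
    }

    fn get(&self, key: &str) -> Option<String> {
        let (reply, response) = mpsc::channel();
        self.requests
            .send(Request::Get {
                key: key.to_owned(),
                reply,
            })
            .expect("store thread is running");
        response.recv().expect("store thread replied")
    }
}
```

Because every read and write is serialized through the single owner, check-then-write sequences such as precondition checks cannot race with each other.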
BindAddress is the server-side equivalent of the AdvertisedAddress and specifies the address the server binds to.
The restate-grpc-util crate contains helper functions for interacting with gRPC services.
The generic hyper server run util takes a hyper service and a bind address. It will let the server bind to the specified bind address.
This commit removes the increment_version method from the Versioned trait since it is only used for testing purposes.
3d1d3c3 to 8d99db9
Thanks for your review, @AhmedSoliman. I've addressed your comments and pushed the final version. I'll merge this PR once GHA gives the green light.
The implementation is based on a single node which uses gRPC for communication and stores the kv pairs durably in a RocksDB instance. This fixes restatedev#1284.
8d99db9 to ba7893c
The implementation is based on a single node which uses Restate's
networking and stores the kv pairs durably in a RocksDB instance.
This fixes #1284.
This PR is based on #1288.