External metadata store #1838

Merged
merged 16 commits into from
Aug 16, 2024
17 changes: 17 additions & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -100,7 +100,7 @@ datafusion-expr = { version = "40.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
dialoguer = { version = "0.11.0" }
-downcast-rs = { version ="1.2.1" }
+downcast-rs = { version = "1.2.1" }
enum-map = { version = "2.7.3" }
enumset = { version = "1.1.3" }
flexbuffers = { version = "2.0.0" }
Expand Down Expand Up @@ -194,6 +194,7 @@ tracing-test = { version = "0.2.5" }
ulid = { version = "1.1.0" }
url = { version = "2.5" }
uuid = { version = "1.3.0", features = ["v7", "serde"] }
etcd-client = { version = "0.14" }
Contributor

We try to order dependencies alphabetically. Unfortunately, there is no good tooling for it.


[profile.release]
opt-level = 3
8 changes: 7 additions & 1 deletion crates/core/Cargo.toml
Expand Up @@ -46,9 +46,15 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["tracing"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio-util = { workspace = true, features = ["net"] }
-tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
+tonic = { workspace = true, features = [
+    "transport",
+    "codegen",
+    "prost",
+    "gzip",
+] }
tower = { workspace = true }
tracing = { workspace = true }
etcd-client = { workspace = true }
Contributor

Same here with the alphabetical sorting.


[build-dependencies]
tonic-build = { workspace = true }
1 change: 1 addition & 0 deletions crates/core/src/metadata_store/mod.rs
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod providers;
mod test_util;

#[cfg(any(test, feature = "test-util"))]
286 changes: 286 additions & 0 deletions crates/core/src/metadata_store/providers/etcd.rs
@@ -0,0 +1,286 @@
use crate::metadata_store::{
MetadataStore, Precondition, ReadError, Version, VersionedValue, WriteError,
};
use anyhow::Context;
use bytes::Bytes;
use bytestring::ByteString;
use etcd_client::{Client, Compare, CompareOp, KeyValue, KvClient, Txn, TxnOp, TxnOpResponse};

impl From<etcd_client::Error> for ReadError {
fn from(value: etcd_client::Error) -> Self {
Self::Network(value.into())
Contributor

Are all etcd_client::Error values network related, or could there be an invalid argument or something different? Maybe we also need to extend ReadError if it is not expressive enough to return different kinds of errors.

Contributor Author

@muhamadazmy muhamadazmy Aug 16, 2024

Yeah, I had to make a choice: either assume it's all network, or extend ReadError.

Maybe we can add a variant StoreError(GenericError) for any possible store error that can't be categorized as a specific Read/WriteError?
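A minimal sketch of the catch-all variant proposed above. Everything here is an assumption for illustration: `ClientError` is a stand-in for `etcd_client::Error` (whose real variants differ), `GenericError` is modeled as a boxed error, and the variant is named `Store` rather than taken from the PR.

```rust
use std::error::Error;
use std::fmt;

// Assumption: a boxed error alias, similar in spirit to the GenericError mentioned above.
type GenericError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical stand-in for the client error type; not the real etcd_client::Error.
#[derive(Debug)]
enum ClientError {
    Transport(String),
    InvalidArgs(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ClientError::Transport(msg) => write!(f, "transport error: {}", msg),
            ClientError::InvalidArgs(msg) => write!(f, "invalid arguments: {}", msg),
        }
    }
}

impl Error for ClientError {}

#[derive(Debug)]
enum ReadError {
    // Connectivity problems only.
    Network(GenericError),
    // Hypothetical catch-all for store errors that fit no other category.
    Store(GenericError),
}

impl From<ClientError> for ReadError {
    fn from(value: ClientError) -> Self {
        match value {
            // Only transport failures are reported as network errors.
            err @ ClientError::Transport(_) => ReadError::Network(err.into()),
            // Everything else falls into the generic store bucket.
            other => ReadError::Store(other.into()),
        }
    }
}
```

With a split like this, callers could retry on `Network` while treating `Store` as non-retryable, which a blanket `Network` mapping does not allow.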

}
}

impl From<etcd_client::Error> for WriteError {
fn from(value: etcd_client::Error) -> Self {
Self::Network(value.into())
}
}

// todo: the version of a kv is reset to 1 if the key was deleted and then recreated.
// This means that a key can change (by means of deletion and recreation) and will
// always have version 1!
// The only way to detect this is to also track the "mod revision" of the store as part
// of the VersionedValue.
//
// The problem is that the restate Version is only a u32, while the etcd version and mod revision
// are both i64.
//
// Changing the Version to hold a u64 value only delays the problem, since it will be a while before
// mod_revision or version hits u32::MAX, but that depends entirely on how frequent the changes are.
//
// The correct solution is of course to make Version u128.
// What we do here is rely ONLY on the "version" of the key, combined with the fact that we never actually delete the key
// but instead put a "tombstone" in place so version doesn't reset
Contributor

Suggested change
// but instead put a "tombstone" in place so version doesn't reset
// but instead put a "tombstone" in place so version doesn't reset.

Maybe also add that we are only using the lower 32 bits of the Etcd version and fail once the version is larger than u32::MAX.
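The version-reset problem described in the code comment can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not the etcd API: `ToyStore` and its methods are hypothetical, and the only behavior modeled is that a new key starts at version 1 and each write bumps the version by 1.

```rust
use std::collections::HashMap;

/// Toy in-memory model of per-key versioning; hypothetical, not the etcd API.
#[derive(Default)]
struct ToyStore {
    // key -> (value, version)
    entries: HashMap<String, (Vec<u8>, i64)>,
}

impl ToyStore {
    /// Writes a value: a new key starts at version 1, an existing key is bumped by 1.
    fn put(&mut self, key: &str, value: &[u8]) -> i64 {
        let entry = self
            .entries
            .entry(key.to_owned())
            .or_insert((Vec::new(), 0));
        entry.0 = value.to_vec();
        entry.1 += 1;
        entry.1
    }

    /// Hard delete: the entry and its version counter are dropped.
    fn delete(&mut self, key: &str) {
        self.entries.remove(key);
    }

    /// Soft delete: writes an empty value, so the version keeps increasing.
    fn tombstone(&mut self, key: &str) -> i64 {
        self.put(key, &[])
    }
}

/// Delete and recreate: the counter starts over.
fn version_after_hard_delete() -> i64 {
    let mut store = ToyStore::default();
    store.put("k", b"a");
    store.put("k", b"b");
    store.delete("k");
    store.put("k", b"c")
}

/// Tombstone and rewrite: the counter keeps growing.
fn version_after_tombstone() -> i64 {
    let mut store = ToyStore::default();
    store.put("k", b"a");
    store.put("k", b"b");
    store.tombstone("k");
    store.put("k", b"c")
}
```

In this model the hard-delete path ends at the same version a brand-new key would have, so a reader comparing versions cannot tell the value changed, while the tombstone path stays monotonic.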

trait ToVersion {
fn to_version(self) -> Result<Version, ReadError>;
}

impl ToVersion for &KeyValue {
fn to_version(self) -> Result<Version, ReadError> {
if self.version() > u32::MAX as i64 {
return Err(ReadError::Codec(
Contributor

Maybe this is more an internal error than a codec error?

"[etcd] key version exceeds max u32".into(),
));
}

Ok(Version::from(self.version() as u32))
Contributor

You could rely on u32::try_from(self.version()) for the conversion. Then we don't have to check boundaries manually and do manual casts via as.

Contributor Author

definitely!
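A minimal sketch of the `u32::try_from` conversion suggested above. The helper name `to_u32_version` and the `String` error type are assumptions for illustration; the PR's code returns a `ReadError` instead.

```rust
use std::convert::TryFrom;

/// Hypothetical helper: converts an etcd-style i64 key version into a u32,
/// failing for negative values and for values above u32::MAX.
fn to_u32_version(etcd_version: i64) -> Result<u32, String> {
    u32::try_from(etcd_version)
        .map_err(|_| format!("[etcd] key version {} does not fit in u32", etcd_version))
}
```

Compared with a manual bounds check followed by an `as` cast, `try_from` also rejects negative inputs, which the `> u32::MAX as i64` comparison alone would let through.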

}
}

pub struct EtcdMetadataStore {
muhamadazmy marked this conversation as resolved.
client: Client,
}

impl EtcdMetadataStore {
pub async fn new<S: AsRef<[A]>, A: AsRef<str>>(addresses: S) -> anyhow::Result<Self> {
//todo: maybe expose some of the connection options to the node config
let client = Client::connect(addresses, None)
.await
.context("failed to connect to etcd cluster")?;

Ok(Self { client })
}

// put updates the key if and only if the key does not exist
// and because we never actually delete the key there is a possibility
// that the key exists, but it's value is nilled (that still considered "does not exist").
Contributor

Suggested change
// that the key exists, but it's value is nilled (that still considered "does not exist").
// that the key exists, but it's value is nilled (that is still considered "does not exist").

// this is why we run this complex transaction where we
// - check if the key exists but is empty (value is empty bytes)
// - otherwise, run an or_else branch that is itself a transaction,
// which checks whether the key does not exist at all
// - then do a PUT
Contributor

Is it deliberate that you've used a normal comment here?

Contributor

I would probably put this comment into the body of the method since it seems to describe the concrete implementation strategy of the method.

pub async fn put_not_exist(
Contributor

Can this be non-pub?

Contributor

Maybe put_if_not_exists and put_if_version_matches?

Contributor Author

Yes, there is no reason for it to be public; I must have missed it.

&self,
client: &mut KvClient,
key: Bytes,
value: Bytes,
) -> Result<bool, WriteError> {
// the or_else branch is executed if the and_then branch of the
// root transaction was not executed (failed condition).
// This means the check for "value exists but is empty" is always
// done first. Otherwise we check if the key does not exist at all
Contributor

Maybe move this to where txn is defined: "We first check if the value is empty and then whether the value does not exist." or something similar.
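The two-level check discussed in this thread can be sketched against a toy in-memory map. This is an illustration under assumptions, not the etcd client API: `ToyStore`, `PutOutcome`, and `put_if_not_exists` are hypothetical names, and the model follows the PR's comments in treating "value is empty" and "key is missing" as two separate conditions.

```rust
use std::collections::HashMap;

/// Toy store: key -> (value, version). A missing key stands for version 0.
type ToyStore = HashMap<String, (Vec<u8>, i64)>;

/// Which branch of the two-level check performed the write, if any.
#[derive(Debug, PartialEq)]
enum PutOutcome {
    /// Root condition held: key present with an empty (tombstone) value.
    ReplacedTombstone,
    /// Nested condition held: key absent altogether.
    Created,
    /// Neither condition held: a live value exists, nothing was written.
    Rejected,
}

fn put_if_not_exists(store: &mut ToyStore, key: &str, value: &[u8]) -> PutOutcome {
    // Root check: the key exists and its value is empty.
    if let Some(entry) = store.get_mut(key) {
        if entry.0.is_empty() {
            entry.0 = value.to_vec();
            entry.1 += 1;
            return PutOutcome::ReplacedTombstone;
        }
        // The key holds a live value, so the nested check would fail too.
        return PutOutcome::Rejected;
    }
    // Nested check (the or_else branch): the key does not exist at all.
    store.insert(key.to_owned(), (value.to_vec(), 1));
    PutOutcome::Created
}
```

Both `ReplacedTombstone` and `Created` count as a successful put, which mirrors why the PR inspects the nested transaction's result instead of relying only on the root transaction's success flag.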

let or_else = Txn::new()
    .when(vec![Compare::version(key.clone(), CompareOp::Equal, 0)])
    .and_then(vec![TxnOp::put(key.clone(), value.clone(), None)]);

let txn = Txn::new()
    .when(vec![Compare::value(
        key.clone(),
        CompareOp::Equal,
        Vec::default(),
    )])
    .and_then(vec![TxnOp::put(key, value, None)])
    .or_else(vec![TxnOp::txn(or_else)]);
let response = client.txn(txn).await?;

// to check if the put operation has actually been "executed"
// we need to check the success of the individual branches.
// and we can't really rely on the response.success() output
// the reason is if the or_else branch was executed this is
Contributor

Suggested change
// and we can't really rely on the response.success() output
// the reason is if the or_else branch was executed this is
// We can't really rely on the response.success() output,
// because if the or_else branch was executed the txn is

// still considered a failure, while in our case that can still
// be a success, unless the or_else branch itself failed as well
if let Some(resp) = response.op_responses().into_iter().next() {
    match resp {
        TxnOpResponse::Txn(txn) => {
            // this means we had to enter the else branch
            // and had to run the or_else transaction
            return Ok(txn.succeeded());
        }
        TxnOpResponse::Put(_) => {
            return Ok(response.succeeded());
        }
        _ => {
            unreachable!()
        }
    }
}
Ok(response.succeeded())
}

// only update the key if it has exactly the given version,
// otherwise return false
Contributor

Maybe change into Rust doc.
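A sketch of the version-guarded write as a compare-and-swap on a toy in-memory map, written with a Rust doc comment in the form the reviewer suggests. `ToyStore` and `put_if_version_matches` are hypothetical names; treating a missing key as version 0 is an assumption that mirrors the version-0 check used for absent keys elsewhere in this PR.

```rust
use std::collections::HashMap;

/// Toy store: key -> (value, version). Hypothetical, not the etcd API.
type ToyStore = HashMap<String, (Vec<u8>, i64)>;

/// Writes `value` only if the key's current version equals `expected`.
///
/// Returns `true` if the write happened and `false` if the version did not match.
/// A missing key is treated as version 0 (an assumption of this sketch).
fn put_if_version_matches(store: &mut ToyStore, key: &str, value: &[u8], expected: i64) -> bool {
    let current = store.get(key).map_or(0, |entry| entry.1);
    if current != expected {
        return false;
    }
    // The write succeeds and bumps the version by one.
    store.insert(key.to_owned(), (value.to_vec(), current + 1));
    true
}
```

A second writer holding a stale version gets `false` back and has to re-read before retrying, which is the property the precondition is meant to provide.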

pub async fn put_with_version(
&self,
client: &mut KvClient,
key: Bytes,
value: Bytes,
version: i64,
) -> Result<bool, WriteError> {
let txn = Txn::new()
    .when(vec![Compare::version(
        key.clone(),
        CompareOp::Equal,
        version,
    )])
    .and_then(vec![TxnOp::put(key, value, None)]);

let response = client.txn(txn).await?;

Ok(response.succeeded())
}
}

#[async_trait::async_trait]
impl MetadataStore for EtcdMetadataStore {
/// Gets the value and its current version for the given key. If key-value pair is not present,
/// then return [`None`].
Contributor

At least with RustRover, you don't need to copy the Rust doc onto the implementation, since the IDE will still display it. Then we only have one place where we need to update the Rust docs.

async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let mut client = self.client.kv_client();
let mut response = client.get(key.into_bytes(), None).await?;

// return first value because this suppose to be an exact match
Contributor

Suggested change
// return first value because this suppose to be an exact match
// return first value because this is supposed to be an exact match

// not a scan
let kv = match response.take_kvs().into_iter().next() {
None => return Ok(None),
Some(kv) => kv,
};

// please read todo! on implementation of .to_version()
let version = (&kv).to_version()?;
let (_, value) = kv.into_key_value();

if value.is_empty() {
// we keep an empty value in place of
// deleted keys to work around the version limitation
Contributor

I guess the comment is now outdated.

return Ok(None);
}

Ok(Some(VersionedValue::new(version, value.into())))
}

/// Gets the current version for the given key. If key-value pair is not present, then return
/// [`None`].
async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let mut client = self.client.kv_client();
let mut response = client.get(key.into_bytes(), None).await?;

// return the first value because this is supposed to be an exact match,
// not a scan
let kv = match response.take_kvs().into_iter().next() {
None => return Ok(None),
Some(kv) => kv,
};

// please read todo! on implementation of .to_version()
let version = (&kv).to_version()?;
let (_, value) = kv.into_key_value();

if value.is_empty() {
// we keep an empty value in place of deleted
// keys to work around the version limitation
Contributor

outdated?

Contributor Author

Yes, definitely! The entire implementation can now be improved to not rely on the value being empty. Thanks for pointing this out.

return Ok(None);
}

Ok(Some(version))
}

/// Puts the versioned value under the given key following the provided precondition. If the
/// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`].
///
/// NOTE: this implementation disregard that version attached on the value and depends only on the
Contributor

Suggested change
/// NOTE: this implementation disregard that version attached on the value and depends only on the
/// NOTE: This implementation disregards the version attached to the value and depends only on the

/// auto version provided by etcd
///
/// it's up to the caller of this function to make sure that both versions are in sync.
async fn put(
&self,
key: ByteString,
value: VersionedValue,
precondition: Precondition,
) -> Result<(), WriteError> {
let mut client = self.client.kv_client();
match precondition {
    Precondition::None => {
        client.put(key.into_bytes(), value.value, None).await?;
    }
    Precondition::DoesNotExist => {
        if !self
            .put_not_exist(&mut client, key.into_bytes(), value.value)
            .await?
        {
            // pre condition failed.
            return Err(WriteError::FailedPrecondition(
                "key-value pair exists".into(),
            ));
        };
    }
    Precondition::MatchesVersion(version) => {
        if !self
            .put_with_version(
                &mut client,
                key.into_bytes(),
                value.value,
                u32::from(version) as i64,
            )
            .await?
        {
            return Err(WriteError::FailedPrecondition(
                "key version mismatch".into(),
            ));
        };
    }
}

Ok(())
}

/// Deletes the key-value pair for the given key following the provided precondition. If the
/// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`].
async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
let mut client = self.client.kv_client();
match precondition {
Precondition::None => {
client.delete(key.into_bytes(), None).await?;
}
Precondition::DoesNotExist => {
if !self
.put_not_exist(&mut client, key.into_bytes(), Vec::default().into())
.await?
{
// pre condition failed.
return Err(WriteError::FailedPrecondition(
"key-value pair exists".into(),
));
};
}
Precondition::MatchesVersion(version) => {
if !self
.put_with_version(
&mut client,
key.into_bytes(),
Vec::default().into(),
Contributor

Sorry for giving you bad advice. I think it is ok to implement the delete operation by deleting the complete entry. If users of the MetadataStore want to keep the version around, then they need to leave a tombstone behind.
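A sketch of what a delete that removes the complete entry could look like, again on a toy in-memory map instead of the etcd client. The `Precondition` variants and `WriteError::FailedPrecondition` follow the names used in this diff; `ToyStore` and the free function `delete` are hypothetical.

```rust
use std::collections::HashMap;

/// Toy store: key -> (value, version). Hypothetical, not the etcd API.
type ToyStore = HashMap<String, (Vec<u8>, u32)>;

#[derive(Debug, PartialEq)]
enum Precondition {
    None,
    DoesNotExist,
    MatchesVersion(u32),
}

#[derive(Debug, PartialEq)]
enum WriteError {
    FailedPrecondition(String),
}

/// Removes the whole entry if the precondition holds; no tombstone is left behind.
fn delete(store: &mut ToyStore, key: &str, precondition: Precondition) -> Result<(), WriteError> {
    let current = store.get(key).map(|entry| entry.1);
    match (precondition, current) {
        // Unconditional delete.
        (Precondition::None, _) => {}
        // Nothing stored, so the "does not exist" precondition holds.
        (Precondition::DoesNotExist, None) => {}
        (Precondition::DoesNotExist, Some(_)) => {
            return Err(WriteError::FailedPrecondition(
                "key-value pair exists".to_owned(),
            ));
        }
        // Version matches the stored one.
        (Precondition::MatchesVersion(expected), Some(version)) if expected == version => {}
        // Version differs, or the key is missing.
        (Precondition::MatchesVersion(_), _) => {
            return Err(WriteError::FailedPrecondition(
                "key version mismatch".to_owned(),
            ));
        }
    }
    store.remove(key);
    Ok(())
}
```

With this shape, reads no longer need the empty-value check: a key is absent exactly when it has no entry, and callers who need monotonic versions write their own tombstone value.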

u32::from(version) as i64,
)
.await?
{
return Err(WriteError::FailedPrecondition(
"key version mismatch".into(),
));
};
}
}

Ok(())
}
}
3 changes: 3 additions & 0 deletions crates/core/src/metadata_store/providers/mod.rs
@@ -0,0 +1,3 @@
mod etcd;

pub use etcd::EtcdMetadataStore;