Skip to content

Commit

Permalink
Add MetadataStore variant with explicit version propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Mar 19, 2024
1 parent eb453e0 commit c80f41b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/metadata-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ publish = false
restate-types = { workspace = true }

async-trait = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true }
serde = { workspace = true }
static_assertions = { workspace = true }
Expand Down
68 changes: 53 additions & 15 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

use async_trait::async_trait;
use bytes::Bytes;
use restate_types::Version;
use restate_types::{Version, Versioned};
use serde::de::{DeserializeOwned, StdError};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

Expand All @@ -26,6 +27,8 @@ pub enum NetworkError {
pub enum ReadError {
#[error(transparent)]
Network(NetworkError),
#[error("deserialize failed: {0}")]
Deserialize(Box<dyn StdError>),
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -34,6 +37,8 @@ pub enum WriteError {
PreconditionViolation(String),
#[error(transparent)]
Network(NetworkError),
#[error("serialize failed: {0}")]
Serialize(Box<dyn StdError>),
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -71,15 +76,14 @@ pub trait MetadataStore {
/// [`None`].
async fn get_version(&self, key: &str) -> Result<Option<Version>, ReadError>;

/// Puts the given key-value pair following the provided precondition. If the operation succeeds,
/// then it returns the new [`Version`] of the key-value pair. If the precondition is not met,
/// then the operation returns a [`WriteError::PreconditionViolation`].
/// Puts the versioned value under the given key following the provided precondition. If the
/// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`].
async fn put(
&self,
key: &str,
value: Bytes,
value: VersionedValue,
precondition: Precondition,
) -> Result<Version, WriteError>;
) -> Result<(), WriteError>;

/// Deletes the key-value pair for the given key following the provided precondition. If the
/// precondition is not met, then the operation returns a [`WriteError::PreconditionViolation`].
Expand All @@ -101,25 +105,59 @@ impl MetadataStoreClient {
inner: Arc::new(metadata_store),
}
}
}

#[async_trait]
impl MetadataStore for MetadataStoreClient {
async fn get(&self, key: &str) -> Result<Option<VersionedValue>, ReadError> {
self.inner.get(key).await
async fn get<T: Versioned + DeserializeOwned>(
&self,
key: &str,
) -> Result<Option<T>, ReadError> {
let value = self.inner.get(key).await?;

if let Some(versioned_value) = value {
// todo add proper format version
let (value, _) = bincode::serde::decode_from_slice::<T, _>(
versioned_value.value.as_ref(),
bincode::config::standard(),
)
.map_err(|err| ReadError::Deserialize(err.into()))?;

assert_eq!(
versioned_value.version,
value.version(),
"versions must align"
);

Ok(Some(value))
} else {
Ok(None)
}
}

async fn get_version(&self, key: &str) -> Result<Option<Version>, ReadError> {
self.inner.get_version(key).await
}

async fn put(
async fn put<T>(
&self,
key: &str,
value: Bytes,
value: T,
precondition: Precondition,
) -> Result<Version, WriteError> {
self.inner.put(key, value, precondition).await
) -> Result<(), WriteError>
where
T: Versioned + Serialize,
{
let version = value.version();

// todo add proper format version
let value = bincode::serde::encode_to_vec(value, bincode::config::standard())
.map_err(|err| WriteError::Serialize(err.into()))?;

self.inner
.put(
key,
VersionedValue::new(version, value.into()),
precondition,
)
.await
}

async fn delete(&self, key: &str, precondition: Precondition) -> Result<(), WriteError> {
Expand Down

0 comments on commit c80f41b

Please sign in to comment.