Skip to content

Commit

Permalink
PolyBytes serde by proxy
Browse files Browse the repository at this point in the history
Introduces a serde-proxy for PolyBytes to allow passing `Record` in places where we need the value to be serializable. Serialization is a passthrough in case the sum type is already serialized
  • Loading branch information
AhmedSoliman committed Sep 10, 2024
1 parent 50ffef0 commit e00bfc2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
20 changes: 18 additions & 2 deletions crates/types/src/logs/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@

use std::sync::Arc;

use crate::storage::{PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode};
use serde::{Deserialize, Serialize};

use crate::storage::{
EncodedPolyBytes, PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode,
};
use crate::time::NanosSinceEpoch;

use super::{KeyFilter, Keys, MatchKeyQuery};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
created_at: NanosSinceEpoch,
#[serde(with = "serde_with::As::<EncodedPolyBytes>")]
body: PolyBytes,
keys: Keys,
}
Expand All @@ -39,6 +44,17 @@ impl Record {
&self.keys
}

pub fn estimated_encode_size(&self) -> usize {
let body_size = match &self.body {
PolyBytes::Bytes(slice) => slice.len(),
PolyBytes::Typed(_) => {
// constant, assumption based on base envelope size of ~600 bytes.
2_048 // 2KiB
}
};
size_of::<Keys>() + size_of::<NanosSinceEpoch>() + body_size
}

pub fn body(&self) -> &PolyBytes {
&self.body
}
Expand Down
51 changes: 49 additions & 2 deletions crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
use std::mem;
use std::sync::Arc;

use bytes::{Buf, BufMut, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use downcast_rs::{impl_downcast, DowncastSync};
use serde::de::{DeserializeOwned, Error as DeserializationError};
use serde::ser::Error as SerializationError;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use tracing::error;

use crate::errors::GenericError;
Expand Down Expand Up @@ -190,6 +190,53 @@ pub enum PolyBytes {
Typed(Arc<dyn StorageEncode>),
}

impl StorageEncode for PolyBytes {
fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> {
match self {
PolyBytes::Bytes(bytes) => buf.put_slice(bytes.as_ref()),
PolyBytes::Typed(typed) => {
StorageCodec::encode(&**typed, buf)?;
}
};
Ok(())
}

fn default_codec(&self) -> StorageCodecKind {
StorageCodecKind::FlexbuffersSerde
}
}

/// SerializeAs/DeserializeAs to implement ser/de trait for [`PolyBytes`]
/// Use it with `#[serde(with = "serde_with::As::<EncodedPolyBytes>")]`.
pub struct EncodedPolyBytes {}

impl serde_with::SerializeAs<PolyBytes> for EncodedPolyBytes {
fn serialize_as<S>(source: &PolyBytes, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match source {
PolyBytes::Bytes(bytes) => serializer.serialize_bytes(bytes.as_ref()),
PolyBytes::Typed(typed) => {
// todo: estimate size to avoid re allocations
let mut buf = BytesMut::new();
StorageCodec::encode(&**typed, &mut buf).expect("record serde is infallible");
serializer.serialize_bytes(buf.as_ref())
}
}
}
}

impl<'de> serde_with::DeserializeAs<'de, PolyBytes> for EncodedPolyBytes {
fn deserialize_as<D>(deserializer: D) -> Result<PolyBytes, D::Error>
where
D: serde::Deserializer<'de>,
{
let buf = Bytes::deserialize(deserializer)?;
Ok(PolyBytes::Bytes(buf))
}
}

static_assertions::assert_impl_all!(PolyBytes: Send, Sync);

/// Enable simple serialization of String types as length-prefixed byte slice
Expand Down

0 comments on commit e00bfc2

Please sign in to comment.