Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a deserialization target that retains the ValidatedMessage metadata #45

Merged
merged 3 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/hedwig.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
strategy:
fail-fast: false
matrix:
rust_toolchain: [nightly, stable, 1.53.0]
rust_toolchain: [nightly, stable, 1.60.0]
os: [ubuntu-latest]
timeout-minutes: 20
steps:
Expand Down
35 changes: 31 additions & 4 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
//!
//! See the [`Consumer`] trait.

use crate::ValidatedMessage;
use crate::message::ValidatedMessage;
use async_trait::async_trait;
use bytes::Bytes;
use either::Either;
use futures_util::stream;
use pin_project::pin_project;
Expand Down Expand Up @@ -53,7 +54,7 @@ pub trait Consumer {
type Error;
/// The stream returned by [`stream`]
type Stream: stream::Stream<
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage>, Self::Error>,
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage<Bytes>>, Self::Error>,
>;

/// Begin pulling messages from the backing message service.
Expand Down Expand Up @@ -86,11 +87,37 @@ pub trait DecodableMessage {
type Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error>
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized;
}

impl<M> DecodableMessage for ValidatedMessage<M>
where
M: DecodableMessage,
{
/// The error returned when a message fails to decode
type Error = M::Error;

/// The decoder used to decode a validated message
type Decoder = M::Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized,
{
let message = M::decode(msg.clone(), decoder)?;
Ok(Self {
id: msg.id,
timestamp: msg.timestamp,
schema: msg.schema,
headers: msg.headers,
data: message,
})
}
}

/// A received message which can be acknowledged to prevent re-delivery by the backing message
/// service.
///
Expand Down Expand Up @@ -186,7 +213,7 @@ pub struct MessageStream<S, D, M> {
impl<S, D, M, AckToken, StreamError> stream::Stream for MessageStream<S, D, M>
where
S: stream::Stream<
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage>, StreamError>,
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage<Bytes>>, StreamError>,
>,
M: DecodableMessage<Decoder = D>,
{
Expand Down
83 changes: 3 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,20 @@
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{borrow::Cow, collections::BTreeMap, time::SystemTime};
pub use topic::Topic;

use bytes::Bytes;
use uuid::Uuid;
use std::collections::BTreeMap;

mod backends;
mod consumer;
pub mod message;
mod publisher;
mod tests;
mod topic;
pub mod validators;

pub use backends::*;

Check warning on line 91 in src/lib.rs

View workflow job for this annotation

GitHub Actions / test (nightly, ubuntu-latest)

unused import: `backends::*`
pub use consumer::*;
pub use publisher::*;

Expand Down Expand Up @@ -117,81 +117,4 @@
pub type Headers = BTreeMap<String, String>;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage {
/// Unique message identifier.
id: Uuid,
/// The timestamp when message was created in the publishing service.
timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
headers: Headers,
/// The encoded message data.
data: Bytes,
}

impl ValidatedMessage {
/// Create a new validated message
pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
where
S: Into<Cow<'static, str>>,
D: Into<Bytes>,
{
Self {
id,
timestamp,
schema: schema.into(),
headers,
data: data.into(),
}
}

/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The encoded message data.
pub fn data(&self) -> &[u8] {
&self.data
}

/// Destructure this message into just the contained data
pub fn into_data(self) -> Bytes {
self.data
}
}
pub type ValidatedMessage = message::ValidatedMessage<Bytes>;
87 changes: 87 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use bytes::Bytes;
use std::{borrow::Cow, time::SystemTime};
use uuid::Uuid;

use crate::Headers;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage<M> {
/// Unique message identifier.
pub(crate) id: Uuid,
/// The timestamp when message was created in the publishing service.
pub(crate) timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub(crate) schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub(crate) headers: Headers,
/// The message data.
pub(crate) data: M,
}

impl ValidatedMessage<Bytes> {
/// Create a new validated message
pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
where
S: Into<Cow<'static, str>>,
D: Into<Bytes>,
{
Self {
id,
timestamp,
schema: schema.into(),
headers,
data: data.into(),
}
}
}

impl<M> ValidatedMessage<M> {
/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The message data.
pub fn data(&self) -> &M {
&self.data
}

/// Destructure this message into just the contained data
pub fn into_data(self) -> M {
self.data
}
}
23 changes: 22 additions & 1 deletion src/tests/google.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError,
StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
},
message,
validators::{
prost::{ExactSchemaMatcher, SchemaMismatchError},
ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError,
Expand Down Expand Up @@ -42,7 +43,7 @@ impl EncodableMessage for TestMessage {
uuid::Uuid::nil(),
std::time::SystemTime::UNIX_EPOCH,
SCHEMA,
Headers::default(),
Headers::from([(String::from("key"), String::from("value"))]),
self,
)
}
Expand All @@ -57,6 +58,26 @@ impl DecodableMessage for TestMessage {
}
}

#[test]
fn decode_with_headers() -> Result<(), BoxError> {
let orig_message = TestMessage {
payload: "foobar".into(),
};

let encoded = orig_message.encode(&ProstValidator::new())?;

let decoded = message::ValidatedMessage::<TestMessage>::decode(
encoded,
&ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)),
)?;

let headers = Headers::from([(String::from("key"), String::from("value"))]);

assert_eq!(decoded.headers(), &headers);

Ok(())
}

#[tokio::test]
#[ignore = "pubsub emulator is finicky, run this test manually"]
async fn roundtrip_protobuf() -> Result<(), BoxError> {
Expand Down
Loading