diff --git a/cloudevents-sdk-mqtt/Cargo.toml b/cloudevents-sdk-mqtt/Cargo.toml index f01e197d..b3523817 100644 --- a/cloudevents-sdk-mqtt/Cargo.toml +++ b/cloudevents-sdk-mqtt/Cargo.toml @@ -9,7 +9,7 @@ description = "CloudEvents official Rust SDK - Mqtt integration" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -cloudevents-sdk = { version = "0.2.0", path = ".." } +cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" paho-mqtt = { path = "../../paho.mqtt.rust" } chrono = { version = "^0.4", features = ["serde"] } diff --git a/cloudevents-sdk-mqtt/src/headers.rs b/cloudevents-sdk-mqtt/src/headers.rs index f9f57a6c..4a7a9bdb 100644 --- a/cloudevents-sdk-mqtt/src/headers.rs +++ b/cloudevents-sdk-mqtt/src/headers.rs @@ -16,7 +16,7 @@ fn attributes_to_headers(it: impl Iterator) -> HashMap<&'st (s, attribute_name_to_header!(s)) } }) - .collect() + .collect() } lazy_static! { @@ -32,4 +32,4 @@ pub enum MqttVersion { V3_1, V3_1_1, V5, -} \ No newline at end of file +} diff --git a/cloudevents-sdk-mqtt/src/lib.rs b/cloudevents-sdk-mqtt/src/lib.rs index 2499b500..ff58243a 100644 --- a/cloudevents-sdk-mqtt/src/lib.rs +++ b/cloudevents-sdk-mqtt/src/lib.rs @@ -2,13 +2,13 @@ //! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ #[macro_use] mod headers; -mod mqtt_producer_record; mod mqtt_consumer_record; +mod mqtt_producer_record; pub use mqtt_consumer_record::record_to_event; pub use mqtt_consumer_record::ConsumerMessageDeserializer; pub use mqtt_consumer_record::MessageExt; +pub use headers::MqttVersion; pub use mqtt_producer_record::MessageBuilderExt; pub use mqtt_producer_record::MessageRecord; -pub use headers::MqttVersion; \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs index 2ad83b3e..4c18a94c 100644 --- a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs +++ b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs @@ -1,7 +1,9 @@ use crate::headers; use cloudevents::event::SpecVersion; -use cloudevents::message::{Result, BinarySerializer, BinaryDeserializer, MessageAttributeValue, - MessageDeserializer, Encoding, StructuredSerializer, StructuredDeserializer}; +use cloudevents::message::{ + BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, + Result, StructuredDeserializer, StructuredSerializer, +}; use cloudevents::{message, Event}; use paho_mqtt::{Message, PropertyCode}; use std::collections::HashMap; @@ -37,7 +39,7 @@ impl ConsumerMessageDeserializer { impl BinaryDeserializer for ConsumerMessageDeserializer { fn deserialize_binary>(mut self, mut visitor: V) -> Result { if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}) + return Err(message::Error::WrongEncoding {}); } let spec_version = SpecVersion::try_from( @@ -124,9 +126,15 @@ impl MessageDeserializer for ConsumerMessageDeserializer { pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { match version { - headers::MqttVersion::V5 => BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), - headers::MqttVersion::V3_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), - headers::MqttVersion::V3_1_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + headers::MqttVersion::V5 => { + BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) + } + headers::MqttVersion::V3_1 => { + StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) + } + headers::MqttVersion::V3_1_1 => { + StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) + } } } @@ -145,11 +153,12 @@ mod tests { use super::*; use crate::mqtt_producer_record::MessageRecord; + use crate::MessageBuilderExt; use chrono::Utc; + use cloudevents::event::Data; use cloudevents::{EventBuilder, EventBuilderV10}; - use crate::MessageBuilderExt; + use paho_mqtt::MessageBuilder; use serde_json::json; - use cloudevents::event::Data; #[test] fn test_binary_record() { @@ -160,8 +169,10 @@ mod tests { .ty("example.test") .time(time) .source("http://localhost") - .data("application/json", - Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes())) + .data( + "application/json", + Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()), + ) .extension("someint", "10") .build() .unwrap(); @@ -178,7 +189,7 @@ mod tests { .unwrap(), headers::MqttVersion::V5, ) - .unwrap(); + .unwrap(); let msg = MessageBuilder::new() .topic("test") @@ -220,6 +231,9 @@ mod tests { .qos(1) .finalize(); - assert_eq!(msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), expected) + assert_eq!( + msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), + expected + ) } -} \ No newline at end of file +} diff --git a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs index 1b894dc0..e4c3487f 100644 --- a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs @@ -1,9 +1,11 @@ use super::headers; -use paho_mqtt::{Properties, Property, PropertyCode, MessageBuilder}; -use cloudevents::message::{BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, - StructuredDeserializer, StructuredSerializer, Error}; -use cloudevents::Event; use cloudevents::event::SpecVersion; +use cloudevents::message::{ + BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, + StructuredDeserializer, StructuredSerializer, +}; +use cloudevents::Event; +use paho_mqtt::{MessageBuilder, Properties, Property, PropertyCode}; use std::option::Option::Some; pub struct MessageRecord { @@ -22,68 +24,76 @@ impl MessageRecord { pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { match version { - headers::MqttVersion::V5 => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()), - headers::MqttVersion::V3_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), - headers::MqttVersion::V3_1_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), + headers::MqttVersion::V5 => { + BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) + } + headers::MqttVersion::V3_1 => { + StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) + } + headers::MqttVersion::V3_1_1 => { + StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) + } } } } impl BinarySerializer for MessageRecord { fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, headers::SPEC_VERSION_HEADER, - spec_version.as_str()) { - Ok(property) => { - match self.headers.push(property) { - Err(e) => Err(Error::Other { - source: Box::new(e) - }), - _ => Ok(self) - } + match Property::new_string_pair( + PropertyCode::UserProperty, + headers::SPEC_VERSION_HEADER, + spec_version.as_str(), + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), }, _ => Err(Error::UnrecognizedAttributeName { - name: headers::SPEC_VERSION_HEADER.to_string() - }) + name: headers::SPEC_VERSION_HEADER.to_string(), + }), } } fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, &headers::ATTRIBUTES_TO_MQTT_HEADERS - .get(name) - .ok_or(cloudevents::message::Error::UnrecognizedAttributeName { - name: String::from(name), - })? - .clone()[..], - &value.to_string()[..]) { - Ok(property) => { - match self.headers.push(property) { - Err(e) => Err(Error::Other { - source: Box::new(e) - }), - _ => Ok(self) - } + match Property::new_string_pair( + PropertyCode::UserProperty, + &headers::ATTRIBUTES_TO_MQTT_HEADERS + .get(name) + .ok_or(cloudevents::message::Error::UnrecognizedAttributeName { + name: String::from(name), + })? + .clone()[..], + &value.to_string()[..], + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), }, _ => Err(Error::UnrecognizedAttributeName { - name: headers::SPEC_VERSION_HEADER.to_string() - }) + name: headers::SPEC_VERSION_HEADER.to_string(), + }), } } fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, - &attribute_name_to_header!(name)[..], - &value.to_string()[..]) { - Ok(property) => { - match self.headers.push(property) { - Err(e) => Err(Error::Other { - source: Box::new(e) - }), - _ => Ok(self) - } + match Property::new_string_pair( + PropertyCode::UserProperty, + &attribute_name_to_header!(name)[..], + &value.to_string()[..], + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), }, _ => Err(Error::UnrecognizedAttributeName { - name: headers::SPEC_VERSION_HEADER.to_string() - }) + name: headers::SPEC_VERSION_HEADER.to_string(), + }), } } @@ -100,14 +110,15 @@ impl BinarySerializer for MessageRecord { impl StructuredSerializer for MessageRecord { fn set_structured_event(mut self, bytes: Vec) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, - headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER) { - Ok(property) => { - match self.headers.push(property) { - _ => () - } + match Property::new_string_pair( + PropertyCode::UserProperty, + headers::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER, + ) { + Ok(property) => match self.headers.push(property) { + _ => (), }, - _ => () + _ => (), } self.payload = Some(bytes); @@ -116,16 +127,11 @@ impl StructuredSerializer for MessageRecord { } pub trait MessageBuilderExt { - fn message_record( - self, - message_record: & MessageRecord, - ) -> MessageBuilder; + fn message_record(self, message_record: &MessageRecord) -> MessageBuilder; } impl MessageBuilderExt for MessageBuilder { - fn message_record(mut self, - message_record: & MessageRecord - ) -> MessageBuilder { + fn message_record(mut self, message_record: &MessageRecord) -> MessageBuilder { self = self.properties(message_record.headers.clone()); if let Some(s) = message_record.payload.as_ref() { @@ -134,4 +140,4 @@ impl MessageBuilderExt for MessageBuilder { self } -} \ No newline at end of file +}