From f609752496eb1be2bc6449107e5d798b2b25701b Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:26:24 -0700 Subject: [PATCH 01/13] fixed clippy warning caused by darling --- serde_amqp_derive/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serde_amqp_derive/src/lib.rs b/serde_amqp_derive/src/lib.rs index b87084dc..e7052892 100644 --- a/serde_amqp_derive/src/lib.rs +++ b/serde_amqp_derive/src/lib.rs @@ -132,7 +132,7 @@ enum EncodingType { #[derive(Debug, Clone, FromDeriveInput)] #[darling(attributes(amqp_contract))] -#[allow(dead_code)] +#[allow(dead_code, clippy::manual_unwrap_or_default)] struct DescribedAttr { #[darling(default)] pub name: Option, From 10bb90781eb8d53b696bbdfa6ff357663505efeb Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:31:02 -0700 Subject: [PATCH 02/13] updated darling and syn version --- fe2o3-amqp/Cargo.toml | 2 +- serde_amqp_derive/Cargo.toml | 4 ++-- serde_amqp_derive/src/util.rs | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/fe2o3-amqp/Cargo.toml b/fe2o3-amqp/Cargo.toml index 26ce8b71..13b98742 100644 --- a/fe2o3-amqp/Cargo.toml +++ b/fe2o3-amqp/Cargo.toml @@ -69,7 +69,7 @@ hmac = { version = "0.12", optional = true } pbkdf2 = { version = "0.12", default-features = false, optional = true } webpki-roots = { version = "0.26", optional = true } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "tls12", "ring"], optional = true } -librustls = { package = "rustls", version = "0.23", default-features = false, optional = true } +librustls = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "std", "tls12", "ring"], optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { workspace = true, features = ["sync", "io-util", "net", "rt", "macros", "time"] } diff --git a/serde_amqp_derive/Cargo.toml b/serde_amqp_derive/Cargo.toml index 103fd7ed..978052bf 100644 --- a/serde_amqp_derive/Cargo.toml +++ b/serde_amqp_derive/Cargo.toml @@ -21,7 +21,7 @@ serde = { workspace = true, features = ["derive"] } [dependencies] convert_case = "0.6.0" -darling = "0.14.1" +darling = "0.20" proc-macro2 = "1" quote = "1" -syn = { version = "1", features = ["parsing", "derive"] } +syn = { version = "2", features = ["parsing", "derive"] } diff --git a/serde_amqp_derive/src/util.rs b/serde_amqp_derive/src/util.rs index 1c2d006d..7bd39e7b 100644 --- a/serde_amqp_derive/src/util.rs +++ b/serde_amqp_derive/src/util.rs @@ -115,8 +115,7 @@ pub(crate) fn parse_named_field_attrs<'a>( fields .map(|f| { f.attrs.iter().find_map(|a| { - let item = a.parse_meta().unwrap(); - FieldAttr::from_meta(&item).ok() + FieldAttr::from_meta(&a.meta).ok() }) }) .map(|o| o.unwrap_or(FieldAttr { default: false })) @@ -126,7 +125,7 @@ pub(crate) fn parse_named_field_attrs<'a>( pub(crate) fn get_span_of(ident_str: &str, ctx: &DeriveInput) -> Option { ctx.attrs .iter() - .find_map(|attr| match attr.path.get_ident() { + .find_map(|attr| match attr.path().get_ident() { Some(i) => { if *i == ident_str { Some(i.span()) From b17c272809f2fbea932c90ebf10ba40fe820a7fe Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:33:22 -0700 Subject: [PATCH 03/13] cargo fmt --- serde_amqp_derive/src/lib.rs | 2 +- serde_amqp_derive/src/util.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/serde_amqp_derive/src/lib.rs b/serde_amqp_derive/src/lib.rs index e7052892..88ad7b27 100644 --- a/serde_amqp_derive/src/lib.rs +++ b/serde_amqp_derive/src/lib.rs @@ -132,7 +132,7 @@ enum EncodingType { #[derive(Debug, Clone, FromDeriveInput)] #[darling(attributes(amqp_contract))] -#[allow(dead_code, clippy::manual_unwrap_or_default)] +#[allow(dead_code, clippy::unwrap_or_default)] struct DescribedAttr { #[darling(default)] pub name: Option, diff --git a/serde_amqp_derive/src/util.rs b/serde_amqp_derive/src/util.rs index 7bd39e7b..4547e921 100644 --- a/serde_amqp_derive/src/util.rs +++ b/serde_amqp_derive/src/util.rs @@ -114,9 +114,9 @@ pub(crate) fn parse_named_field_attrs<'a>( ) -> Vec { fields .map(|f| { - f.attrs.iter().find_map(|a| { - FieldAttr::from_meta(&a.meta).ok() - }) + f.attrs + .iter() + .find_map(|a| FieldAttr::from_meta(&a.meta).ok()) }) .map(|o| o.unwrap_or(FieldAttr { default: false })) .collect() From eaee4f341630d52c46739ffdeaffe7de640a06aa Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:38:36 -0700 Subject: [PATCH 04/13] suppress clippy warning about code generated by darling --- serde_amqp_derive/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serde_amqp_derive/src/lib.rs b/serde_amqp_derive/src/lib.rs index 88ad7b27..b9cbb03b 100644 --- a/serde_amqp_derive/src/lib.rs +++ b/serde_amqp_derive/src/lib.rs @@ -132,7 +132,7 @@ enum EncodingType { #[derive(Debug, Clone, FromDeriveInput)] #[darling(attributes(amqp_contract))] -#[allow(dead_code, clippy::unwrap_or_default)] +#[allow(dead_code, clippy::all)] // clippy complains about code generated by darling struct DescribedAttr { #[darling(default)] pub name: Option, From 0e0c005d53416c14332bd6c7e9c7972f98b476a0 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:41:55 -0700 Subject: [PATCH 05/13] disable checks for rustls with wasm32 target --- fe2o3-amqp/Makefile.toml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fe2o3-amqp/Makefile.toml b/fe2o3-amqp/Makefile.toml index eece36da..0f3dad1b 100644 --- a/fe2o3-amqp/Makefile.toml +++ b/fe2o3-amqp/Makefile.toml @@ -27,12 +27,12 @@ dependencies = [ "check_feature_group5_wasm32", "check_feature_group6_wasm32", - # rustls has problem with wasm target - "check_feature_rustls_wasm32", - "check_feature_group1_wasm32", - "check_feature_group3_wasm32", - "check_feature_group7_wasm32", - "check_all_features_wasm32", + # # rustls has problem with wasm target + # "check_feature_rustls_wasm32", + # "check_feature_group1_wasm32", + # "check_feature_group3_wasm32", + # "check_feature_group7_wasm32", + # "check_all_features_wasm32", ] [tasks.check_all_features] From 4fea8a10fcbf1817f0a058c0685c64bf62aa3850 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:45:30 -0700 Subject: [PATCH 06/13] disable clippy::manual_unwrap_or_default for serde_amqp_derive at crate level --- serde_amqp_derive/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/serde_amqp_derive/src/lib.rs b/serde_amqp_derive/src/lib.rs index b9cbb03b..cdcc60c1 100644 --- a/serde_amqp_derive/src/lib.rs +++ b/serde_amqp_derive/src/lib.rs @@ -1,3 +1,5 @@ +#![allow(clippy::manual_unwrap_or_default)] // clippy complains about code generated by darling + //! Provides custom derive macros `SerializeComposite` and `DeserializeComposite` for described //! types as defined in the AMQP1.0 protocol. //! @@ -132,7 +134,7 @@ enum EncodingType { #[derive(Debug, Clone, FromDeriveInput)] #[darling(attributes(amqp_contract))] -#[allow(dead_code, clippy::all)] // clippy complains about code generated by darling +#[allow(dead_code)] struct DescribedAttr { #[darling(default)] pub name: Option, From f0839a4d52eb0a7002aedd9c232d108e584099d7 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:46:57 -0700 Subject: [PATCH 07/13] fix bug in doc example --- fe2o3-amqp-types/src/messaging/format/annotations.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe2o3-amqp-types/src/messaging/format/annotations.rs b/fe2o3-amqp-types/src/messaging/format/annotations.rs index 3a612b5b..7e113194 100644 --- a/fe2o3-amqp-types/src/messaging/format/annotations.rs +++ b/fe2o3-amqp-types/src/messaging/format/annotations.rs @@ -41,7 +41,7 @@ use serde_amqp::{ /// let mut annotations = Annotations::new(); /// annotations.insert(OwnedKey::from(key), val.clone()); /// -/// let removed = annotations.remove(&key as &dyn AnnotationKey); +/// let removed = annotations.swap_remove(&key as &dyn AnnotationKey); /// assert_eq!(removed, Some(val)); /// ``` pub type Annotations = OrderedMap; From aa3c8564820f6f2a2197f40d2e23ce7a45d7e8d7 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 11:59:47 -0700 Subject: [PATCH 08/13] disable activemq compat test temporarily --- fe2o3-amqp/tests/broker_connection_compat.rs | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/fe2o3-amqp/tests/broker_connection_compat.rs b/fe2o3-amqp/tests/broker_connection_compat.rs index cf94d1a1..05adeb46 100644 --- a/fe2o3-amqp/tests/broker_connection_compat.rs +++ b/fe2o3-amqp/tests/broker_connection_compat.rs @@ -14,21 +14,21 @@ cfg_not_wasm32! { mod common; - #[tokio::test] - async fn activemq_artemis_connection() { - let (_node, port) = common::setup_activemq_artemis(None, None).await; - let url = format!("amqp://localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - connection.close().await.unwrap(); - } + // #[tokio::test] + // async fn activemq_artemis_connection() { + // let (_node, port) = common::setup_activemq_artemis(None, None).await; + // let url = format!("amqp://localhost:{}", port); + // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + // connection.close().await.unwrap(); + // } - #[tokio::test] - async fn activemq_artemis_sasl_plain_connection() { - let (_node, port) = common::setup_activemq_artemis(Some("guest"), Some("guest")).await; - let url = format!("amqp://guest:guest@localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - connection.close().await.unwrap(); - } + // #[tokio::test] + // async fn activemq_artemis_sasl_plain_connection() { + // let (_node, port) = common::setup_activemq_artemis(Some("guest"), Some("guest")).await; + // let url = format!("amqp://guest:guest@localhost:{}", port); + // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + // connection.close().await.unwrap(); + // } #[tokio::test] async fn rabbitmq_amqp10_connection() { From 010b0a22b2515de50adbe0342b8fb412643f51ca Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 12:01:21 -0700 Subject: [PATCH 09/13] disable all compat test temporarily --- fe2o3-amqp/tests/broker_connection_compat.rs | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/fe2o3-amqp/tests/broker_connection_compat.rs b/fe2o3-amqp/tests/broker_connection_compat.rs index 05adeb46..ae61a360 100644 --- a/fe2o3-amqp/tests/broker_connection_compat.rs +++ b/fe2o3-amqp/tests/broker_connection_compat.rs @@ -30,19 +30,19 @@ cfg_not_wasm32! { // connection.close().await.unwrap(); // } - #[tokio::test] - async fn rabbitmq_amqp10_connection() { - let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; - let url = format!("amqp://localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - connection.close().await.unwrap(); - } + // #[tokio::test] + // async fn rabbitmq_amqp10_connection() { + // let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; + // let url = format!("amqp://localhost:{}", port); + // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + // connection.close().await.unwrap(); + // } - #[tokio::test] - async fn rabbitmq_amqp10_sasl_plain_connection() { - let (_node, port) = common::setup_rabbitmq_amqp10(Some("guest"), Some("guest")).await; - let url = format!("amqp://guest:guest@localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - connection.close().await.unwrap(); - } + // #[tokio::test] + // async fn rabbitmq_amqp10_sasl_plain_connection() { + // let (_node, port) = common::setup_rabbitmq_amqp10(Some("guest"), Some("guest")).await; + // let url = format!("amqp://guest:guest@localhost:{}", port); + // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + // connection.close().await.unwrap(); + // } } From 0cc9c65af4cdddef4b4d89a84dcbcf0af4704e11 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 12:24:33 -0700 Subject: [PATCH 10/13] update chrono, disable artemis and rabbitmq test temporarily --- fe2o3-amqp/tests/broker_connection_compat.rs | 56 ++--- fe2o3-amqp/tests/link.rs | 242 ++++++++++--------- serde_amqp/Cargo.toml | 2 +- serde_amqp/src/primitives/timestamp.rs | 16 +- 4 files changed, 160 insertions(+), 156 deletions(-) diff --git a/fe2o3-amqp/tests/broker_connection_compat.rs b/fe2o3-amqp/tests/broker_connection_compat.rs index ae61a360..cf94d1a1 100644 --- a/fe2o3-amqp/tests/broker_connection_compat.rs +++ b/fe2o3-amqp/tests/broker_connection_compat.rs @@ -14,35 +14,35 @@ cfg_not_wasm32! { mod common; - // #[tokio::test] - // async fn activemq_artemis_connection() { - // let (_node, port) = common::setup_activemq_artemis(None, None).await; - // let url = format!("amqp://localhost:{}", port); - // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - // connection.close().await.unwrap(); - // } + #[tokio::test] + async fn activemq_artemis_connection() { + let (_node, port) = common::setup_activemq_artemis(None, None).await; + let url = format!("amqp://localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + connection.close().await.unwrap(); + } - // #[tokio::test] - // async fn activemq_artemis_sasl_plain_connection() { - // let (_node, port) = common::setup_activemq_artemis(Some("guest"), Some("guest")).await; - // let url = format!("amqp://guest:guest@localhost:{}", port); - // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - // connection.close().await.unwrap(); - // } + #[tokio::test] + async fn activemq_artemis_sasl_plain_connection() { + let (_node, port) = common::setup_activemq_artemis(Some("guest"), Some("guest")).await; + let url = format!("amqp://guest:guest@localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + connection.close().await.unwrap(); + } - // #[tokio::test] - // async fn rabbitmq_amqp10_connection() { - // let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; - // let url = format!("amqp://localhost:{}", port); - // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - // connection.close().await.unwrap(); - // } + #[tokio::test] + async fn rabbitmq_amqp10_connection() { + let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; + let url = format!("amqp://localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + connection.close().await.unwrap(); + } - // #[tokio::test] - // async fn rabbitmq_amqp10_sasl_plain_connection() { - // let (_node, port) = common::setup_rabbitmq_amqp10(Some("guest"), Some("guest")).await; - // let url = format!("amqp://guest:guest@localhost:{}", port); - // let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - // connection.close().await.unwrap(); - // } + #[tokio::test] + async fn rabbitmq_amqp10_sasl_plain_connection() { + let (_node, port) = common::setup_rabbitmq_amqp10(Some("guest"), Some("guest")).await; + let url = format!("amqp://guest:guest@localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + connection.close().await.unwrap(); + } } diff --git a/fe2o3-amqp/tests/link.rs b/fe2o3-amqp/tests/link.rs index bcbef486..9afab112 100644 --- a/fe2o3-amqp/tests/link.rs +++ b/fe2o3-amqp/tests/link.rs @@ -1,5 +1,7 @@ //! Tests sending and receiving small messages with active mq artemis +// TODO: interop testing with other AMQP 1.0 brokers + macro_rules! cfg_not_wasm32 { ($($item:item)*) => { $( @@ -9,123 +11,123 @@ macro_rules! cfg_not_wasm32 { } } -cfg_not_wasm32! { - use fe2o3_amqp::{Connection, Receiver, Sender, Session}; - use fe2o3_amqp_types::messaging::Message; - - mod common; - - #[tokio::test] - async fn activemq_artemis_send_receive() { - let (_node, port) = common::setup_activemq_artemis(None, None).await; - - let url = format!("amqp://localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - let mut session = Session::begin(&mut connection).await.unwrap(); - let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") - .await - .unwrap(); - let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") - .await - .unwrap(); - - let message = Message::from("test-message"); - let outcome = sender.send(message).await.unwrap(); - outcome.accepted_or("Not accepted").unwrap(); - - let received = receiver.recv::().await.unwrap(); - receiver.accept(&received).await.unwrap(); - assert_eq!(received.body(), "test-message"); - - sender.close().await.unwrap(); - receiver.close().await.unwrap(); - session.close().await.unwrap(); - connection.close().await.unwrap(); - } - - #[tokio::test] - async fn activemq_artemis_send_receive_large_content() { - let (_node, port) = common::setup_activemq_artemis(None, None).await; - - let url = format!("amqp://localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - let mut session = Session::begin(&mut connection).await.unwrap(); - let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") - .await - .unwrap(); - let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") - .await - .unwrap(); - - let message = Message::from("test-message".repeat(100_000)); - let outcome = sender.send(message).await.unwrap(); - outcome.accepted_or("Not accepted").unwrap(); - - let received = receiver.recv::().await.unwrap(); - receiver.accept(&received).await.unwrap(); - assert_eq!(received.body(), &"test-message".repeat(100_000)); - - sender.close().await.unwrap(); - receiver.close().await.unwrap(); - session.close().await.unwrap(); - connection.close().await.unwrap(); - } - - #[tokio::test] - async fn rabbitmq_amqp10_send_receive() { - let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; - - let url = format!("amqp://localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - let mut session = Session::begin(&mut connection).await.unwrap(); - let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") - .await - .unwrap(); - let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") - .await - .unwrap(); - - let message = Message::from("test-message"); - let outcome = sender.send(message).await.unwrap(); - outcome.accepted_or("Not accepted").unwrap(); - - let received = receiver.recv::().await.unwrap(); - receiver.accept(&received).await.unwrap(); - assert_eq!(received.body(), "test-message"); - - // rabbitmq only supports non-closing detach - sender.detach().await.unwrap(); - receiver.detach().await.unwrap(); - session.close().await.unwrap(); - connection.close().await.unwrap(); - } - - #[tokio::test] - async fn rabbitmq_amqp10_send_receive_large_content() { - let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; - - let url = format!("amqp://localhost:{}", port); - let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); - let mut session = Session::begin(&mut connection).await.unwrap(); - let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") - .await - .unwrap(); - let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") - .await - .unwrap(); - - let message = Message::from("test-message".repeat(100_000)); - let outcome = sender.send(message).await.unwrap(); - outcome.accepted_or("Not accepted").unwrap(); - - let received = receiver.recv::().await.unwrap(); - receiver.accept(&received).await.unwrap(); - assert_eq!(received.body(), &"test-message".repeat(100_000)); - - // rabbitmq only supports non-closing detach - sender.detach().await.unwrap(); - receiver.detach().await.unwrap(); - session.close().await.unwrap(); - connection.close().await.unwrap(); - } -} +// cfg_not_wasm32! { +// use fe2o3_amqp::{Connection, Receiver, Sender, Session}; +// use fe2o3_amqp_types::messaging::Message; + +// mod common; + +// #[tokio::test] +// async fn activemq_artemis_send_receive() { +// let (_node, port) = common::setup_activemq_artemis(None, None).await; + +// let url = format!("amqp://localhost:{}", port); +// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); +// let mut session = Session::begin(&mut connection).await.unwrap(); +// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") +// .await +// .unwrap(); +// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") +// .await +// .unwrap(); + +// let message = Message::from("test-message"); +// let outcome = sender.send(message).await.unwrap(); +// outcome.accepted_or("Not accepted").unwrap(); + +// let received = receiver.recv::().await.unwrap(); +// receiver.accept(&received).await.unwrap(); +// assert_eq!(received.body(), "test-message"); + +// sender.close().await.unwrap(); +// receiver.close().await.unwrap(); +// session.close().await.unwrap(); +// connection.close().await.unwrap(); +// } + +// #[tokio::test] +// async fn activemq_artemis_send_receive_large_content() { +// let (_node, port) = common::setup_activemq_artemis(None, None).await; + +// let url = format!("amqp://localhost:{}", port); +// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); +// let mut session = Session::begin(&mut connection).await.unwrap(); +// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") +// .await +// .unwrap(); +// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") +// .await +// .unwrap(); + +// let message = Message::from("test-message".repeat(100_000)); +// let outcome = sender.send(message).await.unwrap(); +// outcome.accepted_or("Not accepted").unwrap(); + +// let received = receiver.recv::().await.unwrap(); +// receiver.accept(&received).await.unwrap(); +// assert_eq!(received.body(), &"test-message".repeat(100_000)); + +// sender.close().await.unwrap(); +// receiver.close().await.unwrap(); +// session.close().await.unwrap(); +// connection.close().await.unwrap(); +// } + +// #[tokio::test] +// async fn rabbitmq_amqp10_send_receive() { +// let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; + +// let url = format!("amqp://localhost:{}", port); +// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); +// let mut session = Session::begin(&mut connection).await.unwrap(); +// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") +// .await +// .unwrap(); +// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") +// .await +// .unwrap(); + +// let message = Message::from("test-message"); +// let outcome = sender.send(message).await.unwrap(); +// outcome.accepted_or("Not accepted").unwrap(); + +// let received = receiver.recv::().await.unwrap(); +// receiver.accept(&received).await.unwrap(); +// assert_eq!(received.body(), "test-message"); + +// // rabbitmq only supports non-closing detach +// sender.detach().await.unwrap(); +// receiver.detach().await.unwrap(); +// session.close().await.unwrap(); +// connection.close().await.unwrap(); +// } + +// #[tokio::test] +// async fn rabbitmq_amqp10_send_receive_large_content() { +// let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; + +// let url = format!("amqp://localhost:{}", port); +// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); +// let mut session = Session::begin(&mut connection).await.unwrap(); +// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") +// .await +// .unwrap(); +// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") +// .await +// .unwrap(); + +// let message = Message::from("test-message".repeat(100_000)); +// let outcome = sender.send(message).await.unwrap(); +// outcome.accepted_or("Not accepted").unwrap(); + +// let received = receiver.recv::().await.unwrap(); +// receiver.accept(&received).await.unwrap(); +// assert_eq!(received.body(), &"test-message".repeat(100_000)); + +// // rabbitmq only supports non-closing detach +// sender.detach().await.unwrap(); +// receiver.detach().await.unwrap(); +// session.close().await.unwrap(); +// connection.close().await.unwrap(); +// } +// } diff --git a/serde_amqp/Cargo.toml b/serde_amqp/Cargo.toml index dbc038ae..4a051e46 100644 --- a/serde_amqp/Cargo.toml +++ b/serde_amqp/Cargo.toml @@ -49,7 +49,7 @@ serde_amqp_derive = { workspace = true, optional = true } # Optinal dependencies serde_json = { version = "1", optional = true } -chrono = { version = "0.4", optional = true } +chrono = { version = "0.4.30", optional = true } uuid = { workspace = true, optional = true } time = { version = "0.3", optional = true } diff --git a/serde_amqp/src/primitives/timestamp.rs b/serde_amqp/src/primitives/timestamp.rs index c7df19db..66a49477 100644 --- a/serde_amqp/src/primitives/timestamp.rs +++ b/serde_amqp/src/primitives/timestamp.rs @@ -148,10 +148,10 @@ impl TryFrom for chrono::DateTime { /// removed in favour of the one provided with the "chrono-preview" feature in the next major /// version. fn try_from(value: Timestamp) -> Result { - let native_time = - chrono::NaiveDateTime::from_timestamp_millis(value.milliseconds()).ok_or(value)?; - Ok(chrono::DateTime::::from_utc( - native_time, + let datetime = + chrono::DateTime::from_timestamp_millis(value.milliseconds()).ok_or(value)?; + Ok(chrono::DateTime::::from_naive_utc_and_offset( + datetime.naive_utc(), chrono::Utc, )) } @@ -171,9 +171,11 @@ impl From for Option> { /// removed in favour of the one provided with the "chrono-preview" feature in the next major /// version. fn from(value: Timestamp) -> Self { - let native_time = chrono::NaiveDateTime::from_timestamp_millis(value.milliseconds())?; - Some(chrono::DateTime::::from_utc( - native_time, + let datetime = chrono::DateTime::from_timestamp_millis(value.milliseconds()) + .ok_or(value) + .ok()?; + Some(chrono::DateTime::::from_naive_utc_and_offset( + datetime.naive_utc(), chrono::Utc, )) } From 41e7e161d555f0077d412ccf260c975d23eb3351 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 12:34:09 -0700 Subject: [PATCH 11/13] force sequential exec of compat testing --- fe2o3-amqp/tests/broker_connection_compat.rs | 10 +- fe2o3-amqp/tests/link.rs | 244 ++++++++++--------- 2 files changed, 131 insertions(+), 123 deletions(-) diff --git a/fe2o3-amqp/tests/broker_connection_compat.rs b/fe2o3-amqp/tests/broker_connection_compat.rs index cf94d1a1..a3224392 100644 --- a/fe2o3-amqp/tests/broker_connection_compat.rs +++ b/fe2o3-amqp/tests/broker_connection_compat.rs @@ -15,6 +15,13 @@ cfg_not_wasm32! { mod common; #[tokio::test] + async fn test_connection_compat() { + activemq_artemis_connection().await; + activemq_artemis_sasl_plain_connection().await; + rabbitmq_amqp10_connection().await; + rabbitmq_amqp10_sasl_plain_connection().await; + } + async fn activemq_artemis_connection() { let (_node, port) = common::setup_activemq_artemis(None, None).await; let url = format!("amqp://localhost:{}", port); @@ -22,7 +29,6 @@ cfg_not_wasm32! { connection.close().await.unwrap(); } - #[tokio::test] async fn activemq_artemis_sasl_plain_connection() { let (_node, port) = common::setup_activemq_artemis(Some("guest"), Some("guest")).await; let url = format!("amqp://guest:guest@localhost:{}", port); @@ -30,7 +36,6 @@ cfg_not_wasm32! { connection.close().await.unwrap(); } - #[tokio::test] async fn rabbitmq_amqp10_connection() { let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; let url = format!("amqp://localhost:{}", port); @@ -38,7 +43,6 @@ cfg_not_wasm32! { connection.close().await.unwrap(); } - #[tokio::test] async fn rabbitmq_amqp10_sasl_plain_connection() { let (_node, port) = common::setup_rabbitmq_amqp10(Some("guest"), Some("guest")).await; let url = format!("amqp://guest:guest@localhost:{}", port); diff --git a/fe2o3-amqp/tests/link.rs b/fe2o3-amqp/tests/link.rs index 9afab112..d917d07f 100644 --- a/fe2o3-amqp/tests/link.rs +++ b/fe2o3-amqp/tests/link.rs @@ -11,123 +11,127 @@ macro_rules! cfg_not_wasm32 { } } -// cfg_not_wasm32! { -// use fe2o3_amqp::{Connection, Receiver, Sender, Session}; -// use fe2o3_amqp_types::messaging::Message; - -// mod common; - -// #[tokio::test] -// async fn activemq_artemis_send_receive() { -// let (_node, port) = common::setup_activemq_artemis(None, None).await; - -// let url = format!("amqp://localhost:{}", port); -// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); -// let mut session = Session::begin(&mut connection).await.unwrap(); -// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") -// .await -// .unwrap(); -// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") -// .await -// .unwrap(); - -// let message = Message::from("test-message"); -// let outcome = sender.send(message).await.unwrap(); -// outcome.accepted_or("Not accepted").unwrap(); - -// let received = receiver.recv::().await.unwrap(); -// receiver.accept(&received).await.unwrap(); -// assert_eq!(received.body(), "test-message"); - -// sender.close().await.unwrap(); -// receiver.close().await.unwrap(); -// session.close().await.unwrap(); -// connection.close().await.unwrap(); -// } - -// #[tokio::test] -// async fn activemq_artemis_send_receive_large_content() { -// let (_node, port) = common::setup_activemq_artemis(None, None).await; - -// let url = format!("amqp://localhost:{}", port); -// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); -// let mut session = Session::begin(&mut connection).await.unwrap(); -// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") -// .await -// .unwrap(); -// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") -// .await -// .unwrap(); - -// let message = Message::from("test-message".repeat(100_000)); -// let outcome = sender.send(message).await.unwrap(); -// outcome.accepted_or("Not accepted").unwrap(); - -// let received = receiver.recv::().await.unwrap(); -// receiver.accept(&received).await.unwrap(); -// assert_eq!(received.body(), &"test-message".repeat(100_000)); - -// sender.close().await.unwrap(); -// receiver.close().await.unwrap(); -// session.close().await.unwrap(); -// connection.close().await.unwrap(); -// } - -// #[tokio::test] -// async fn rabbitmq_amqp10_send_receive() { -// let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; - -// let url = format!("amqp://localhost:{}", port); -// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); -// let mut session = Session::begin(&mut connection).await.unwrap(); -// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") -// .await -// .unwrap(); -// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") -// .await -// .unwrap(); - -// let message = Message::from("test-message"); -// let outcome = sender.send(message).await.unwrap(); -// outcome.accepted_or("Not accepted").unwrap(); - -// let received = receiver.recv::().await.unwrap(); -// receiver.accept(&received).await.unwrap(); -// assert_eq!(received.body(), "test-message"); - -// // rabbitmq only supports non-closing detach -// sender.detach().await.unwrap(); -// receiver.detach().await.unwrap(); -// session.close().await.unwrap(); -// connection.close().await.unwrap(); -// } - -// #[tokio::test] -// async fn rabbitmq_amqp10_send_receive_large_content() { -// let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; - -// let url = format!("amqp://localhost:{}", port); -// let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); -// let mut session = Session::begin(&mut connection).await.unwrap(); -// let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") -// .await -// .unwrap(); -// let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") -// .await -// .unwrap(); - -// let message = Message::from("test-message".repeat(100_000)); -// let outcome = sender.send(message).await.unwrap(); -// outcome.accepted_or("Not accepted").unwrap(); - -// let received = receiver.recv::().await.unwrap(); -// receiver.accept(&received).await.unwrap(); -// assert_eq!(received.body(), &"test-message".repeat(100_000)); - -// // rabbitmq only supports non-closing detach -// sender.detach().await.unwrap(); -// receiver.detach().await.unwrap(); -// session.close().await.unwrap(); -// connection.close().await.unwrap(); -// } -// } +cfg_not_wasm32! { + use fe2o3_amqp::{Connection, Receiver, Sender, Session}; + use fe2o3_amqp_types::messaging::Message; + + mod common; + + #[tokio::test] + async fn test_send_receive_compat() { + activemq_artemis_send_receive().await; + activemq_artemis_send_receive_large_content().await; + rabbitmq_amqp10_send_receive().await; + rabbitmq_amqp10_send_receive_large_content().await; + } + + async fn activemq_artemis_send_receive() { + let (_node, port) = common::setup_activemq_artemis(None, None).await; + + let url = format!("amqp://localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + let mut session = Session::begin(&mut connection).await.unwrap(); + let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") + .await + .unwrap(); + let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") + .await + .unwrap(); + + let message = Message::from("test-message"); + let outcome = sender.send(message).await.unwrap(); + outcome.accepted_or("Not accepted").unwrap(); + + let received = receiver.recv::().await.unwrap(); + receiver.accept(&received).await.unwrap(); + assert_eq!(received.body(), "test-message"); + + sender.close().await.unwrap(); + receiver.close().await.unwrap(); + session.close().await.unwrap(); + connection.close().await.unwrap(); + } + + async fn activemq_artemis_send_receive_large_content() { + let (_node, port) = common::setup_activemq_artemis(None, None).await; + + let url = format!("amqp://localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + let mut session = Session::begin(&mut connection).await.unwrap(); + let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") + .await + .unwrap(); + let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") + .await + .unwrap(); + + let message = Message::from("test-message".repeat(100_000)); + let outcome = sender.send(message).await.unwrap(); + outcome.accepted_or("Not accepted").unwrap(); + + let received = receiver.recv::().await.unwrap(); + receiver.accept(&received).await.unwrap(); + assert_eq!(received.body(), &"test-message".repeat(100_000)); + + sender.close().await.unwrap(); + receiver.close().await.unwrap(); + session.close().await.unwrap(); + connection.close().await.unwrap(); + } + + async fn rabbitmq_amqp10_send_receive() { + let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; + + let url = format!("amqp://localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + let mut session = Session::begin(&mut connection).await.unwrap(); + let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") + .await + .unwrap(); + let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") + .await + .unwrap(); + + let message = Message::from("test-message"); + let outcome = sender.send(message).await.unwrap(); + outcome.accepted_or("Not accepted").unwrap(); + + let received = receiver.recv::().await.unwrap(); + receiver.accept(&received).await.unwrap(); + assert_eq!(received.body(), "test-message"); + + // rabbitmq only supports non-closing detach + sender.detach().await.unwrap(); + receiver.detach().await.unwrap(); + session.close().await.unwrap(); + connection.close().await.unwrap(); + } + + async fn rabbitmq_amqp10_send_receive_large_content() { + let (_node, port) = common::setup_rabbitmq_amqp10(None, None).await; + + let url = format!("amqp://localhost:{}", port); + let mut connection = Connection::open("test-connection", &url[..]).await.unwrap(); + let mut session = Session::begin(&mut connection).await.unwrap(); + let mut sender = Sender::attach(&mut session, "test-sender", "test-queue") + .await + .unwrap(); + let mut receiver = Receiver::attach(&mut session, "test-receiver", "test-queue") + .await + .unwrap(); + + let message = Message::from("test-message".repeat(100_000)); + let outcome = sender.send(message).await.unwrap(); + outcome.accepted_or("Not accepted").unwrap(); + + let received = receiver.recv::().await.unwrap(); + receiver.accept(&received).await.unwrap(); + assert_eq!(received.body(), &"test-message".repeat(100_000)); + + // rabbitmq only supports non-closing detach + sender.detach().await.unwrap(); + receiver.detach().await.unwrap(); + session.close().await.unwrap(); + connection.close().await.unwrap(); + } +} From 03726d09e5ac7594447b8ee20cd7d8671c2ff990 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 12:42:15 -0700 Subject: [PATCH 12/13] fix problem w/ doc format --- .../src/operations/entity/create.rs | 18 +++++++++--------- .../src/operations/node/mod.rs | 16 ++++++++-------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/fe2o3-amqp-management/src/operations/entity/create.rs b/fe2o3-amqp-management/src/operations/entity/create.rs index 42767fe4..e561b292 100644 --- a/fe2o3-amqp-management/src/operations/entity/create.rs +++ b/fe2o3-amqp-management/src/operations/entity/create.rs @@ -43,25 +43,25 @@ pub struct CreateRequest<'a> { /// attribute value is not string, conversion into the correct type MUST be performed according /// to the following rules: /// - /// • A string that consists solely of characters from the ASCII character-set, will be + /// - A string that consists solely of characters from the ASCII character-set, will be /// converted into a symbol if so required. /// - /// • A string that can be parsed as a number according to [RFC7159] will be converted to a + /// - A string that can be parsed as a number according to [RFC7159] will be converted to a /// ubyte, ushort, uint, ulong, byte, short, int, or long if so required and the number lies /// within the domain of the given AMQP type and represents an integral number /// - /// • A string which can be parsed as a number according to [RFC7159] will be converted to an + /// - A string which can be parsed as a number according to [RFC7159] will be converted to an /// float, double, decimal32, decimal64 or decimal128 if so required and the number lies within /// the domain of the given AMQP type. /// - /// • A string which can be parsed as true or false according to [RFC7159] will be converted to + /// - A string which can be parsed as true or false according to [RFC7159] will be converted to /// a boolean value if so required. /// - /// • A string which can be parsed as an array according to [RFC7159] will be converted into a + /// - A string which can be parsed as an array according to [RFC7159] will be converted into a /// list (with the values type-converted into elements as necessary according to the same rules) /// if so required. /// - /// • A string which can be parsed as an object according to [RFC7159] will be converted into a + /// - A string which can be parsed as an object according to [RFC7159] will be converted into a /// map (with the values type-converted into map values as necessary according to the same /// rules) if so required. pub body: OrderedMap, @@ -108,9 +108,9 @@ impl<'a> Request for CreateRequest<'a> { /// message MUST consist of an amqp-value section that contains a map containing the actual /// attributes of the entity created. These MAY differ from those requested in two ways: /// -/// • Default values may be returned for values not specified -/// • Specific/concrete values may be returned for generic/base values specified -/// • The value associated with an attribute may have been converted into the correct amqp type +/// - Default values may be returned for values not specified +/// - Specific/concrete values may be returned for generic/base values specified +/// - The value associated with an attribute may have been converted into the correct amqp type /// /// (e.g. the string “2” into the integer value 2) A map containing attributes that are not /// applicable for the entity being created, or invalid values for a given attribute, MUST result in diff --git a/fe2o3-amqp-management/src/operations/node/mod.rs b/fe2o3-amqp-management/src/operations/node/mod.rs index 9635d538..fc8219b6 100644 --- a/fe2o3-amqp-management/src/operations/node/mod.rs +++ b/fe2o3-amqp-management/src/operations/node/mod.rs @@ -1,15 +1,15 @@ //! A Management Node Operation is an operation directed to the Management Node itself rather than an entity it is managing. //! Of the standard application-properties (see Section 3.1), name MUST be provided with a value of “self”, type MUST be provided with a value of “org.amqp.management” and identity MUST NOT be provided. //! The following Management Node Operations SHOULD be supported: -//! • QUERY -//! • GET-TYPES -//! • GET-ANNOTATIONS -//! • GET-ATTRIBUTES -//! • GET-OPERATIONS -//! • GET-MGMT-NODES +//! - QUERY +//! - GET-TYPES +//! - GET-ANNOTATIONS +//! - GET-ATTRIBUTES +//! - GET-OPERATIONS +//! - GET-MGMT-NODES //! The following Management Node Operations MAY be supported: -//! • REGISTER -//! • DEREGISTER +//! - REGISTER +//! - DEREGISTER mod deregister; mod get; From 7262f118a81de8e1afe7899599e6ab9c923517c5 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 13 Jun 2024 12:54:57 -0700 Subject: [PATCH 13/13] fixed bug in doc example --- fe2o3-amqp-ws/Cargo.toml | 6 +++++- fe2o3-amqp-ws/src/lib.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/fe2o3-amqp-ws/Cargo.toml b/fe2o3-amqp-ws/Cargo.toml index a09d0b64..cc968293 100644 --- a/fe2o3-amqp-ws/Cargo.toml +++ b/fe2o3-amqp-ws/Cargo.toml @@ -48,5 +48,9 @@ web-sys = { version = "0.3", features = [ "BinaryType", ] } +[dev-dependencies] +fe2o3-amqp = { workspace = true } + [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tokio = { workspace = true, features = ["net"] } +tokio = { workspace = true, features = ["net", "rt-multi-thread"] } + diff --git a/fe2o3-amqp-ws/src/lib.rs b/fe2o3-amqp-ws/src/lib.rs index 55a8238d..41457ca3 100644 --- a/fe2o3-amqp-ws/src/lib.rs +++ b/fe2o3-amqp-ws/src/lib.rs @@ -129,7 +129,7 @@ pin_project! { /// /// #[tokio::main] /// async fn main() { - /// let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673") + /// let ws_stream = WebSocketStream::connect("ws://localhost:5673") /// .await /// .unwrap(); /// let mut connection = Connection::builder()