Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 25, 2023
1 parent 08dde14 commit 002439e
Show file tree
Hide file tree
Showing 9 changed files with 14 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.12.0"
version = "0.12.1"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
2 changes: 1 addition & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ mod tests {

let (disp, io) =
Dispatcher::new_debug(nio::Io::new(server), BytesCodec, Srv(counter.clone()));
io.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &mut BytesCodec).unwrap();
io.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec).unwrap();
ntex::rt::spawn(async move {
let _ = disp.await;
});
Expand Down
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl<Err, InitErr> DefaultProtocolServer<Err, InitErr> {
}
}

impl<Err, InitErr> ServiceFactory<IoBoxed> for DefaultProtocolServer<Err, InitErr> {
impl<Err, InitErr> ServiceFactory<(IoBoxed, Deadline)> for DefaultProtocolServer<Err, InitErr> {
type Response = ();
type Error = MqttError<Err>;
type Service = DefaultProtocolServer<Err, InitErr>;
Expand All @@ -514,12 +514,12 @@ impl<Err, InitErr> ServiceFactory<IoBoxed> for DefaultProtocolServer<Err, InitEr
}
}

impl<Err, InitErr> Service<IoBoxed> for DefaultProtocolServer<Err, InitErr> {
impl<Err, InitErr> Service<(IoBoxed, Deadline)> for DefaultProtocolServer<Err, InitErr> {
type Response = ();
type Error = MqttError<Err>;
type Future<'f> = Ready<Self::Response, Self::Error> where Self: 'f;

fn call<'a>(&'a self, _: IoBoxed, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
fn call<'a>(&'a self, _: (IoBoxed, Deadline), _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Ready::Err(MqttError::Handshake(HandshakeError::Disconnected(Some(io::Error::new(
io::ErrorKind::Other,
format!("Protocol is not supported: {:?}", self.ver),
Expand Down
3 changes: 1 addition & 2 deletions src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,7 @@ mod tests {
#[test_case("/finance" => Ok(vec![TopicFilterLevel::Blank, lvl_normal("finance")]) ; "12")]
#[test_case("finance/" => Ok(vec![lvl_normal("finance"), TopicFilterLevel::Blank]) ; "13")]
fn parsing(input: &str) -> Result<Vec<TopicFilterLevel>, TopicFilterError> {
TopicFilter::try_from(ByteString::from(input))
.map(|t| t.levels().iter().cloned().collect())
TopicFilter::try_from(ByteString::from(input)).map(|t| t.levels().to_vec())
}

#[test_case(vec![lvl_normal("sport"), lvl_normal("tennis"), lvl_normal("player1")] => true; "1")]
Expand Down
2 changes: 1 addition & 1 deletion src/v3/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod tests {
let mut v = BytesMut::with_capacity(1024);
encode(packet, &mut v, get_encoded_size(packet) as u32).unwrap();
assert_eq!(expected.len(), v.len());
assert_eq!(&expected[..], &v[..]);
assert_eq!(expected, &v[..]);
}

#[test]
Expand Down
8 changes: 2 additions & 6 deletions src/v5/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,8 @@ mod tests {
let (_len, consumed) = decode_variable_length(&bytes[1..]).unwrap().unwrap();
let cur = Bytes::copy_from_slice(&bytes[consumed + 1..]);
let mut tmp = BytesMut::with_capacity(4096);
ntex::codec::Encoder::encode(
&mut crate::v5::codec::Codec::new(),
res.clone(),
&mut tmp,
)
.unwrap();
ntex::codec::Encoder::encode(&crate::v5::codec::Codec::new(), res.clone(), &mut tmp)
.unwrap();
let decoded = decode_packet(cur, fixed);
let res = Ok(res);
if decoded != res {
Expand Down
2 changes: 1 addition & 1 deletion src/v5/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ mod tests {
let mut v = BytesMut::with_capacity(1024);
packet.encode(&mut v, packet.encoded_size(1024) as u32).unwrap();
assert_eq!(expected.len(), v.len());
assert_eq!(&expected[..], &v[..]);
assert_eq!(expected, &v[..]);
}

#[test]
Expand Down
7 changes: 2 additions & 5 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,8 @@ async fn test_max_qos() -> std::io::Result<()> {
let violated = violated.clone();
match msg {
ControlMessage::ProtocolError(err) => {
match err.get_ref() {
ProtocolError::ProtocolViolation(_) => {
violated.store(true, Relaxed);
}
_ => (),
if let ProtocolError::ProtocolViolation(_) = err.get_ref() {
violated.store(true, Relaxed);
}
Ready::Ok(err.ack())
}
Expand Down
7 changes: 2 additions & 5 deletions tests/test_server_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,8 @@ async fn test_max_qos() -> std::io::Result<()> {
let violated = violated.clone();
match msg {
ControlMessage::ProtocolError(msg) => {
match msg.get_ref() {
error::ProtocolError::ProtocolViolation(_) => {
violated.store(true, Relaxed);
}
_ => (),
if let error::ProtocolError::ProtocolViolation(_) = msg.get_ref() {
violated.store(true, Relaxed);
}
Ready::Ok::<_, TestError>(msg.ack())
}
Expand Down

0 comments on commit 002439e

Please sign in to comment.