Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change handshake timeout behavior #150

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changes

## [0.12.1] - 2023-09-25

* Change handshake timeout behavior (renamed to connect timeout).
Timeout handles slow client's Control frame.

## [0.12.0] - 2023-09-18

* Refactor MqttError type
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.12.0"
version = "0.12.1"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
2 changes: 1 addition & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ mod tests {

let (disp, io) =
Dispatcher::new_debug(nio::Io::new(server), BytesCodec, Srv(counter.clone()));
io.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &mut BytesCodec).unwrap();
io.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &BytesCodec).unwrap();
ntex::rt::spawn(async move {
let _ = disp.await;
});
Expand Down
37 changes: 24 additions & 13 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
pub struct MqttServer<V3, V5, Err, InitErr> {
v3: V3,
v5: V5,
handshake_timeout: Millis,
connect_timeout: Millis,
_t: marker::PhantomData<(Err, InitErr)>,
}

Expand All @@ -30,7 +30,7 @@
MqttServer {
v3: DefaultProtocolServer::new(ProtocolVersion::MQTT3),
v5: DefaultProtocolServer::new(ProtocolVersion::MQTT5),
handshake_timeout: Millis(10000),
connect_timeout: Millis(10000),
_t: marker::PhantomData,
}
}
Expand All @@ -50,12 +50,23 @@
}

impl<V3, V5, Err, InitErr> MqttServer<V3, V5, Err, InitErr> {
/// Set handshake timeout.
/// Set client timeout for first `Connect` frame.
///
/// Defines a timeout for reading `Connect` frame. If a client does not transmit
/// the entire frame within this time, the connection is terminated with
/// Mqtt::Handshake(HandshakeError::Timeout) error.
///
/// Handshake includes `connect` packet.
/// By default handshake timeuot is 10 seconds.
/// By default, connect timeuot is 10 seconds.
pub fn conenct_timeout(mut self, timeout: Seconds) -> Self {
self.connect_timeout = timeout.into();
self
}

Check warning on line 63 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L60-L63

Added lines #L60 - L63 were not covered by tests

#[deprecated(since = "0.12.1")]
#[doc(hidden)]
/// Set handshake timeout.
pub fn handshake_timeout(mut self, timeout: Seconds) -> Self {
self.handshake_timeout = timeout.into();
self.connect_timeout = timeout.into();

Check warning on line 69 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L69

Added line #L69 was not covered by tests
self
}
}
Expand Down Expand Up @@ -113,7 +124,7 @@
MqttServer {
v3: service.finish(),
v5: self.v5,
handshake_timeout: self.handshake_timeout,
connect_timeout: self.connect_timeout,
_t: marker::PhantomData,
}
}
Expand All @@ -140,7 +151,7 @@
MqttServer {
v3: service,
v5: self.v5,
handshake_timeout: self.handshake_timeout,
connect_timeout: self.connect_timeout,

Check warning on line 154 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L154

Added line #L154 was not covered by tests
_t: marker::PhantomData,
}
}
Expand Down Expand Up @@ -185,7 +196,7 @@
MqttServer {
v3: self.v3,
v5: service.finish(),
handshake_timeout: self.handshake_timeout,
connect_timeout: self.connect_timeout,
_t: marker::PhantomData,
}
}
Expand All @@ -212,7 +223,7 @@
MqttServer {
v3: self.v3,
v5: service,
handshake_timeout: self.handshake_timeout,
connect_timeout: self.connect_timeout,

Check warning on line 226 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L226

Added line #L226 was not covered by tests
_t: marker::PhantomData,
}
}
Expand Down Expand Up @@ -241,7 +252,7 @@
let v5 = v5?;
Ok(MqttServerImpl {
handlers: (v3, v5),
handshake_timeout: self.handshake_timeout,
connect_timeout: self.connect_timeout,
_t: marker::PhantomData,
})
}
Expand Down Expand Up @@ -311,7 +322,7 @@
/// Mqtt Server
pub struct MqttServerImpl<V3, V5, Err> {
handlers: (V3, V5),
handshake_timeout: Millis,
connect_timeout: Millis,
_t: marker::PhantomData<Err>,
}

Expand Down Expand Up @@ -353,7 +364,7 @@
MqttServerImplResponse {
ctx,
state: MqttServerImplState::Version {
item: Some((req, VersionCodec, Deadline::new(self.handshake_timeout))),
item: Some((req, VersionCodec, Deadline::new(self.connect_timeout))),
},
handlers: &self.handlers,
}
Expand Down
3 changes: 1 addition & 2 deletions src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,7 @@ mod tests {
#[test_case("/finance" => Ok(vec![TopicFilterLevel::Blank, lvl_normal("finance")]) ; "12")]
#[test_case("finance/" => Ok(vec![lvl_normal("finance"), TopicFilterLevel::Blank]) ; "13")]
fn parsing(input: &str) -> Result<Vec<TopicFilterLevel>, TopicFilterError> {
TopicFilter::try_from(ByteString::from(input))
.map(|t| t.levels().iter().cloned().collect())
TopicFilter::try_from(ByteString::from(input)).map(|t| t.levels().to_vec())
}

#[test_case(vec![lvl_normal("sport"), lvl_normal("tennis"), lvl_normal("player1")] => true; "1")]
Expand Down
2 changes: 1 addition & 1 deletion src/v3/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ mod tests {
let mut v = BytesMut::with_capacity(1024);
encode(packet, &mut v, get_encoded_size(packet) as u32).unwrap();
assert_eq!(expected.len(), v.len());
assert_eq!(&expected[..], &v[..]);
assert_eq!(expected, &v[..]);
}

#[test]
Expand Down
94 changes: 28 additions & 66 deletions src/v3/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
use super::shared::{MqttShared, MqttSinkPool};
use super::{codec as mqtt, MqttServer, Publish, Session};

pub(crate) type SelectItem = (Handshake, Deadline);

type ServerFactory<Err, InitErr> =
boxed::BoxServiceFactory<(), SelectItem, Either<SelectItem, ()>, MqttError<Err>, InitErr>;
boxed::BoxServiceFactory<(), Handshake, Either<Handshake, ()>, MqttError<Err>, InitErr>;

type Server<Err> = boxed::BoxService<SelectItem, Either<SelectItem, ()>, MqttError<Err>>;
type Server<Err> = boxed::BoxService<Handshake, Either<Handshake, ()>, MqttError<Err>>;

/// Mqtt server selector
///
Expand All @@ -26,7 +24,7 @@
pub struct Selector<Err, InitErr> {
servers: Vec<ServerFactory<Err, InitErr>>,
max_size: u32,
handshake_timeout: Millis,
connect_timeout: Millis,
pool: Rc<MqttSinkPool>,
_t: marker::PhantomData<(Err, InitErr)>,
}
Expand All @@ -37,7 +35,7 @@
Selector {
servers: Vec::new(),
max_size: 0,
handshake_timeout: Millis(10000),
connect_timeout: Millis(10000),

Check warning on line 38 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L38

Added line #L38 was not covered by tests
pool: Default::default(),
_t: marker::PhantomData,
}
Expand All @@ -49,12 +47,23 @@
Err: 'static,
InitErr: 'static,
{
/// Set handshake timeout.
/// Set client timeout for first `Connect` frame.
///
/// Handshake includes `connect` packet and response `connect-ack`.
/// By default handshake timeuot is 10 seconds.
/// Defines a timeout for reading `Connect` frame. If a client does not transmit
/// the entire frame within this time, the connection is terminated with
/// Mqtt::Handshake(HandshakeError::Timeout) error.
///
/// By default, connect timeuot is 10 seconds.
pub fn conenct_timeout(mut self, timeout: Seconds) -> Self {
self.connect_timeout = timeout.into();
self
}

Check warning on line 60 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L57-L60

Added lines #L57 - L60 were not covered by tests

#[deprecated(since = "0.12.1")]
#[doc(hidden)]
/// Set handshake timeout.
pub fn handshake_timeout(mut self, timeout: Seconds) -> Self {
self.handshake_timeout = timeout.into();
self.connect_timeout = timeout.into();

Check warning on line 66 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L66

Added line #L66 was not covered by tests
self
}

Expand Down Expand Up @@ -111,7 +120,7 @@
Ok(SelectorService {
servers,
max_size: self.max_size,
handshake_timeout: self.handshake_timeout,
connect_timeout: self.connect_timeout,

Check warning on line 123 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L123

Added line #L123 was not covered by tests
pool: self.pool.clone(),
})
}
Expand Down Expand Up @@ -169,7 +178,7 @@
pub struct SelectorService<Err> {
servers: Vec<Server<Err>>,
max_size: u32,
handshake_timeout: Millis,
connect_timeout: Millis,
pool: Rc<MqttSinkPool>,
}

Expand Down Expand Up @@ -234,58 +243,11 @@

#[inline]
fn call<'a>(&'a self, io: IoBoxed, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> {
Box::pin(async move {
let codec = mqtt::Codec::default();
codec.set_max_size(self.max_size);
let shared = Rc::new(MqttShared::new(io.clone(), codec, false, self.pool.clone()));
let mut timeout = Deadline::new(self.handshake_timeout);

// read first packet
let result = select(&mut timeout, async {
io.recv(&shared.codec)
.await
.map_err(|err| {
log::trace!("Error is received during mqtt handshake: {:?}", err);
MqttError::Handshake(HandshakeError::from(err))
})?
.ok_or_else(|| {
log::trace!("Server mqtt is disconnected during handshake");
MqttError::Handshake(HandshakeError::Disconnected(None))
})
})
.await;

let (packet, size) = match result {
Either::Left(_) => Err(MqttError::Handshake(HandshakeError::Timeout)),
Either::Right(item) => item,
}?;

let connect = match packet {
mqtt::Packet::Connect(connect) => connect,
packet => {
log::info!("MQTT-3.1.0-1: Expected CONNECT packet, received {:?}", packet);
return Err(MqttError::Handshake(HandshakeError::Protocol(
ProtocolError::unexpected_packet(
packet.packet_type(),
"Expected CONNECT packet [MQTT-3.1.0-1]",
),
)));
}
};

// call servers
let mut item = (Handshake::new(connect, size, io, shared), timeout);
for srv in &self.servers {
match ctx.call(srv, item).await? {
Either::Left(result) => {
item = result;
}
Either::Right(_) => return Ok(()),
}
}
log::error!("Cannot handle CONNECT packet {:?}", item.0);
Err(MqttError::Handshake(HandshakeError::Server("Cannot handle CONNECT packet")))
})
Service::<(IoBoxed, Deadline)>::call(
self,
(io, Deadline::new(self.connect_timeout)),
ctx,
)

Check warning on line 250 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L246-L250

Added lines #L246 - L250 were not covered by tests
}
}

Expand Down Expand Up @@ -353,7 +315,7 @@
};

// call servers
let mut item = (Handshake::new(connect, size, io, shared), timeout);
let mut item = Handshake::new(connect, size, io, shared);

Check warning on line 318 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L318

Added line #L318 was not covered by tests
for srv in &self.servers {
match ctx.call(srv, item).await? {
Either::Left(result) => {
Expand All @@ -362,7 +324,7 @@
Either::Right(_) => return Ok(()),
}
}
log::error!("Cannot handle CONNECT packet {:?}", item.0.packet());
log::error!("Cannot handle CONNECT packet {:?}", item.packet());

Check warning on line 327 in src/v3/selector.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/selector.rs#L327

Added line #L327 was not covered by tests
Err(MqttError::Handshake(HandshakeError::Server("Cannot handle CONNECT packet")))
})
}
Expand Down
Loading
Loading