Skip to content

Commit

Permalink
#140 Connection now defaults to MQTT version set in the client create…
Browse files Browse the repository at this point in the history
… options.
  • Loading branch information
fpagliughi committed Jan 25, 2022
1 parent 5c52162 commit 7c3aef7
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 41 deletions.
3 changes: 1 addition & 2 deletions examples/rpc_math_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ fn main() -> mqtt::Result<()> {
// but it's still a good habit to start consuming first.
let rx = cli.start_consuming();

// Connect with default options
// Connect with default options, and a clean start
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.mqtt_version(MQTTV5)
.clean_start(true)
.finalize();

Expand Down
5 changes: 2 additions & 3 deletions examples/rpc_math_srvr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//

/*******************************************************************************
* Copyright (c) 2019 Frank Pagliughi <[email protected]>
* Copyright (c) 2019-2022 Frank Pagliughi <[email protected]>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand Down Expand Up @@ -196,9 +196,8 @@ fn main() -> mqtt::Result<()> {
// but it's still a good habit to start consuming first.
let rx = cli.start_consuming();

// Connect with default options
// Connect with default options, and no clean start
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.mqtt_version(MQTTV5)
.clean_start(false)
.finalize();

Expand Down
57 changes: 43 additions & 14 deletions src/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub struct AsyncClient {
pub(crate) struct InnerAsyncClient {
// The handle to the Paho C client
handle: ffi::MQTTAsync,
// The version with which the client was created
// This is the default for connecting
mqtt_version: u32,
// The options for connecting to the broker
opts: Mutex<ConnectOptions>,
// The context to give to the C callbacks
Expand Down Expand Up @@ -145,6 +148,7 @@ impl AsyncClient {

let mut cli = InnerAsyncClient {
handle: ptr::null_mut(),
mqtt_version: opts.mqtt_version(),
opts: Mutex::new(ConnectOptions::new()),
callback_context: Mutex::new(CallbackContext::default()),
server_uri: CString::new(opts.server_uri)?,
Expand Down Expand Up @@ -328,8 +332,21 @@ impl AsyncClient {
}

/// Gets the MQTT version for which the client was created.
///
/// This is the default version used when connecting, unless a specific
/// value is set in the connect options. It is typically the highest
/// version that can be used by the client. Specifically, if the client
/// is created for v3, then it can not be used to connect with the v5
/// protocol.
pub fn mqtt_version(&self) -> u32 {
// TODO: It's getting this from the connect options, not the create options!
self.inner.mqtt_version
}

/// The current version of the protol being used, if the client is
/// connected.
pub fn current_mqtt_version(&self) -> u32 {
// TODO: This is the requested version. The connect response
// contains the actual, negotiated protocol version
self.inner.opts.lock().unwrap().copts.MQTTVersion as u32
}

Expand All @@ -349,21 +366,27 @@ impl AsyncClient {
///
/// # Arguments
///
/// * `opts` The connect options
/// * `opts` The connect options. This can be `None`, in which case the
/// default options are used.
///
pub fn connect<T>(&self, opt_opts: T) -> ConnectToken
pub fn connect<T>(&self, opts: T) -> ConnectToken
where
T: Into<Option<ConnectOptions>>,
{
let opts = opt_opts.into().unwrap_or_default();
debug!("Connecting handle: {:?}", self.inner.handle);
debug!("Connect options: {:?}", opts);

let mut opts = opts.into().unwrap_or_default();

if opts.mqtt_version() == 0 && self.inner.mqtt_version >= 5 {
opts.set_mqtt_version(self.inner.mqtt_version);
}

let tok = Token::from_request(ServerRequest::Connect);
opts.set_token(tok.clone());

debug!("Connect options: {:?}", opts);
let mut lkopts = self.inner.opts.lock().unwrap();
*lkopts = opts;
lkopts.set_token(tok.clone());

let rc = unsafe { ffi::MQTTAsync_connect(self.inner.handle, &lkopts.copts) };

Expand All @@ -380,7 +403,8 @@ impl AsyncClient {
/// # Arguments
///
/// * `opts` The connect options
///
/// * `success_cb` The callback for a successful connection.
/// * `failure_cb` The callback for a failed connection attempt.
pub fn connect_with_callbacks<FS, FF>(
&self,
mut opts: ConnectOptions,
Expand All @@ -392,22 +416,27 @@ impl AsyncClient {
FF: Fn(&AsyncClient, u16, i32) + 'static,
{
debug!("Connecting handle with callbacks: {:?}", self.inner.handle);

if opts.mqtt_version() == 0 && self.inner.mqtt_version >= 5 {
opts.set_mqtt_version(self.inner.mqtt_version);
}

let tok = Token::from_client(self, ServerRequest::Connect, success_cb, failure_cb);
opts.set_token(tok.clone());

debug!("Connect opts: {:?}", opts);
unsafe {
if !opts.copts.will.is_null() {
debug!("Will: {:?}", *(opts.copts.will));
}
}
let mut lkopts = self.inner.opts.lock().unwrap();
*lkopts = opts;

let tok = Token::from_client(self, ServerRequest::Connect, success_cb, failure_cb);
opts.set_token(tok.clone());

*self.inner.opts.lock().unwrap() = opts.clone();

let rc = unsafe { ffi::MQTTAsync_connect(self.inner.handle, &opts.copts) };
let rc = unsafe { ffi::MQTTAsync_connect(self.inner.handle, &lkopts.copts) };

if rc != 0 {
let _ = unsafe { Token::from_raw(opts.copts.context) };
let _ = unsafe { Token::from_raw(lkopts.copts.context) };
return ConnectToken::from_error(rc);
}

Expand Down
127 changes: 110 additions & 17 deletions src/connect_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//

/*******************************************************************************
* Copyright (c) 2017-2020 Frank Pagliughi <[email protected]>
* Copyright (c) 2017-2022 Frank Pagliughi <[email protected]>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand All @@ -21,6 +21,7 @@
*******************************************************************************/

//! Connect options for the Paho MQTT Rust client library.
//!
//! This contains the structures to define the options for connecting to the
//! MQTT broker/server.

Expand All @@ -42,8 +43,9 @@ use std::{ffi::CString, os::raw::c_int, pin::Pin, ptr, time::Duration};
// Connections

/// The collection of options for connecting to a broker.
/// This can be constructed using a
/// [ConnectOptionsBuilder](struct.ConnectOptionsBuilder.html).
///
/// This can be constructed using a [`ConnectOptionsBuilder`] to set all the
/// options.
#[derive(Debug)]
pub struct ConnectOptions {
/// The underlying C options structure.
Expand Down Expand Up @@ -75,8 +77,9 @@ impl ConnectOptions {
ConnectOptions::default()
}

// Fixes up the underlying C struct to point to our cached values.
// This should be called any time a cached object is modified.
// Creates a set of options from a C struct and cached values.
// Fixes up the underlying C struct to point to the cached values,
// then returns a new options object with them combined.
fn from_data(mut copts: ffi::MQTTAsync_connectOptions, data: ConnectOptionsData) -> Self {
let mut data = Box::pin(data);

Expand Down Expand Up @@ -148,36 +151,104 @@ impl ConnectOptions {
Self { copts, data }
}

// Sets the proper callbacks depending on the MQTT version and the
// presence of a context in the C opts.
fn fix_callbacks(&mut self) {
self.copts.onSuccess = None;
self.copts.onFailure = None;
self.copts.onSuccess5 = None;
self.copts.onFailure5 = None;

if !self.copts.context.is_null() {
if self.copts.MQTTVersion < ffi::MQTTVERSION_5 as i32 {
self.copts.onSuccess = Some(TokenInner::on_success);
self.copts.onFailure = Some(TokenInner::on_failure);
}
else {
self.copts.onSuccess5 = Some(TokenInner::on_success5);
self.copts.onFailure5 = Some(TokenInner::on_failure5);
}
}
}

/// Gets the MQTT protocol version that should be used for the
/// connection.
pub fn mqtt_version(&self) -> u32 {
self.copts.MQTTVersion as u32
}

/// Sets the MQTT protocol version that should be used for the
/// connection.
///
/// This also insures that the other options are compatible with the
/// selected version. For example, when setting for v5, it will make
/// sure the `cleansession` flag is cleared, since v5 uses cleanstart,
/// not cleansession.
pub fn set_mqtt_version(&mut self, ver: u32) {
self.copts.MQTTVersion = ver as i32;

if ver < ffi::MQTTVERSION_5 {
self.copts.cleanstart = 0;
}
else {
self.copts.cleansession = 0;
}
self.fix_callbacks();
}

/// Gets the "clean session" setting in the options.
///
/// This is only used in MQTT v3 connections.
pub fn clean_session(&self) -> bool {
from_c_bool(self.copts.cleansession)
}

/// This sets the "clean session" behavior for connecting to the server.
///
/// When set to true, this directs the server to throw away any state
/// related to the client, as determined by the client identifier.
/// When set to false, the server keeps the state information and
/// resumes the previous session.
///
/// This is only used in MQTT v3 connections.
pub fn set_clean_session(&mut self, clean: bool) {
self.copts.cleansession = to_c_bool(clean);
if clean {
self.copts.cleanstart = 0;
}
}

/// Gets the "clean start" setting in the options.
///
/// This is only used in MQTT v5 connections.
pub fn clean_start(&self) -> bool {
from_c_bool(self.copts.cleanstart)
}

/// This sets the "clean start" behavior for connecting to the server.
///
/// When set to true, this directs the server to throw away any state
/// related to the client, as determined by the client identifier.
/// When set to false, the server keeps the state information and
/// resumes the previous session.
///
/// This is only used in MQTT v5 connections.
pub fn set_clean_start(&mut self, clean: bool) {
self.copts.cleanstart = to_c_bool(clean);
if clean {
self.copts.cleansession = 0;
}
}

/// Sets the token to ber used for connect completion callbacks.
///
/// Note that we leak the token to give to the C lib. When we're
/// done with it, we must recover and drop it (i.e. in the completion
/// callback).
pub fn set_token(&mut self, tok: ConnectToken) {
let tok: Token = tok;

if self.copts.MQTTVersion < ffi::MQTTVERSION_5 as i32 {
self.copts.onSuccess = Some(TokenInner::on_success);
self.copts.onFailure = Some(TokenInner::on_failure);
}
else {
self.copts.onSuccess5 = Some(TokenInner::on_success5);
self.copts.onFailure5 = Some(TokenInner::on_failure5);
}
self.copts.context = tok.into_raw();
self.fix_callbacks();
}
}

Expand Down Expand Up @@ -231,27 +302,45 @@ impl ConnectOptionsBuilder {

/// Sets the 'clean session' flag to send to the broker.
///
/// This is for MQTT v3.x connections, only.
/// This is for MQTT v3.x connections only, and if set, will set the
/// other options to be compatible with v3.
///
/// # Arguments
///
/// `clean` Whether the broker should remove any previously-stored
/// information for this client.
pub fn clean_session(&mut self, clean: bool) -> &mut Self {
self.copts.cleansession = to_c_bool(clean);

// Force the options to those compatible with v3 if set
if clean {
self.copts.cleanstart = 0;
if self.copts.MQTTVersion >= ffi::MQTTVERSION_5 as i32 {
self.copts.MQTTVersion = 0;
}
}
self
}

/// Sets the 'clean start' flag to send to the broker.
///
/// This is for MQTT v5 connections only.
/// This is for MQTT v5 connections only, and if set, will set the
/// other options to be compatible with v5.
///
/// # Arguments
///
/// `clean` Whether the broker should remove any previously-stored
/// information for this client.
pub fn clean_start(&mut self, clean: bool) -> &mut Self {
self.copts.cleanstart = to_c_bool(clean);

// Force the options to those compatible with v5 if set
if clean {
self.copts.cleansession = 0;
if self.copts.MQTTVersion < ffi::MQTTVERSION_5 as i32 {
self.copts.MQTTVersion = ffi::MQTTVERSION_5 as i32;
}
}
self
}

Expand Down Expand Up @@ -372,10 +461,14 @@ impl ConnectOptionsBuilder {

/// Sets the version of MQTT to use on the connect.
///
/// Note that this value can not be greater than the version used to
/// create the client. Specifically, if the client was created for v3,
/// you can not try to connect with v5.
///
/// # Arguments
///
/// `ver` The version of MQTT to use when connecting to the broker.
/// * (0) try the latest version (3.1.1) and work backwards
/// * (0) try the latest version and work backwards
/// * (3) only try v3.1
/// * (4) only try v3.1.1
/// * (5) only try v5
Expand Down
Loading

0 comments on commit 7c3aef7

Please sign in to comment.