diff --git a/README.rst b/README.rst index e060cef2..4f092806 100644 --- a/README.rst +++ b/README.rst @@ -197,6 +197,12 @@ transport set to "websockets" to send MQTT over WebSockets. Leave at the default of "tcp" to use raw TCP. +manual_ack + defaults to False, allowing the library to acknowledge messages before + passing them to on_message callback. When set to True, every message + must be manually acknowledged by application call to + client.ack( *message.mid* ) + Constructor Example ................... diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 1c0236e4..5916e643 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -491,7 +491,8 @@ def on_connect(client, userdata, flags, rc): """ def __init__(self, client_id="", clean_session=None, userdata=None, - protocol=MQTTv311, transport="tcp", reconnect_on_failure=True): + protocol=MQTTv311, transport="tcp", reconnect_on_failure=True, + manual_ack=False ): """client_id is the unique client id string used when connecting to the broker. If client_id is zero length or None, then the behaviour is defined by which protocol version is in use. If using MQTT v3.1.1, then @@ -523,11 +524,20 @@ def __init__(self, client_id="", clean_session=None, userdata=None, Set transport to "websockets" to use WebSockets as the transport mechanism. Set to "tcp" to use raw TCP, which is the default. + + Normally, when a message is received, the library automatically + acknowledges immediately. manual_ack=True allows the application to + acknowledge receipt after it has completed processing of a message + using a the ack() method. This addresses vulnerabilty to message loss + if applications fails while processing a message, or while it pending + locally. + """ if transport.lower() not in ('websockets', 'tcp'): raise ValueError( 'transport must be "websockets" or "tcp", not %s' % transport) + self._manual_ack = manual_ack self._transport = transport.lower() self._protocol = protocol self._userdata = userdata @@ -3328,7 +3338,10 @@ def _handle_publish(self): return MQTT_ERR_SUCCESS elif message.qos == 1: self._handle_on_message(message) - return self._send_puback(message.mid) + if self._manual_ack: + return MQTT_ERR_SUCCESS + else: + return self._send_puback(message.mid) elif message.qos == 2: rc = self._send_pubrec(message.mid) message.state = mqtt_ms_wait_for_pubrel @@ -3338,6 +3351,24 @@ def _handle_publish(self): else: return MQTT_ERR_PROTOCOL + def ack( self, mid ): + """ + send an acknowledgement for a given message id. (stored in message.mid ) + only useful in QoS=1 and auto_ack=False + """ + if not self._manual_ack : + return MQTT_ERR_SUCCESS + return self._send_puback(mid) + + def manual_ack( self, on=False ): + """ + The paho library normally acknowledges messages as soon as they are delivered to the caller. + If manual_ack is turned on, then the caller MUST manually acknowledge every message once + application processing is complete. + """ + self._manual_ack = on + + def _handle_pubrel(self): if self._protocol == MQTTv5: if self._in_packet['remaining_length'] < 2: