Skip to content

Commit

Permalink
re-create initial patch with manual_ack flag to address eclipse-paho#348
Browse files Browse the repository at this point in the history


second try of eclipse-paho#554
  • Loading branch information
petersilva committed Oct 3, 2023
1 parent 9782ab8 commit 5afebe8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ transport
set to "websockets" to send MQTT over WebSockets. Leave at the default of
"tcp" to use raw TCP.

manual_ack
defaults to False, allowing the library to acknowledge messages before
passing them to on_message callback. When set to True, every message
must be manually acknowledged by application call to
client.ack( *message.mid* )


Constructor Example
...................
Expand Down
35 changes: 33 additions & 2 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ def on_connect(client, userdata, flags, rc):
"""

def __init__(self, client_id="", clean_session=None, userdata=None,
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True,
manual_ack=False ):
"""client_id is the unique client id string used when connecting to the
broker. If client_id is zero length or None, then the behaviour is
defined by which protocol version is in use. If using MQTT v3.1.1, then
Expand Down Expand Up @@ -523,11 +524,20 @@ def __init__(self, client_id="", clean_session=None, userdata=None,
Set transport to "websockets" to use WebSockets as the transport
mechanism. Set to "tcp" to use raw TCP, which is the default.
Normally, when a message is received, the library automatically
acknowledges immediately. manual_ack=True allows the application to
acknowledge receipt after it has completed processing of a message
using a the ack() method. This addresses vulnerabilty to message loss
if applications fails while processing a message, or while it pending
locally.
"""

if transport.lower() not in ('websockets', 'tcp'):
raise ValueError(
'transport must be "websockets" or "tcp", not %s' % transport)
self._manual_ack = manual_ack
self._transport = transport.lower()
self._protocol = protocol
self._userdata = userdata
Expand Down Expand Up @@ -3328,7 +3338,10 @@ def _handle_publish(self):
return MQTT_ERR_SUCCESS
elif message.qos == 1:
self._handle_on_message(message)
return self._send_puback(message.mid)
if self._manual_ack:
return MQTT_ERR_SUCCESS
else:
return self._send_puback(message.mid)
elif message.qos == 2:
rc = self._send_pubrec(message.mid)
message.state = mqtt_ms_wait_for_pubrel
Expand All @@ -3338,6 +3351,24 @@ def _handle_publish(self):
else:
return MQTT_ERR_PROTOCOL

def ack( self, mid ):
"""
send an acknowledgement for a given message id. (stored in message.mid )
only useful in QoS=1 and auto_ack=False
"""
if not self._manual_ack :
return MQTT_ERR_SUCCESS
return self._send_puback(mid)

def manual_ack( self, on=False ):
"""
The paho library normally acknowledges messages as soon as they are delivered to the caller.
If manual_ack is turned on, then the caller MUST manually acknowledge every message once
application processing is complete.
"""
self._manual_ack = on


def _handle_pubrel(self):
if self._protocol == MQTTv5:
if self._in_packet['remaining_length'] < 2:
Expand Down

0 comments on commit 5afebe8

Please sign in to comment.