Minimal wrapper to use the Paho MQTT Client with Python asyncio.
Initially inspired by the loop_asyncio.py example of the Paho Client.
- Automatic background reconnect: The wrapper automatically and continuously tries to reconnect in the background when the connection is lost. No user intervention is necessary. Outbound messages are discarded while no connection is active. The wrapper quits upon DISCONNECT from the broker with reason code 0x00 (Normal disconnection). Delays between reconnect attempts can be configured arbitrarily.
- Asynchronous iterator (async for) to consume inbound messages instead of the on_message() callback. The iterator persists over automatic reconnects and only stops upon client-side termination.
- Synchronous (default) or asynchronous initial connection support:
  - In synchronous mode, the first connection attempt must succeed or an exception is raised. Best for (semi-)interactive applications.
  - In asynchronous mode, the wrapper tries to connect indefinitely. Intended primarily for system services.
- Backlog prevention: The Paho MQTT Client keeps an unlimited backlog of unsent QoS=0 messages, only limiting the number of in-flight QoS>0 messages. If the upstream bandwidth drops below the average data rate of outbound messages, the backlog may grow indefinitely. When periodically publishing data over an unstable connection, backlogged stale data will clog the connection and increase upstream message latency. Instead, this wrapper will drop outbound messages if the socket send buffer (SO_SNDBUF) is full. If a message was dropped, publish() returns MQTTErrorCode.MQTT_ERR_QUEUE_SIZE.
- Optional logging of background events (connect, disconnect, failed reconnect attempt) via the Python logging facility. Use asyncio_mqtt.AsyncMqttClientLog instead of asyncio_mqtt.AsyncMqttClient to enable logging.
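The backlog-prevention idea can be illustrated without any MQTT code. The following is a minimal sketch, not the wrapper's implementation: a bounded asyncio.Queue stands in for the socket send buffer, and the names DropPublisher, Result, capacity, and drain_one() are hypothetical, chosen only for this illustration. A publish attempt that finds the buffer full is dropped and reported through the return value instead of being queued.

```python
import asyncio
from enum import Enum


class Result(Enum):
    """Hypothetical return codes; the wrapper itself reports MQTTErrorCode values."""
    SUCCESS = "success"
    DROPPED = "dropped"


class DropPublisher:
    """Drop-on-full publisher; the bounded queue is a stand-in for SO_SNDBUF."""

    def __init__(self, capacity: int) -> None:
        self._buffer: asyncio.Queue = asyncio.Queue(maxsize=capacity)

    def publish(self, payload: bytes) -> Result:
        try:
            # Never wait for free space: a stale backlog is worse than a gap.
            self._buffer.put_nowait(payload)
        except asyncio.QueueFull:
            return Result.DROPPED
        return Result.SUCCESS

    async def drain_one(self) -> bytes:
        """Simulate the network taking one message off the buffer."""
        return await self._buffer.get()


async def demo() -> list:
    pub = DropPublisher(capacity=2)
    # Three publishes against two slots: the third finds the buffer full.
    results = [pub.publish(b"m%d" % i) for i in range(3)]
    await pub.drain_one()               # the link frees one slot
    results.append(pub.publish(b"m3"))  # accepted again
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller sees the drop immediately and can decide whether to ignore it, count it, or retry with fresher data on the next cycle.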
import asyncio

import asyncio_mqtt as mqttc


# need to override on_connect() to reliably (re-)subscribe on (re-)connection
class Example(mqttc.AsyncMqttClient):
    def on_connect(self, client, userdata, flags, reason_code, properties):
        super().on_connect(client, userdata, flags, reason_code, properties)
        self.client.subscribe("topic")


# exemplary coroutine periodically publishing messages
async def publish(client):
    while True:
        rc, mid = client.publish("another topic", b"some data")
        if rc != mqttc.MQTTErrorCode.MQTT_ERR_SUCCESS:
            pass  # message was not sent, e.g. dropped with MQTT_ERR_QUEUE_SIZE
        await asyncio.sleep(1)


async def main():
    async with Example("test.mosquitto.org", keepalive=5) as client:
        # keep a reference so the background task is not garbage-collected
        publisher = asyncio.create_task(publish(client))
        async for msg in client:
            print(f"{msg.topic}: {msg.payload}")


if __name__ == "__main__":
    asyncio.run(main())
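
The async for loop in main() takes the place of the usual on_message() callback. One common way to bridge a callback to an asynchronous iterator is an asyncio.Queue. The sketch below shows that general pattern only and is not taken from the wrapper's source: MessageStream, feed(), and close() are hypothetical names, and the sketch assumes the callback runs on the event loop thread.

```python
import asyncio


class MessageStream:
    """Hypothetical bridge from a message callback to `async for`."""

    _STOP = object()  # sentinel marking client-side termination

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def feed(self, message) -> None:
        """Callback side: invoked once for every inbound message."""
        self._queue.put_nowait(message)

    def close(self) -> None:
        """End the iteration after all queued messages are consumed."""
        self._queue.put_nowait(self._STOP)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is self._STOP:
            raise StopAsyncIteration
        return item


async def demo() -> list:
    stream = MessageStream()
    loop = asyncio.get_running_loop()
    # Simulate messages arriving from callbacks while the consumer iterates.
    loop.call_soon(stream.feed, "first")
    loop.call_soon(stream.feed, "second")
    loop.call_soon(stream.close)
    return [msg async for msg in stream]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the queue in such a design is independent of any single connection, the iterator can keep running across reconnects and ends only when close() is called.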
See example.py for a more advanced example with logging.
Run unit tests with python3 -m unittest discover -v.