
Implement automatic reconnection #287

Draft · wants to merge 1 commit into main

Conversation

empicano (Owner)

Hi there @frederikaalund @JonathanPlasse 😊

Frederik drafted how reconnection could look a while ago already (thank you again, master of asyncio 🙏😄). I finally had some time to hack around with this! Some thoughts:

  • I'd like to avoid having two clients (one that reconnects and one that doesn't) because I suspect that this is going to be confusing to use and lead to more maintenance in the future. I'm thus trying to implement it as a reconnect=True parameter to the client and leave all method signatures the same.
  • Consequently, this means that all publish/subscribe/etc. calls have to be asynchronous, contrary to Frederik's original design. Instead of queuing the calls up, I'm thinking of blocking them and returning only when the connection returns and the call goes through.
  • This would eliminate the flip side that you mentioned as well, Frederik: "The user does not know if/when their events actually go through."

For now, I've implemented the reconnection background task and adapted the publish method to wait until the connection returns and the message could actually be published (both are probably still full of bugs). You can play around with it by switching a local MQTT broker on and off (e.g. with ./scripts/develop) and by running:

import aiomqtt
import asyncio


async def main():
    client = aiomqtt.Client("localhost", 1883, reconnect=True, timeout=2)
    async with client:
        while True:
            await asyncio.sleep(1)
            print("Publishing message...")
            await client.publish("test", "Hello, World!")
            print("Message published.")


asyncio.run(main())

I'm still thinking about how to deal with existing subscriptions and last wills. We probably have to resend them when clean_session=True, otherwise they will cease to exist without notice to the user after a reconnection in the background.
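To make the "resend subscriptions after reconnect" idea concrete, here is a minimal sketch of how the client could remember subscriptions so the background task can replay them. The class and method names are purely illustrative, not aiomqtt API:

```python
class SubscriptionTracker:
    """Sketch: remember subscriptions so they can be replayed after a
    background reconnect (hypothetical helper, not aiomqtt API)."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, int] = {}  # topic -> QoS

    def remember(self, topic: str, qos: int = 0) -> None:
        # Called whenever the user subscribes
        self._subscriptions[topic] = qos

    def forget(self, topic: str) -> None:
        # Called whenever the user unsubscribes
        self._subscriptions.pop(topic, None)

    def pending(self) -> list[tuple[str, int]]:
        # Everything the reconnect task must re-send after the
        # connection returns (relevant when clean_session=True)
        return list(self._subscriptions.items())


tracker = SubscriptionTracker()
tracker.remember("sensors/temperature", qos=1)
tracker.remember("sensors/humidity")
print(tracker.pending())  # [('sensors/temperature', 1), ('sensors/humidity', 0)]
```

The reconnect task would iterate over `pending()` after each successful reconnection and issue the SUBSCRIBE packets again.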

Happy to hear what your thoughts are on this (or anyone else's who wants to chip in) 😊

@frederikaalund (Collaborator)

Thank you for looking into this! 👍 Automatic reconnect is definitely on the wish list of many of our users. :)

I do like the idea of having "everything in one client" as opposed to my approach with a separate low-level and high-level client. At least the single-client approach is easier to grok for our users. 👍

I looked through the code, and it looks good. I'm (as always) a little bit concerned about adding extra internal state to our client (the new reconnect task). That increases the maintenance burden. In any case, for this feature I don't think we can avoid it if we want to use a single client. 😄

Aside: There is a lot of "reset state" going on (re-creating the futures). That's not your fault (or the fault of this PR) but my own fault (using futures in the first place). 😅 I strongly suggest that we look at alternatives for all the internal futures and background tasks. I'm thinking anyio (to no one's surprise, I guess). :)

I'm still thinking about how to deal with existing subscriptions and last wills. We probably have to resend them when clean_session=True, otherwise they will cease to exist without notice to the user after a reconnection in the background.

That is a concern of mine as well! Indeed, we would have to save all the subscriptions inside the client and then "resubscribe" when the connection is back online. These subscriptions are even more state to manage. :)


Here is a suggestion that is a slight variation of the current approach:

  • Single public client (that now can also automatically reconnect)
    • Rename the existing Client to _InternalClient (or something like that)
    • Create a new Client class with the exact same API as the existing client (our users won't see a difference!)
    • Use streams and broadcast mechanisms in the new Client to implement the reconnect logic.

This way, we get the separation of high-level and low-level and the easier maintenance that follows. It also allows us to replace the _InternalClient with a new implementation in the future (maybe written in Rust, using anyio, or something else).

Consequently, this means that all publish/subscribe/etc. calls have to be asynchronous, contrary to Frederik's original design. Instead of queuing the calls up, I'm thinking of blocking them and returning only when the connection returns and the call goes through.

I see the benefits of this approach! 👍 I suggest that we do both 😄 That is, to have both async def publish() and def publish_nowait(). This mimics how both asyncio and anyio handle the "do you want to wait or not" question. In turn, it lets the user decide whether they want to wait for the connection or not. :)
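The publish/publish_nowait split can be sketched with the same pattern asyncio.Queue uses for put/put_nowait. This is an illustrative stand-in, not the actual aiomqtt API; a real implementation would hand the queued messages to the connection task:

```python
import asyncio


class OutgoingQueue:
    """Sketch of the wait/no-wait split, modeled on asyncio.Queue's
    put / put_nowait pair (illustrative, not aiomqtt API)."""

    def __init__(self) -> None:
        # A bounded queue makes the async variant actually block
        # once the backlog is full
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=64)

    async def publish(self, topic: str, payload: str) -> None:
        # Blocks until the message is handed off (and, in a real
        # client, until the connection task confirms delivery)
        await self._queue.put((topic, payload))

    def publish_nowait(self, topic: str, payload: str) -> None:
        # Returns immediately; the background task sends it later
        self._queue.put_nowait((topic, payload))


async def demo() -> int:
    q = OutgoingQueue()
    q.publish_nowait("test", "fire-and-forget")
    await q.publish("test", "awaited")
    return q._queue.qsize()


print(asyncio.run(demo()))  # 2 messages queued
```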

Again, thank you for looking into this issue and very well done on the draft implementation. 👍 Let me know what you think of my comments above and do say if you have any questions. 😄

@empicano (Owner, Author)

Thanks for your thoughts on this! Your reviews are one of the main reasons this project is so fun for me 😉

You're right about the internal state getting slightly out of control. While implementing this draft I already got bitten resetting futures while they were awaited elsewhere (which raises CancelledError) 😅

Use streams and broadcast mechanisms in the new Client to implement the reconnect logic.

I'm not sure I understand this point. Do you mean using streams and broadcast for the _nowait variants like you did in your example, or actually a different way to implement the reconnection? Apart from that, I like the idea of bringing more structure into it with the low-level client class 🙂

I'm on board with the publish and publish_nowait design as well 👍 I can imagine how these can be implemented in the reconnection case, but I wonder how publish_nowait could work when the client shouldn't reconnect. (How) Do we fail in the case the client is disconnected? Given that we probably work through the queued messages in the background without awaiting the task anywhere, throwing an exception won't surface to the user, or can we propagate it somehow?

@frederikaalund (Collaborator)

Thanks for your thoughts on this! Your reviews are one of the main reasons this project is so fun for me 😉

Thank you for saying that. 😄 I don't have that much time these days but I do try to find it anyhow to at least do these reviews. :) It's a bit easier here during Easter.

I'm not sure I understand this point. Do you mean using streams and broadcast for the _nowait variants like you did in your example, or actually a different way to implement the reconnection? Apart from that, I like the idea of bringing more structure into it with the low-level client class 🙂

I was leaning towards doing "streams and broadcasts" to do the reconnection itself (like in my sample code). E.g., keep the current Client class more or less as is. Mostly, I'm worried about whether we (not just you and the code in this PR but all of us maintainers) can maintain the solution going forward. It's difficult enough as it is with all the internal futures and "partial reset state". Specifically, I'm concerned whether some task may await a future (e.g., _connected or _disconnected) while another task accidentally resets it (assigns a new value to _connected, leaving the first task to hang forever). I have similar worries for our _lock that we manually acquire and release. Do you get me? 😄


In any case, let's be pragmatic and review our options here:

  1. Continue with the current implementation in this PR (single client with reconnect=True flag, futures, background tasks, state resets).
    • Pro: We already have most of the implementation (thanks to this PR! 🙏)
    • Con: Client is more difficult to maintain now that the logic is more involved.
  2. Address the maintenance concerns first as another PR and build on that. E.g., split the client into a low-level and high-level implementation, and/or, use anyio instead of all the futures/asyncio.Tasks.
    • Pro: It's easier to maintain this project going forward and it's easier to add new features without accidentally breaking stuff.
    • Con: It takes a lot of time to write this. It'll delay the reconnection feature.

So in the larger perspective (time and resources being essential) I do actually lean towards option (1). Option 1 provides value here and now at the cost of future maintenance. 😄

If you agree, I think the next step is to write out some test cases for this (to mitigate the maintenance cost). With tests in place, the pro-con calculation becomes easy since the maintenance cost goes towards zero. 😉

Again, thank you for all the time that you put into this PR (and the aiomqtt project in general). 👍 Let me know if you have any comments or questions. :)

@empicano (Owner, Author)

Thanks for elaborating, I think I understand what you mean now 😊 I'll play around with streams and broadcasts to understand them a little better and see how much work the second option would bring! You convinced me that that's the better option 😄

I'll report back once I have more, could take a bit, though, as I'm busy the next few days 😋

@spacemanspiff2007 (Contributor) commented Apr 25, 2024

I just stumbled across this, so sorry if I am late to the party. From my experience it might be better to split the client into a low-level and a high-level one because, as mentioned, it makes maintenance much easier; however, I have no strong opinion on that.

While looking through the PR I am not sure I understood everything correctly:

  • It seems like publish will never return until the client is connected again, possibly locking up the program. This is imho very unexpected. Here I have a small program that only publishes, and I just discard all messages on disconnect. I even made publish non-blocking and non-async through the use of a Queue.
  • On the other hand, _messages still raises an error when the connection is lost. I think it would be good if the behavior of these two methods were aligned.
  • In case of a disconnect there will be a retry every two seconds. It would be nice to have the possibility to provide something (e.g. a generator) that backs off gracefully (e.g. 2, 4, 8 ... 300 secs delay).
  • Additionally, it would be nice to have the possibility to specify an additional message that gets sent on the last-will topic on graceful disconnect, because the last will is only used on abnormal disconnect.
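The pluggable backoff idea above could be as simple as a generator the client pulls delays from between reconnect attempts. A minimal sketch (names and defaults are illustrative):

```python
def backoff_delays(start: float = 2, cap: float = 300, factor: float = 2):
    """Yield exponentially growing retry delays, capped at `cap` seconds.
    Sketch of the 'provide a generator' idea, not aiomqtt API."""
    delay = start
    while True:
        yield delay
        delay = min(delay * factor, cap)


delays = backoff_delays()
print([next(delays) for _ in range(9)])  # [2, 4, 8, 16, 32, 64, 128, 256, 300]
```

The reconnect task would then do `await asyncio.sleep(next(delays))` before each attempt and create a fresh generator once the connection succeeds.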

I think the usage of the client is hard because there are multiple places where the disconnect error can occur. Typically I have a publish and a messages worker task, and both can be the cause for a reconnect, which I have to sync back to the task where I create the client and do the connect. Do you know by chance a good solution for that? I've come up with a quite complex solution: a stateful connection manager which connects and then creates the tasks. If one task throws an MqttError, I cancel both tasks and try to trigger a reconnect.
Example from one application:
  • connection handler
  • publish task
  • messages task

Edit:
I think one of the reasons why it's currently hard is that the client object cannot be reused.
On reconnect it's a new client, and the method for processing messages has to be entered again as well.

@empicano (Owner, Author) commented Jun 2, 2024

Thanks for chipping in and sorry for the late reply! 🙂

It seems like publish will never return until the client is connected again possibly locking up the program. This is imho very unexpected. [...] On the other hand _messages still raises an error when the connection is lost. I think it would be good if the behavior of these two methods is aligned.

I agree that publish and _messages should behave the same. If I understand you right, you're suggesting that the client throws an exception in both cases until the client has successfully reconnected in the background? Off the top of my head, I can't think of a way to know when to retry calling publish or _messages in that case without accessing internals (which I'd like to avoid). Waiting a fixed amount of time seems crude, and with the other option we could always use asyncio.timeout if we want to set an upper limit for how long to wait.

What benefits do you see from failing right away?

In case of a disconnect there will be a retry every two seconds. It would be nice if there is the possibility to provide something (e.g. generator) that backs up gracefully (e.g. 2, 4, 8 ... 300) secs delay.

Good idea 👍

Additionally it would be nice to have the possibility to specify an additional message that gets sent on graceful disconnect on the last will topic because last will is only used on abnormal disconnect.

I noticed that, too. Currently we can only set the last will on client initialization. Maybe we can add a function to set the last will dynamically in the future.

@spacemanspiff2007 (Contributor)

I agree that publish and _messages should behave the same. If I understand you right, you're suggesting that the client throws an exception in both cases until the client has successfully reconnected in the background? Off the top of my head, I can't think of a way to know when to retry calling publish or _messages in that case without accessing internals (which I'd like to avoid). Waiting a fixed amount of time seems crude, and with the other option we could always use asyncio.timeout if we want to set an upper limit for how long to wait.

Maybe expose some client status through a property - e.g. client.is_connected?

I am suggesting that publish should not block when the client is disconnected, because it's very unexpected. If I have a small program like this

while True:
    await read_sensors()
    try:
        await client.publish(...)
    except Exception:
        pass  # ignore publish errors while disconnected
    await write_sensor_to_local_db()

it will stop reading sensors when the client is disconnected, and it's not at all clear from looking at the code snippet that it will behave like that.


I think it depends on the program how publish should behave and currently I can think of three different desired behaviors

  • Block until published
  • Notify the calling code that the client is disconnected e.g. through return value or exception and discard the message
  • Return some object which then can be
    • awaited - which will wait until the message is published
    • canceled - which will not try to publish it any more
    • queried - which will return the current status (published, pending), maybe the connection status, etc.

The third option is the most generic one, because with it it's easy to implement the first two behaviors, and I would expect a high-level client to return something like that.
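The three behaviors above can be sketched with a small handle object that publish() would return. This is a design illustration only; all names are hypothetical, not aiomqtt API:

```python
import asyncio


class PublishHandle:
    """Sketch of the third option: a handle that can be awaited,
    cancelled, or queried (illustrative API, not aiomqtt)."""

    def __init__(self) -> None:
        self._done = asyncio.Event()
        self._cancelled = False

    def cancel(self) -> None:
        # Behavior 2: stop trying to publish this message
        self._cancelled = True

    def mark_published(self) -> None:
        # Called by the background connection task on confirmation
        self._done.set()

    @property
    def is_published(self) -> bool:
        # Behavior 3: query the current status
        return self._done.is_set()

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled

    async def published(self) -> None:
        # Behavior 1: block until delivery is confirmed
        await self._done.wait()
```

With this, `await client.publish(...).published()` gives the blocking behavior, while callers that don't await the handle get fire-and-forget.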

Maybe we can add a function to set the last will dynamically in the future.

I currently don't understand how a function will provide a message when gracefully disconnecting.
Here I have a dedicated publish task and when I cancel the task (graceful disconnect) I catch that and publish the message.

@empicano (Owner, Author) commented Jun 5, 2024

Maybe expose some client status through a property - e.g. client.is_connected?

I'd like to avoid that, if possible. The whole internal connected / disconnected state is already all over the place, so I'm concerned that exposing this would lead to problems down the line.

I am suggesting that publish should not block when the client is disconnected because it's very unexpected.

I think I see why now. When reconnect=False, publish blocks until the message is published or fails when the connection is lost. With the initial idea and reconnect=True, publish blocks until the message is published, but doesn't fail when the connection is lost. The most intuitive design would be that publish behaves exactly the same in both cases, meaning also in the case of disconnection, right?

If we think about how an asynchronous publish and a synchronous publish_nowait with Frederik's suggestion could work in the two reconnect=True/False cases, with the above the most logical would thus be:

  1. reconnect=True/False & publish: block until the message is sent, fail on disconnection
  2. reconnect=True/False & publish_nowait: queue the message and return immediately, no failure on disconnection

However, publish_nowait doesn't make too much sense in the case of reconnect=False because, if that's all we use, we'll keep appending messages to the queue without ever noticing that the client has disconnected. And if we don't block publish until reconnection in the reconnect=True case, publish_nowait doesn't really have a benefit anymore, either.

I think you convinced me that a better design would be to implement only an asynchronous publish, but also make it fail on disconnection in the reconnect=True case. What I miss is a good way to know when to retry a failed publish in the case of reconnect=True.

I think it depends on the program how publish should behave and currently I can think of three different desired behaviors

Returning something (third bullet point) wouldn't work very well in the case of the _messages generator, so I'd focus on the second point and think about how we can achieve the first one on top.

One option would be to do the reconnect manually, so we could do await client.reconnect() to achieve something similar to client.is_connected. That would eliminate the whole discussion about publish in the reconnect=False and reconnect=True cases, because there won't be a reconnect client argument anymore. We're basically pulling the reconnection from the background into a manual function call. This seems pretty flexible.

I was in the past, and still am, very much against manual connect and disconnect methods; I'm not yet sure how I feel about a reconnect method in that regard.

This turned out to be a long rambling, I hope it's understandable 😄 Do say if something's not clear, and let me know what you think! Again, thanks for chipping in, it helps quite a lot to have other perspectives on this. You make some good points, not only here, but also in the other issues and PRs that you're involved with 👍


Here I have a dedicated publish task and when I cancel the task (graceful disconnect) I catch that and publish the message.

Ah, I think I understand what you mean now 👍 #28 is related, and a very interesting discussion. There already has been a solution proposed, I'm interested to hear what you think about it. Maybe you are up to do a PR? 🙂

@spacemanspiff2007 (Contributor)

The most intuitive design would be that publish behaves exactly the same in both cases, meaning also in the case of disconnection, right?

This is not very nice but not my main issue. With reconnect=True the application will be stuck in await client.publish() until the client is connected again. In my example above the program also writes the sensor values to a database. If the broker goes down all values will be lost until the broker is up again and there will be a gap in the db values.
This behavior is very unexpected and it's not clear that this will happen from looking at the code snippet.

However, publish_nowait doesn't make too much sense in case of reconnect=False because if that's all we use, we'll append and append messages to the queue without ever noticing that the client has disconnected.

I would have expected publish_nowait to discard messages when the client is not connected. On disconnect the queue should be emptied.
In the sml2mqtt program I implemented it in the same manner:
publish only works when the client is connected, and on disconnect the queue is discarded.
A behavior like this would make small programs that continuously collect data very easy to implement.
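The discard-on-disconnect behavior described above can be sketched like this (illustrative class, not aiomqtt or sml2mqtt API):

```python
import asyncio


class DiscardingPublisher:
    """Sketch: publish_nowait() only queues while connected;
    on disconnect the pending queue is emptied."""

    def __init__(self) -> None:
        self.connected = False
        self._pending: asyncio.Queue = asyncio.Queue()

    def publish_nowait(self, topic: str, payload: str) -> bool:
        if not self.connected:
            return False  # dropped: client is offline
        self._pending.put_nowait((topic, payload))
        return True

    def on_disconnect(self) -> None:
        self.connected = False
        # Discard everything that was still waiting to be sent
        while not self._pending.empty():
            self._pending.get_nowait()
```

A data-collection loop can then call publish_nowait unconditionally: while the broker is down, messages are simply dropped and the loop never stalls.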

Returning something (third bullet point) wouldn't work very well in the case of the _messages generator, so I'd focus on the second point and think about how we can achieve the first one on top.

Could you please elaborate why this would be an issue? Currently I fail to see how the publish call interferes with messages which is used to process incoming messages.
I still think that from a usability perspective this is the best option, because it covers all use cases and makes it very clear from the calling code what is happening:

# create handle
msg = client.publish(...)

# cancel a pending publish
msg.cancel_publish()

# status or flags
msg.status
msg.is_published

# wait for publish
await msg.published()
await msg.published(reconnect=True)  # could be on a per-message basis

One option would be to do the reconnect manually, so we could do await client.reconnect() to achieve something similar to client.is_connected. That would eliminate the whole discussion about publish in the reconnect=False and reconnect=True cases, because there won't be a reconnect client argument anymore. We're basically pulling the reconnection from the background into a manual function call. This seems pretty flexible.

What would your goal be with that and can you make a pseudo example? Do I as a caller still have to catch an error and manually call reconnect? Do I have to do it both in publish and messages?

Again, thanks for chipping in, it helps quite a lot to have other perspectives on this. You make some good points, not only here, but also in the other issues and PRs that you're involved with 👍

Thanks - that means a lot!

@carstenandrich

@empicano Opened discussion #316 regarding a possibly slightly simpler implementation approach.

@mirko commented Sep 10, 2024

I'm coming from #334 - and would just like to provide some input from my perspective, as I was asked to. Keep in mind, though, that I have a more low-level background (microcontrollers, C) and only have one use case in mind - my above-referenced one.

Assuming the connect()/disconnect() methods are not coming back, I - as a dev using this lib - would expect publish calls to fail and neither (a) be queued and potentially fail later, nor (b) wait (potentially indefinitely) until completed.

That being said, I'd expect that if await client.publish() finishes, the message was sent (or at least written to the underlying socket). And I'd expect (different) exceptions being thrown if it wasn't published (in time) - to individually be able to react to network issues and potentially (indefinitely) try again. A timeout is something I'd react differently to than to a "connection refused" or a routing problem. Re timeout, I probably wouldn't over-complicate things and just use the timeout of the underlying TCP connection.

Network issues while listening to subscribed topics appear to me to be a more challenging topic at first glance. Depending on the use case, I might definitely want to know if I potentially missed a message - or I don't care and expect the loop iterating over messages, most likely running in its dedicated task, to keep going at best effort. What really doesn't help is how exceptions thrown in asyncio tasks are supposed to be handled, which - to my knowledge - only works with a global asyncio exception handler. I guess it's still better than leaving the user in the dark about whether messages went unhandled due to network issues, though.

That doesn't even address the actual issue of how to implement reconnection, though. At first thought I'd be fine with both scenarios - given the above assumptions hold true for both:
a) reconnection happens transparently in the background (and I manually loop around e.g. failed publish() calls and try again until I decide it's enough)
b) I need to trigger a (re-)connection manually, e.g. in the exception handler handling a failed publish() call
