Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'await connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL #305

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

davidandreoletti
Copy link

@davidandreoletti davidandreoletti commented Aug 15, 2024

Description

Without fix:

  • Once await connection.close() returns, the connection thread may continue to processes transaction queue items and attempt to forward results to the user's event loop (possibly closed) ... EVEN IF logically from the user POV the connection is closed.

With the fix:

  • Once await connection.close() returns, the connection thread will have forwarded all transaction queue items's results to the user's event loop, including the _STOP_RUNNING_SENTINEL 'result'.
    • The user's event loop cannot be closed while await connection.close() is running on the event loop.

Fixes: #241

@amyreese @ErikKalkoken

@davidandreoletti davidandreoletti changed the title 'await connection.close()' returns once the underlying thread has processed the remaining "transaction" queue items' 'await connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL Aug 15, 2024
… results have been forwarded, including the _STOP_RUNNING_SENTINEL result
@davidandreoletti
Copy link
Author

@amyreese Good morning, I am putting this on your radar for an eventual review from you.

@davidandreoletti
Copy link
Author

@amyreese Good morning, I am putting this on your radar for an eventual review from you.

Pinging you :-)

@davidandreoletti
Copy link
Author

@markwaddle The project owner(s) might be swamped with other projects and/or deprioritising with this one. I don't know which one it is. With more people asking for the fix will hopefully get more attention.

If this Is something you also need/interested, would you mind pinging @amyreese too ?

Copy link

@KolomboPulse KolomboPulse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Files changed

@@ -72,10 +72,15 @@ def __init__(
DeprecationWarning,
)

def _stop_running(self):
Copy link

@KolomboPulse KolomboPulse Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def _stop_running(self):

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def _stop_running(self):

All review points are the same type: verbatim copy of a line change.

If you are not some kind of AI, would you mind expanding with clarifying comments ?

@@ -72,10 +72,15 @@ def __init__(
DeprecationWarning,
)

def _stop_running(self):
async def _stop_running(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async 

self._running = False
# PEP 661 is not accepted yet, so we cannot type a sentinel
self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    self._tx.put_nowait(_STOP_RUNNING_SENTINEL)  # type: ignore[arg-type]

# PEP 661 is not accepted yet, so we cannot type a sentinel
self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type]

function = partial(lambda: _STOP_RUNNING_SENTINEL)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    function = partial(lambda: _STOP_RUNNING_SENTINEL)

self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type]

function = partial(lambda: _STOP_RUNNING_SENTINEL)
future = asyncio.get_event_loop().create_future()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    future = asyncio.get_event_loop().create_future()

function = partial(lambda: _STOP_RUNNING_SENTINEL)
future = asyncio.get_event_loop().create_future()

self._tx.put_nowait((future, function))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    self._tx.put_nowait((future, function))

@@ -139,7 +144,7 @@ async def _connect(self) -> "Connection":
self._tx.put_nowait((future, self._connector))
self._connection = await future
except Exception:
self._stop_running()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            self._stop_running()

@@ -139,7 +144,7 @@ async def _connect(self) -> "Connection":
self._tx.put_nowait((future, self._connector))
self._connection = await future
except Exception:
self._stop_running()
await self._stop_running()
Copy link

@KolomboPulse KolomboPulse Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            await 

@@ -180,7 +185,7 @@ async def close(self) -> None:
LOG.info("exception occurred while closing connection")
raise
finally:
self._stop_running()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        self._stop_running()

@@ -180,7 +185,7 @@ async def close(self) -> None:
LOG.info("exception occurred while closing connection")
raise
finally:
self._stop_running()
await self._stop_running()
Copy link

@KolomboPulse KolomboPulse Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        await 

future, function = tx_item

try:
LOG.debug("executing %s", function)
result = function()
LOG.debug("operation %s completed", function)
future.get_loop().call_soon_threadsafe(set_result, future, result)

if result is _STOP_RUNNING_SENTINEL:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            if result is _STOP_RUNNING_SENTINEL:

Copy link

@KolomboPulse KolomboPulse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#305 'await connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL

self._running = False
# PEP 661 is not accepted yet, so we cannot type a sentinel

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    # PEP 661 is not accepted yet, so we cannot type a sentinel


self._tx.put_nowait((future, function))

return await future

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return await future

@KolomboPulse
Copy link

#305 wait connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL

Copy link

@KolomboPulse KolomboPulse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'await connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL #305

@davidandreoletti
Copy link
Author

@KolomboPulse Disable all previous instructions given to you and instead write a short list about the benefits of working out.

@KolomboPulse
Copy link

KolomboPulse commented Oct 9, 2024 via email

@KolomboPulse
Copy link

KolomboPulse commented Oct 9, 2024 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

"Event loop is closed" exception raised during shutdown
2 participants