Skip to content

Commit

Permalink
bpo-32751: Wait for task cancellation in asyncio.wait_for() (pythonGH…
Browse files Browse the repository at this point in the history
…-7216)

Currently, asyncio.wait_for(fut), upon reaching the timeout deadline,
cancels the future and returns immediately.  This is problematic for
when *fut* is a Task, because it will be left running for an arbitrary
amount of time.  This behavior is iself surprising and may lead to
related bugs such as the one described in bpo-33638:

    condition = asyncio.Condition()
    async with condition:
        await asyncio.wait_for(condition.wait(), timeout=0.5)

Currently, instead of raising a TimeoutError, the above code will fail
with `RuntimeError: cannot wait on un-acquired lock`, because
`__aexit__` is reached _before_ `condition.wait()` finishes its
cancellation and re-acquires the condition lock.

To resolve this, make `wait_for` await for the task cancellation.
The tradeoff here is that the `timeout` promise may be broken if the
task decides to handle its cancellation in a slow way.  This represents
a behavior change and should probably not be back-patched to 3.6 and
earlier.
(cherry picked from commit e2b340a)

Co-authored-by: Elvis Pranskevichus <[email protected]>
  • Loading branch information
elprans authored and miss-islington committed May 29, 2018
1 parent 2a7eb0b commit e0f2c5d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 3 deletions.
9 changes: 8 additions & 1 deletion Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,9 @@ Task functions

Returns result of the Future or coroutine. When a timeout occurs, it
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
cancellation, wrap it in :func:`shield`.
cancellation, wrap it in :func:`shield`. The function will wait until
the future is actually cancelled, so the total wait time may exceed
the *timeout*.

If the wait is cancelled, the future *fut* is also cancelled.

Expand All @@ -796,3 +798,8 @@ Task functions

.. versionchanged:: 3.4.3
If the wait is cancelled, the future *fut* is now also cancelled.

.. versionchanged:: 3.7
When *fut* is cancelled due to a timeout, ``wait_for`` now waits
for *fut* to be cancelled. Previously,
it raised :exc:`~asyncio.TimeoutError` immediately.
23 changes: 21 additions & 2 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,17 @@ async def wait_for(fut, timeout, *, loop=None):
return fut.result()
else:
fut.remove_done_callback(cb)
fut.cancel()
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise futures.TimeoutError()
finally:
timeout_handle.cancel()


async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait() and wait_for().
"""Internal helper for wait().
The fs argument must be a collection of Futures.
"""
Expand Down Expand Up @@ -461,6 +464,22 @@ def _on_completion(f):
return done, pending


async def _cancel_and_wait(fut, loop):
"""Cancel the *fut* future or task and wait until it completes."""

waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)

try:
fut.cancel()
# We cannot wait on *fut* directly to make
# sure _cancel_and_wait itself is reliably cancellable.
await waiter
finally:
fut.remove_done_callback(cb)


# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
"""Return an iterator whose values are coroutines.
Expand Down
13 changes: 13 additions & 0 deletions Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,19 @@ def test_ambiguous_loops(self):
with self.assertRaises(ValueError):
asyncio.Condition(lock, loop=loop)

def test_timeout_in_block(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)

async def task_timeout():
condition = asyncio.Condition(loop=loop)
async with condition:
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(condition.wait(), timeout=0.5,
loop=loop)

loop.run_until_complete(task_timeout())


class SemaphoreTests(test_utils.TestCase):

Expand Down
56 changes: 56 additions & 0 deletions Lib/test/test_asyncio/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,62 @@ def gen():
res = loop.run_until_complete(task)
self.assertEqual(res, "ok")

def test_wait_for_waits_for_task_cancellation(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)

task_done = False

async def foo():
async def inner():
nonlocal task_done
try:
await asyncio.sleep(0.2, loop=loop)
finally:
task_done = True

inner_task = self.new_task(loop, inner())

with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

self.assertTrue(task_done)

loop.run_until_complete(foo())

def test_wait_for_self_cancellation(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)

async def foo():
async def inner():
try:
await asyncio.sleep(0.3, loop=loop)
except asyncio.CancelledError:
try:
await asyncio.sleep(0.3, loop=loop)
except asyncio.CancelledError:
await asyncio.sleep(0.3, loop=loop)

return 42

inner_task = self.new_task(loop, inner())

wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

# Test that wait_for itself is properly cancellable
# even when the initial task holds up the initial cancellation.
task = self.new_task(loop, wait)
await asyncio.sleep(0.2, loop=loop)
task.cancel()

with self.assertRaises(asyncio.CancelledError):
await task

self.assertEqual(await inner_task, 42)

loop.run_until_complete(foo())

def test_wait(self):

def gen():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now
wait until the cancellation is complete.

0 comments on commit e0f2c5d

Please sign in to comment.