Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32751: Wait for task cancellation in asyncio.wait_for() #7216

Merged
merged 3 commits into from
May 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,9 @@ Task functions

Returns result of the Future or coroutine. When a timeout occurs, it
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
cancellation, wrap it in :func:`shield`.
cancellation, wrap it in :func:`shield`. The function will wait until
the future is actually cancelled, so the total wait time may exceed
the *timeout*.

If the wait is cancelled, the future *fut* is also cancelled.

Expand All @@ -796,3 +798,8 @@ Task functions

.. versionchanged:: 3.4.3
If the wait is cancelled, the future *fut* is now also cancelled.

.. versionchanged:: 3.7
When *fut* is cancelled due to a timeout, ``wait_for`` now waits
for *fut* to be cancelled. Previously,
it raised :exc:`~asyncio.TimeoutError` immediately.
23 changes: 21 additions & 2 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,17 @@ async def wait_for(fut, timeout, *, loop=None):
return fut.result()
else:
fut.remove_done_callback(cb)
fut.cancel()
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise futures.TimeoutError()
finally:
timeout_handle.cancel()


async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait() and wait_for().
"""Internal helper for wait().

The fs argument must be a collection of Futures.
"""
Expand Down Expand Up @@ -461,6 +464,22 @@ def _on_completion(f):
return done, pending


async def _cancel_and_wait(fut, loop):
"""Cancel the *fut* future or task and wait until it completes."""

waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)

try:
fut.cancel()
# We cannot wait on *fut* directly to make
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice trick!

# sure _cancel_and_wait itself is reliably cancellable.
await waiter
finally:
fut.remove_done_callback(cb)


# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
"""Return an iterator whose values are coroutines.
Expand Down
13 changes: 13 additions & 0 deletions Lib/test/test_asyncio/test_locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,19 @@ def test_ambiguous_loops(self):
with self.assertRaises(ValueError):
asyncio.Condition(lock, loop=loop)

def test_timeout_in_block(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)

async def task_timeout():
condition = asyncio.Condition(loop=loop)
async with condition:
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(condition.wait(), timeout=0.5,
loop=loop)

loop.run_until_complete(task_timeout())


class SemaphoreTests(test_utils.TestCase):

Expand Down
56 changes: 56 additions & 0 deletions Lib/test/test_asyncio/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,62 @@ def gen():
res = loop.run_until_complete(task)
self.assertEqual(res, "ok")

def test_wait_for_waits_for_task_cancellation(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)

task_done = False

async def foo():
async def inner():
nonlocal task_done
try:
await asyncio.sleep(0.2, loop=loop)
finally:
task_done = True

inner_task = self.new_task(loop, inner())

with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

self.assertTrue(task_done)

loop.run_until_complete(foo())

def test_wait_for_self_cancellation(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)

async def foo():
async def inner():
try:
await asyncio.sleep(0.3, loop=loop)
except asyncio.CancelledError:
try:
await asyncio.sleep(0.3, loop=loop)
except asyncio.CancelledError:
await asyncio.sleep(0.3, loop=loop)

return 42

inner_task = self.new_task(loop, inner())

wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

# Test that wait_for itself is properly cancellable
# even when the initial task holds up the initial cancellation.
task = self.new_task(loop, wait)
await asyncio.sleep(0.2, loop=loop)
task.cancel()

with self.assertRaises(asyncio.CancelledError):
await task

self.assertEqual(await inner_task, 42)

loop.run_until_complete(foo())

def test_wait(self):

def gen():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now
wait until the cancellation is complete.