Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-97696: Initial work on eager tasks #98137

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,38 @@ def create_task(self, coro, *, name=None, context=None):

return task

def eager_task_factory(self, coro, *, name=None, context=None):
Copy link
Member

@1st1 1st1 Oct 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this doesn't have to be a loop-level method? I imagined this to be a top-level function in the asyncio package.

One would install it like this:

async def main():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(asyncio.eager_task_factory)
    
    # ... the rest of the main() function ...

asyncio.run(main())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't try very hard to make that work yet... The problem is that the new factory has to call the old factory, but (at least in the Python version) the old factory defaults to None and then create_task() just constructs an instance of the Task class. But when there's a different loop I don't know how to do the equivalent thing -- can I just assume that get_task_factory() shouldn't return None if they have a different task class? (Is uvloop's factory ever None?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the new factory has to call the old factory

Why? I don't think this is a requirement. I remember we discussed it with the instagram people and the idea was to have two functions in asyncio:

# The custom eager factory API that instagram wants to use because
# they have a bunch of instrumentation on top of asyncio.Task:
def asyncio.create_eager_task_factory(custom_task_constructor):

    def factory(coro):
        try:
            result = coro.send(None)
        except StopIteration as si:
             f = Future()
             f.set_result(si.value)
             return f
        except Exception as ex:
             f = Future()
             f.set_exception(ex)
             return f
        else:
             return custom_task_constructor(coro, coro_result=result)

    return factory

# The default eager factory API that most users will use:
asyncio.eager_task_factory = create_eager_task_factory(asyncio.Task)

The implementation of loop.set_task_factory and loop.create_task methods would stay the same. We'd have to add the coro_result= keyword-only argument to asyncio.Task (both Python and C versions), but that I think is acceptable.

I just assume that get_task_factory() shouldn't return None if they have a different task class? (Is uvloop's factory ever None?)

I think None is just a marker for the "default factory is used" scenario. I don't think we were ever wanted to chain factories.


Overall I really want this to be implemented not on the event loop, the loop API surface is already very large.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's a nice solution. If I get back to this before anyone else I'll use that.

"""Start a coroutine.

This runs the coroutine until it first suspends itself.

If it runs till completion or fails without suspending,
return a future with the result or exception.

Otherwise schedule the resumption and return a task.
"""
self._check_closed()
# Do not go through the task factory.
# This _is_ the task factory.
if tasks.Task is not tasks._PyTask:
task = tasks.Task(coro, loop=self, name=name, context=context)
else:
try:
yield_result = coro.send(None)
except BaseException as exc:
fut = self.create_future()
# XXX What about AsyncStopIteration?
if isinstance(exc, StopIteration):
fut.set_result(exc.value)
else:
fut.set_exception(exc)
return fut
task = tasks.Task(coro, loop=self, name=name, context=context,
yield_result=yield_result)
if task._source_traceback:
del task._source_traceback[-1]
return task

def set_task_factory(self, factory):
"""Set a task factory that will be used by loop.create_task().

Expand Down
7 changes: 5 additions & 2 deletions Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,14 @@ def create_task(self, coro, *, name=None, context=None):
raise RuntimeError(f"TaskGroup {self!r} is finished")
if self._aborting:
raise RuntimeError(f"TaskGroup {self!r} is shutting down")
if context is None:
if hasattr(self._loop, "eager_task_factory"):
task = self._loop.eager_task_factory(coro, name=name, context=context)
elif context is None:
task = self._loop.create_task(coro)
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
if not task.done(): # If it's done already, it's a future
tasks._set_task_name(task, name)
task.add_done_callback(self._on_task_done)
self._tasks.add(task)
return task
Expand Down
19 changes: 14 additions & 5 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def _set_task_name(task, name):
set_name(name)


_NOT_SET = object()

class Task(futures._PyFuture): # Inherit Python Task implementation
# from a Python Future implementation.

Expand All @@ -93,7 +95,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True

def __init__(self, coro, *, loop=None, name=None, context=None):
def __init__(self, coro, *, loop=None, name=None, context=None,
yield_result=_NOT_SET):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
Expand All @@ -117,7 +120,10 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
else:
self._context = context

self._loop.call_soon(self.__step, context=self._context)
if yield_result is _NOT_SET:
self._loop.call_soon(self.__step, context=self._context)
else:
self.__step2(yield_result)
_register_task(self)

def __del__(self):
Expand Down Expand Up @@ -287,6 +293,12 @@ def __step(self, exc=None):
except BaseException as exc:
super().set_exception(exc)
else:
self.__step2(result)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __step2(self, result):
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
Expand Down Expand Up @@ -333,9 +345,6 @@ def __step(self, exc=None):
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __wakeup(self, future):
try:
Expand Down