
[streaming] Current task being changed during async iteration #37147

Closed
edoakes opened this issue Jul 6, 2023 · 3 comments · Fixed by #37972
Labels: bug (Something that is supposed to be working; but isn't), core (Issues that should be addressed in Ray Core), P1 (Issue that should be fixed within a few weeks)


edoakes (Contributor) commented Jul 6, 2023

After each yield in an async generator, the asyncio task running the user code seems to change. This breaks user code and libraries (it was discovered because it causes an exception when trying to release a lock that uses the current task as its ID).
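To see why a changing task breaks such a lock, here is a minimal, Ray-free sketch. `TaskOwnedLock`, `locked_gen`, `consume_in_one_task`, and `consume_task_per_step` are hypothetical names, not the library that hit the bug. The lock records `asyncio.current_task()` on acquire and checks it on release. Driving the generator with a plain `async for` keeps a single Task. Wrapping each `__anext__()` in a fresh Task, which mimics the per-yield task changes in the Ray output, makes the release fail.

```python
import asyncio


class TaskOwnedLock:
    """Hypothetical lock that uses the current asyncio Task as its owner ID."""

    def __init__(self):
        self._owner = None

    def acquire(self):
        if self._owner is not None:
            raise RuntimeError("lock is already held")
        self._owner = asyncio.current_task()

    def release(self):
        # Only the Task that acquired the lock may release it.
        if self._owner is not asyncio.current_task():
            raise RuntimeError("lock released by a task that does not own it")
        self._owner = None


async def locked_gen(lock):
    lock.acquire()
    yield "acquired"
    lock.release()  # resumes only when the consumer asks for the next item


async def consume_in_one_task():
    """Plain `async for`: every step runs in the consumer's Task."""
    async for _ in locked_gen(TaskOwnedLock()):
        pass
    return "ok"


async def consume_task_per_step():
    """Runs each __anext__() in a brand-new Task, mimicking the reported behavior."""
    agen = locked_gen(TaskOwnedLock())

    async def step():
        return await agen.__anext__()

    await asyncio.create_task(step())  # acquire happens in Task #1
    try:
        await asyncio.create_task(step())  # release happens in Task #2
    except StopAsyncIteration:
        return "ok"
    except RuntimeError as exc:
        return f"error: {exc}"


print(asyncio.run(consume_in_one_task()))    # ok
print(asyncio.run(consume_task_per_step()))  # error: lock released by a task that does not own it
```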

Repro:

```python
import asyncio
import ray

@ray.remote
class A:
    async def gen(self):
        print("BEFORE LOOP:", asyncio.current_task())
        for i in range(5):
            print("BEFORE YIELD:", asyncio.current_task())
            yield i
            print("AFTER YIELD:", asyncio.current_task())

        print("AFTER LOOP:", asyncio.current_task())

a = A.remote()
for obj_ref in ray.get(a.gen.options(num_returns="streaming").remote()):
    print(ray.get(obj_ref))
```

Output:

```
(ray) eoakes@Edwards-MBP-2 serve % python test_ray.py
2023-07-06 10:43:47,140 INFO worker.py:1610 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(A pid=82093) BEFORE LOOP: <Task pending name='Task-2' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) BEFORE YIELD: <Task pending name='Task-2' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
0
1
2
3
4
(A pid=82093) AFTER YIELD: <Task pending name='Task-3' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) BEFORE YIELD: <Task pending name='Task-3' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) AFTER YIELD: <Task pending name='Task-4' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) BEFORE YIELD: <Task pending name='Task-4' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) AFTER YIELD: <Task pending name='Task-5' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) BEFORE YIELD: <Task pending name='Task-5' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) AFTER YIELD: <Task pending name='Task-6' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) BEFORE YIELD: <Task pending name='Task-6' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) AFTER YIELD: <Task pending name='Task-7' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
(A pid=82093) AFTER LOOP: <Task pending name='Task-7' coro=<<async_generator_asend without __name__>()> cb=[_chain_future.<locals>._call_set_state() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/futures.py:367]>
```

In comparison, vanilla asyncio:

```python
import asyncio

async def gen():
    print("BEFORE LOOP:", asyncio.current_task())
    for i in range(5):
        print("BEFORE YIELD:", asyncio.current_task())
        yield i
        print("AFTER YIELD:", asyncio.current_task())
    print("AFTER LOOP:", asyncio.current_task())

async def consume():
    async for i in gen():
        print(i)

asyncio.new_event_loop().run_until_complete(consume())
```

```
(ray) eoakes@Edwards-MBP-2 serve % python test.py
BEFORE LOOP: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
BEFORE YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
0
AFTER YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
BEFORE YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
1
AFTER YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
BEFORE YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
2
AFTER YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
BEFORE YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
3
AFTER YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
BEFORE YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
4
AFTER YIELD: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
AFTER LOOP: <Task pending name='Task-1' coro=<consume() running at test.py:14> cb=[_run_until_complete_cb() at /Users/eoakes/miniforge3/envs/ray/lib/python3.8/asyncio/base_events.py:184]>
```
@edoakes edoakes added bug Something that is supposed to be working; but isn't P1 Issue that should be fixed within a few weeks streaming labels Jul 6, 2023
@edoakes edoakes changed the title [streaming] Current task being changed in every async iterator iteration [streaming] Current task being changed during async iteration Jul 6, 2023
@rkooo567 rkooo567 added the core Issues that should be addressed in Ray Core label Jul 7, 2023
rkooo567 (Contributor) commented Jul 7, 2023

@edoakes I think the refactoring requires a fair amount of work. I am prototyping it to check how difficult it is.

Besides, do you think it is too hacky if I just set the same task name for all tasks created by the generator? It seems I can get a task's name with task.get_name() and set it with task.set_name(). I could probably just set the same name on every new task created via generator.asend?
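A minimal sketch of the name-copying idea, assuming Python 3.8+ (where `Task.get_name()` and `Task.set_name()` exist). `record_current_task` is a hypothetical stand-in for one generator step. Copying the name makes the two steps' tasks match by name, but they remain two distinct `Task` objects, so anything that compares or stores the Task itself still sees a change.

```python
import asyncio


async def record_current_task(log):
    # Hypothetical helper: stands in for one step of the generator.
    log.append(asyncio.current_task())


async def main():
    log = []
    first = asyncio.create_task(record_current_task(log))
    await first
    # A later step runs in a brand-new Task; copy the first Task's name onto it.
    second = asyncio.create_task(record_current_task(log))
    second.set_name(first.get_name())
    await second
    same_name = log[0].get_name() == log[1].get_name()
    same_task = log[0] is log[1]
    return same_name, same_task


print(asyncio.run(main()))  # (True, False)
```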

rkooo567 (Contributor) commented Jul 7, 2023

Hmm, the approach above doesn't seem to work with Python 3.7 (Task.get_name() and Task.set_name() were only added in Python 3.8)...

edoakes (Contributor, Author) commented Jul 7, 2023

I don't think setting the task name alone is sufficient; user/library code may be relying on the full Task object.

rkooo567 added a commit that referenced this issue Aug 4, 2023
…new task name #37713 (#37972)

This PR fixes #37147 by dispatching the whole generator task into an event loop, instead of dispatching each anext call individually.

The PR could have a slight performance impact because the task output serialization code now runs inside the event loop, unlike before (an approach that avoids this was tried in #37713, but it was too hacky).

Because the whole generator task now runs inside an event loop instead of dispatching individual anext calls, some core APIs are called inside the event loop. The usage of worker_context had to be removed because it does not work well when called from a different thread (the event loop thread). Instead, we pass the necessary arguments.
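The difference between the two dispatch strategies can be sketched without Ray, under stated assumptions. A background thread running its own event loop stands in for the actor's event loop thread, and `start_loop_thread`, `dispatch_per_anext`, and `dispatch_whole_generator` are hypothetical names, not Ray's actual code. Submitting each step separately creates a new Task per step (six Tasks for five items, matching Task-2 through Task-7 in the Ray output). Submitting one coroutine that runs the whole `async for` keeps every step in a single Task.

```python
import asyncio
import threading


async def gen(seen, n):
    """Async generator that records which asyncio Task runs each step."""
    for i in range(n):
        seen.add(asyncio.current_task())
        yield i
    seen.add(asyncio.current_task())


def start_loop_thread():
    """Run an event loop in a background thread (stand-in for an event loop thread)."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def dispatch_per_anext(loop, n):
    """Old strategy as described: each anext call is submitted separately."""
    seen, out = set(), []
    agen = gen(seen, n)

    async def step():
        return await agen.__anext__()

    while True:
        try:
            # Every submission becomes a brand-new Task on the loop.
            out.append(asyncio.run_coroutine_threadsafe(step(), loop).result())
        except StopAsyncIteration:
            break
    return out, len(seen)


def dispatch_whole_generator(loop, n):
    """Fixed strategy: the whole generator runs inside one Task on the loop."""
    seen, out = set(), []

    async def run_all():
        async for item in gen(seen, n):
            out.append(item)  # per-item work now happens on the loop thread

    asyncio.run_coroutine_threadsafe(run_all(), loop).result()
    return out, len(seen)


loop = start_loop_thread()
print(dispatch_per_anext(loop, 5))        # ([0, 1, 2, 3, 4], 6)
print(dispatch_whole_generator(loop, 5))  # ([0, 1, 2, 3, 4], 1)
loop.call_soon_threadsafe(loop.stop)
```

In the second strategy, per-item work (here just `out.append`) also runs on the loop thread, which mirrors the note above that output serialization moves into the event loop.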
NripeshN pushed a commit to NripeshN/ray that referenced this issue Aug 15, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this issue Aug 31, 2023
vymao pushed a commit to vymao/ray that referenced this issue Oct 11, 2023