Skip to content

Commit

Permalink
[Data] Fix event loop mismatch with async map (#47907)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

Fix a bug with async Map operator, where in Python < 3.11, there are
multiple event loops created and occurs in an event loop mismatch error.
See #47734 for more details.

## Related issue number
Closes #47734

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Scott Lee <[email protected]>
  • Loading branch information
scottjlee authored Oct 4, 2024
1 parent ede8246 commit 8d15847
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pyarrow as pa

import ray
from ray._private.utils import get_or_create_event_loop
from ray.data._internal.compute import get_compute
from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.execution.interfaces.task_context import TaskContext
Expand Down Expand Up @@ -65,7 +66,7 @@ def __init__(

def _init_async(self):
# Only used for callable class with async generator `__call__` method.
loop = asyncio.new_event_loop()
loop = get_or_create_event_loop()

def run_loop():
asyncio.set_event_loop(loop)
Expand Down

0 comments on commit 8d15847

Please sign in to comment.