Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Yield remaining results from async map_batches #47696

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ async def process_all_batches():
future = asyncio.run_coroutine_threadsafe(process_all_batches(), loop)

# Yield results as they become available.
while not future.done():
# After all futures are completed, drain the queue to
# yield any remaining results.
while not future.done() or not output_batch_queue.empty():
# Here, `out_batch` is a one-row output batch
# from the async generator, corresponding to a
# single row from the input batch.
Expand Down
41 changes: 39 additions & 2 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,9 +1068,7 @@ def test_map_batches_async_generator(shutdown_only):
ray.init(num_cpus=10)

async def sleep_and_yield(i):
print("sleep", i)
await asyncio.sleep(i % 5)
print("yield", i)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extraneous prints

return {"input": [i], "output": [2**i]}

class AsyncActor:
Expand Down Expand Up @@ -1119,6 +1117,45 @@ async def __call__(self, batch):
assert "assert False" in str(exc_info.value)


def test_map_batches_async_generator_fast_yield(shutdown_only):
# Tests the case where the async generator yields immediately,
# with a high number of tasks in flight, which results in
# the internal queue being almost instantaneously filled.
# This test ensures that the internal queue is completely drained in this scenario.

ray.shutdown()
ray.init(num_cpus=4)

async def task_yield(row):
return row

class AsyncActor:
def __init__(self):
pass

async def __call__(self, batch):
rows = [{"id": np.array([i])} for i in batch["id"]]
tasks = [asyncio.create_task(task_yield(row)) for row in rows]
for task in tasks:
yield await task

n = 8
ds = ray.data.range(n, override_num_blocks=n)
ds = ds.map_batches(
AsyncActor,
batch_size=n,
compute=ray.data.ActorPoolStrategy(size=1, max_tasks_in_flight_per_actor=n),
concurrency=1,
max_concurrency=n,
)

output = ds.take_all()
expected_output = [{"id": i} for i in range(n)]
# Because all tasks are submitted almost simultaneously,
# the output order may be different compared to the original input.
assert len(output) == len(expected_output), (len(output), len(expected_output))


if __name__ == "__main__":
import sys

Expand Down