Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deflake occasional deadlock in test_dataset.py::test_basic_actors[True] #21970

Merged
merged 11 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2475,6 +2475,18 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
"""
return self._blocks.get_blocks()

@DeveloperAPI
def force_reads(self) -> "Dataset[T]":
"""Force full evaluation of the blocks of this dataset.

This can be used to read all blocks into memory. By default, Datasets
doesn't read blocks from the datasource until the first transform.
"""
blocks = self.get_internal_block_refs()
bar = ProgressBar("Force reads", len(blocks))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

bar.block_until_complete(blocks)
return self

@DeveloperAPI
def stats(self) -> str:
"""Returns a string containing execution timing information."""
Expand Down
27 changes: 17 additions & 10 deletions python/ray/data/impl/pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
from ray.data.dataset_pipeline import DatasetPipeline


# Temporarily use an actor here to avoid ownership issues with tasks:
# https://github.com/ray-project/ray/issues/20554
@ray.remote(num_cpus=0, placement_group=None)
Copy link
Contributor

@clarkzinzow clarkzinzow Jan 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that @stephanie-wang's PR that ports stage task launching to a threadpool will also fix this by launching all tasks from the driver, and she'll have to resolve some merge conflicts here. #21845

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be an alternative to the thread pools as well, though it's not tolerant of the actor failure.

def pipeline_stage(fn: Callable[[], Dataset[T]], context: DatasetContext) -> Dataset[T]:
DatasetContext._set_current(context)
try:
prev = set_progress_bars(False)
return fn()
finally:
set_progress_bars(prev)
class _StageRunner:
def run(self, fn: Callable[[], Dataset[T]], context: DatasetContext) -> Dataset[T]:
DatasetContext._set_current(context)
try:
prev = set_progress_bars(False)
# Force eager evaluation of all blocks in the pipeline stage. This
# prevents resource deadlocks due to overlapping stage execution
# (e.g., task -> actor stage).
return fn().force_reads()
finally:
set_progress_bars(prev)


class PipelineExecutor:
Expand All @@ -27,8 +33,9 @@ def __init__(self, pipeline: "DatasetPipeline[T]"):
self._stages: List[ObjectRef[Dataset[Any]]] = [None] * (
len(self._pipeline._stages) + 1
)
self._stage_runners = [_StageRunner.remote() for _ in self._stages]
self._iter = iter(self._pipeline._base_iterable)
self._stages[0] = pipeline_stage.remote(
self._stages[0] = self._stage_runners[0].run.remote(
next(self._iter), DatasetContext.get_current()
)

Expand Down Expand Up @@ -80,14 +87,14 @@ def __next__(self):
output = result
else:
fn = self._pipeline._stages[i]
self._stages[i + 1] = pipeline_stage.remote(
self._stages[i + 1] = self._stage_runners[i].run.remote(
lambda: fn(result), DatasetContext.get_current()
)

# Pull a new element for the initial slot if possible.
if self._stages[0] is None:
try:
self._stages[0] = pipeline_stage.remote(
self._stages[0] = self._stage_runners[0].run.remote(
next(self._iter), DatasetContext.get_current()
)
except StopIteration:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/impl/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def build(self, final_blocks: BlockList) -> "DatasetStats":
return stats


@ray.remote(num_cpus=0)
@ray.remote(num_cpus=0, placement_group=None)
class _StatsActor:
"""Actor holding stats for blocks created by LazyBlockList.

Expand Down
7 changes: 5 additions & 2 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,12 @@ def _read_stream(self, f: "pa.NativeFile", path: str, **reader_args):
@pytest.mark.parametrize("pipelined", [False, True])
def test_basic_actors(shutdown_only, pipelined):
ray.init(num_cpus=2)
ds = ray.data.range(5)
n = 5
ds = ray.data.range(n)
ds = maybe_pipeline(ds, pipelined)
assert sorted(ds.map(lambda x: x + 1, compute="actors").take()) == [1, 2, 3, 4, 5]
assert sorted(ds.map(lambda x: x + 1, compute="actors").take()) == list(
range(1, n + 1)
)


@pytest.mark.parametrize("pipelined", [False, True])
Expand Down
25 changes: 25 additions & 0 deletions python/ray/data/tests/test_pipeline_nohang.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest

import ray
from ray.tests.conftest import * # noqa

NUM_REPEATS = 10
NUM_TASKS = 10


# This test can be flaky if there is resource deadlock between the pipeline
# stages. Run it a lot to ensure no regressions.
def test_basic_actors(shutdown_only):
ray.init(num_cpus=2)
for _ in range(NUM_REPEATS):
ds = ray.data.range(NUM_TASKS)
ds = ds.window(blocks_per_window=1)
assert sorted(ds.map(lambda x: x + 1, compute="actors").take()) == list(
range(1, NUM_TASKS + 1)
)


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))