
Refactor ExecutionPlan to maintain complete lineage and eagerly unlink block references.
clarkzinzow committed Apr 15, 2022
1 parent fb14e82 commit b4a2cbd
Showing 5 changed files with 347 additions and 166 deletions.
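Before the file diffs, the commit's two ideas can be shown in one toy sketch. This is illustrative Python, not Ray code: ToyPlan, with_stage, and execute are invented, simplified stand-ins for ExecutionPlan internals. The plan carries its complete stage lineage from the source blocks onward, so outputs can always be recomputed, while intermediate block references are cleared eagerly during execution so the object store can reclaim them.

from typing import Callable, List


class ToyPlan:
    """Simplified stand-in for ExecutionPlan (not Ray code)."""

    def __init__(self, in_blocks: List[int]):
        self._in_blocks = in_blocks            # source blocks: the lineage root
        self._stages: List[Callable[[int], int]] = []

    def with_stage(self, stage: Callable[[int], int]) -> "ToyPlan":
        plan = ToyPlan(self._in_blocks)
        plan._stages = self._stages + [stage]  # lineage is extended, never lost
        return plan

    def execute(self) -> List[int]:
        blocks = list(self._in_blocks)
        for stage in self._stages:
            new_blocks = [stage(b) for b in blocks]
            blocks.clear()                     # eagerly unlink intermediate refs
            blocks = new_blocks
        return blocks


plan = ToyPlan([1, 2, 3]).with_stage(lambda b: b * 2).with_stage(lambda b: b + 1)
assert plan.execute() == [3, 5, 7]
assert plan._in_blocks == [1, 2, 3]  # the root survives for recomputation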
45 changes: 17 additions & 28 deletions python/ray/data/dataset.py
@@ -2547,22 +2547,22 @@ def repeat(self, times: Optional[int] = None) -> "DatasetPipeline[T]":
                 to repeat indefinitely.
         """
         from ray.data.dataset_pipeline import DatasetPipeline
+        from ray.data.impl.plan import _rewrite_read_stage

         # If optimizations are enabled, rewrite the read stage into a OneToOneStage
         # to enable fusion with downstream map stages.
         ctx = DatasetContext.get_current()
-        if self._plan._is_read_stage() and ctx.optimize_fuse_read_stages:
-            self._plan._in_blocks.clear()
-            blocks, read_stage = self._plan._rewrite_read_stage()
-            outer_stats = DatasetStats(stages={}, parent=None)
+        if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
+            blocks, _ = self._plan._get_source_blocks()
+            blocks.clear()
+            blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
         else:
             blocks = self._plan.execute()
-            read_stage = None
             outer_stats = self._plan.stats()
+            read_stage = None
+        uuid = self._get_uuid()
+        outer_stats.dataset_uuid = uuid

         if times is not None and times < 1:
             raise ValueError("`times` must be >= 1, got {}".format(times))
-        uuid = self._get_uuid()
-        outer_stats.dataset_uuid = uuid

         class Iterator:
             def __init__(self, blocks):
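Note the shape of the new branch: repeat() no longer reaches into the plan and mutates it in place (the old self._plan._in_blocks.clear()). It asks the plan for its source blocks, eagerly clears the materialized references, and lets the module-level _rewrite_read_stage return the rewritten blocks, fresh stats, and the read stage together. Clearing before the rewrite appears safe because a lazy block list keeps the read tasks that can reproduce its blocks. A toy sketch of that property (ToyLazyBlocks is invented for illustration, not Ray's LazyBlockList):

from typing import Callable, List, Optional


class ToyLazyBlocks:
    """Invented stand-in for a lazy block list (not Ray code)."""

    def __init__(self, tasks: List[Callable[[], list]]):
        self._tasks = tasks                    # recipes that produce blocks
        self._refs: List[Optional[list]] = [None] * len(tasks)

    def materialize(self) -> List[list]:
        refs = [task() for task in self._tasks]
        self._refs = refs
        return refs

    def clear(self) -> None:
        # Unlink the block references; the tasks (the lineage) survive.
        self._refs = [None] * len(self._tasks)


lazy = ToyLazyBlocks([lambda: [1, 2], lambda: [3]])
lazy.materialize()
lazy.clear()                                 # references released...
assert lazy.materialize() == [[1, 2], [3]]   # ...but every block is rebuildable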
@@ -2660,24 +2660,23 @@ def window(
                 exclusive with ``blocks_per_window``.
         """
         from ray.data.dataset_pipeline import DatasetPipeline
+        from ray.data.impl.plan import _rewrite_read_stage

         if blocks_per_window is not None and bytes_per_window is not None:
             raise ValueError("Only one windowing scheme can be specified.")

         if blocks_per_window is None:
             blocks_per_window = 10

         # If optimizations are enabled, rewrite the read stage into a OneToOneStage
         # to enable fusion with downstream map stages.
         ctx = DatasetContext.get_current()
-        if self._plan._is_read_stage() and ctx.optimize_fuse_read_stages:
-            self._plan._in_blocks.clear()
-            blocks, read_stage = self._plan._rewrite_read_stage()
-            outer_stats = DatasetStats(stages={}, parent=None)
+        if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
+            blocks, _ = self._plan._get_source_blocks()
+            blocks.clear()
+            blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
         else:
             blocks = self._plan.execute()
-            read_stage = None
             outer_stats = self._plan.stats()
+            read_stage = None

         class Iterator:
             def __init__(self, splits, epoch):
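window() gets the same rewrite as repeat() above. The comment's point about fusing the read stage with downstream map stages can be shown with a toy composition; fuse, read_stage, and map_stage below are invented for illustration, not Ray APIs. Once reading is expressed as a per-block ("one-to-one") stage, it composes with a per-block map into a single task, so the intermediate blocks never need to be materialized between the two stages.

from typing import Callable


def fuse(
    read: Callable[[str], str], map_fn: Callable[[str], str]
) -> Callable[[str], str]:
    """Compose two per-block stages into a single per-block stage."""

    def fused(block_ref: str) -> str:
        return map_fn(read(block_ref))

    return fused


def read_stage(path: str) -> str:
    return f"rows({path})"


def map_stage(rows: str) -> str:
    return rows.upper()


assert fuse(read_stage, map_stage)("a.csv") == "ROWS(A.CSV)"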
@@ -2755,19 +2754,9 @@ def fully_executed(self) -> "Dataset[T]":
         Returns:
             A Dataset with all blocks fully materialized in memory.
         """
-        blocks, metadata = [], []
-        for b, m in self._plan.execute().get_blocks_with_metadata():
-            blocks.append(b)
-            metadata.append(m)
-        ds = Dataset(
-            ExecutionPlan(
-                BlockList(blocks, metadata),
-                self._plan.stats(),
-                dataset_uuid=self._get_uuid(),
-            ),
-            self._epoch,
-            lazy=False,
-        )
+        plan = self._plan.deep_copy(preserve_uuid=True)
+        plan.execute(force_read=True)
+        ds = Dataset(plan, self._epoch, lazy=False)
         ds._set_uuid(self._get_uuid())
         return ds
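The old fully_executed() flattened the executed blocks into a static BlockList, discarding how they were produced; the new version deep-copies the plan (preserving the dataset UUID) and forces execution on the copy, so the materialized dataset retains its lineage and the original dataset is left untouched. A toy sketch of that contract, with ToySnapshotPlan as an invented stand-in (force_read here only mimics Ray's flag for also materializing lazy read tasks):

import copy
from typing import List, Optional


class ToySnapshotPlan:
    """Invented stand-in for ExecutionPlan in fully_executed() (not Ray code)."""

    def __init__(self, stages: List[str]):
        self._stages = list(stages)    # lineage travels with every copy
        self._out: Optional[List[str]] = None

    def deep_copy(self) -> "ToySnapshotPlan":
        return copy.deepcopy(self)

    def execute(self, force_read: bool = False) -> List[str]:
        # force_read mimics also materializing normally-lazy read tasks.
        self._out = ["block-for-" + stage for stage in self._stages]
        return self._out


plan = ToySnapshotPlan(["read", "map"])
snapshot = plan.deep_copy()
snapshot.execute(force_read=True)
assert plan._out is None                     # the source plan is untouched
assert snapshot._stages == ["read", "map"]   # lineage survives materialization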

9 changes: 7 additions & 2 deletions python/ray/data/dataset_pipeline.py
@@ -752,16 +752,21 @@ def _optimize_stages(self):
             self._optimized_stages = self._stages
             return

+        # This dummy dataset will be used to get a set of optimized stages.
         dummy_ds = Dataset(
             ExecutionPlan(BlockList([], []), DatasetStats(stages={}, parent=None)),
             0,
             True,
         )
+        # Apply all pipeline operations to the dummy dataset.
         for stage in self._stages:
             dummy_ds = stage(dummy_ds)
-        dummy_ds._plan._optimize()
+        # Get the optimized stages.
+        _, _, stages = dummy_ds._plan._optimize()
+        # Apply these optimized stages to the datasets underlying the pipeline.
+        # These optimized stages will be executed by the PipelineExecutor.
         optimized_stages = []
-        for stage in dummy_ds._plan._stages:
+        for stage in stages:
             optimized_stages.append(
                 lambda ds, stage=stage: Dataset(
                     ds._plan.with_stage(stage), ds._epoch, True
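A side note on the lambda in the loop above: the stage=stage default argument is the standard Python idiom for capturing the loop variable by value. Closures capture variables, not values, so without the default every closure would see only the final stage. A self-contained demonstration:

fns_late = [lambda x: x + i for i in range(3)]
fns_bound = [lambda x, i=i: x + i for i in range(3)]
assert [f(0) for f in fns_late] == [2, 2, 2]   # every closure saw the final i
assert [f(0) for f in fns_bound] == [0, 1, 2]  # each kept its own i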
5 changes: 5 additions & 0 deletions python/ray/data/impl/lazy_block_list.py
@@ -261,6 +261,11 @@ def _get_blocks_with_metadata(
         self._cached_metadata = metadata
         return block_refs, metadata

+    def compute_to_blocklist(self) -> BlockList:
+        """Launch all tasks and return a concrete BlockList."""
+        blocks, metadata = self._get_blocks_with_metadata()
+        return BlockList(blocks, metadata)
+
     def compute_first_block(self):
         """Kick off computation for the first block in the list.
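The new compute_to_blocklist() forces every pending read task and hands back a plain, fully-concrete BlockList of block references plus metadata. A minimal free-function sketch of that contract, using plain lists and dicts as invented stand-ins for Ray's block and metadata types:

from typing import Callable, Dict, List, Tuple


def compute_to_blocklist(
    tasks: List[Callable[[], list]],
) -> List[Tuple[list, Dict[str, int]]]:
    blocks = [task() for task in tasks]                  # launch every task
    metadata = [{"num_rows": len(block)} for block in blocks]
    return list(zip(blocks, metadata))                   # stand-in for BlockList


concrete = compute_to_blocklist([lambda: [1, 2], lambda: [3]])
assert [meta["num_rows"] for _, meta in concrete] == [2, 1]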
