diff --git a/python/ray/data/_internal/block_list.py b/python/ray/data/_internal/block_list.py index 73a745af5b90..fc93dd692a54 100644 --- a/python/ray/data/_internal/block_list.py +++ b/python/ray/data/_internal/block_list.py @@ -15,11 +15,20 @@ class BlockList: change after execution due to block splitting. """ - def __init__(self, blocks: List[ObjectRef[Block]], metadata: List[BlockMetadata]): + def __init__( + self, + blocks: List[ObjectRef[Block]], + metadata: List[BlockMetadata], + *, + owned_by_consumer: bool, + ): assert len(blocks) == len(metadata), (blocks, metadata) self._blocks: List[ObjectRef[Block]] = blocks self._num_blocks = len(self._blocks) self._metadata: List[BlockMetadata] = metadata + # Whether the block list is owned by consuming APIs, and if so it can be + # eagerly deleted after read by the consumer. + self._owned_by_consumer = owned_by_consumer def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]: """Get the metadata for all blocks.""" @@ -27,7 +36,9 @@ def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]: def copy(self) -> "BlockList": """Perform a shallow copy of this BlockList.""" - return BlockList(self._blocks, self._metadata) + return BlockList( + self._blocks, self._metadata, owned_by_consumer=self._owned_by_consumer + ) def clear(self) -> None: """Erase references to the tasks tracked by the BlockList.""" @@ -57,7 +68,11 @@ def split(self, split_size: int) -> List["BlockList"]: meta = np.array_split(self._metadata, num_splits) output = [] for b, m in zip(blocks, meta): - output.append(BlockList(b.tolist(), m.tolist())) + output.append( + BlockList( + b.tolist(), m.tolist(), owned_by_consumer=self._owned_by_consumer + ) + ) return output def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]: @@ -78,7 +93,11 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]: ) size = m.size_bytes if cur_blocks and cur_size + size > bytes_per_split: - output.append(BlockList(cur_blocks, cur_meta)) + output.append( + BlockList( + cur_blocks, cur_meta, owned_by_consumer=self._owned_by_consumer + ) + ) cur_blocks = [] cur_meta = [] cur_size = 0 @@ -86,7 +105,11 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]: cur_meta.append(m) cur_size += size if cur_blocks: - output.append(BlockList(cur_blocks, cur_meta)) + output.append( + BlockList( + cur_blocks, cur_meta, owned_by_consumer=self._owned_by_consumer + ) + ) return output def size_bytes(self) -> int: @@ -110,8 +133,16 @@ def divide(self, block_idx: int) -> ("BlockList", "BlockList"): """ self._check_if_cleared() return ( - BlockList(self._blocks[:block_idx], self._metadata[:block_idx]), - BlockList(self._blocks[block_idx:], self._metadata[block_idx:]), + BlockList( + self._blocks[:block_idx], + self._metadata[:block_idx], + owned_by_consumer=self._owned_by_consumer, + ), + BlockList( + self._blocks[block_idx:], + self._metadata[block_idx:], + owned_by_consumer=self._owned_by_consumer, + ), ) def get_blocks(self) -> List[ObjectRef[Block]]: @@ -180,4 +211,4 @@ def randomize_block_order(self, seed: Optional[int] = None) -> "BlockList": random.shuffle(blocks_with_metadata) blocks, metadata = map(list, zip(*blocks_with_metadata)) - return BlockList(blocks, metadata) + return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 98e0d0fb1028..140518f9bf1f 100644 --- a/python/ray/data/_internal/compute.py +++ 
b/python/ray/data/_internal/compute.py @@ -102,6 +102,7 @@ def _apply( data_refs = [r[0] for r in all_refs] refs = [r[1] for r in all_refs] + in_block_owned_by_consumer = block_list._owned_by_consumer # Release input block references. if clear_input_blocks: del blocks @@ -134,7 +135,11 @@ def _apply( for block, metadata in zip(data_refs, results): new_blocks.append(block) new_metadata.append(metadata) - return BlockList(list(new_blocks), list(new_metadata)) + return BlockList( + list(new_blocks), + list(new_metadata), + owned_by_consumer=in_block_owned_by_consumer, + ) @PublicAPI @@ -213,6 +218,7 @@ def _apply( context = DatasetContext.get_current() blocks_in = block_list.get_blocks_with_metadata() + owned_by_consumer = block_list._owned_by_consumer # Early release block references. if clear_input_blocks: @@ -368,7 +374,9 @@ def map_block_nosplit( new_blocks.append(block) new_metadata.append(metadata_mapping[block]) new_metadata = ray.get(new_metadata) - return BlockList(new_blocks, new_metadata) + return BlockList( + new_blocks, new_metadata, owned_by_consumer=owned_by_consumer + ) except Exception as e: try: diff --git a/python/ray/data/_internal/fast_repartition.py b/python/ray/data/_internal/fast_repartition.py index c63afdc71b25..df66f561ed67 100644 --- a/python/ray/data/_internal/fast_repartition.py +++ b/python/ray/data/_internal/fast_repartition.py @@ -13,7 +13,13 @@ def fast_repartition(blocks, num_blocks): from ray.data.dataset import Dataset wrapped_ds = Dataset( - ExecutionPlan(blocks, DatasetStats(stages={}, parent=None)), 0, lazy=False + ExecutionPlan( + blocks, + DatasetStats(stages={}, parent=None), + run_by_consumer=blocks._owned_by_consumer, + ), + 0, + lazy=False, ) # Compute the (n-1) indices needed for an equal split of the data. count = wrapped_ds.count() @@ -40,6 +46,8 @@ def fast_repartition(blocks, num_blocks): if s.num_blocks() > 0 ] + owned_by_consumer = blocks._owned_by_consumer + # Early-release memory. del splits, blocks, wrapped_ds @@ -71,4 +79,4 @@ def fast_repartition(blocks, num_blocks): new_blocks += empty_blocks new_metadata += empty_metadata - return BlockList(new_blocks, new_metadata), {} + return BlockList(new_blocks, new_metadata, owned_by_consumer=owned_by_consumer), {} diff --git a/python/ray/data/_internal/lazy_block_list.py b/python/ray/data/_internal/lazy_block_list.py index a0af8360c292..44113e239f9d 100644 --- a/python/ray/data/_internal/lazy_block_list.py +++ b/python/ray/data/_internal/lazy_block_list.py @@ -40,6 +40,8 @@ def __init__( cached_metadata: Optional[List[BlockPartitionMetadata]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, stats_uuid: str = None, + *, + owned_by_consumer: bool, ): """Create a LazyBlockList on the provided read tasks. @@ -90,6 +92,9 @@ def __init__( tasks, self._cached_metadata, ) + # Whether the block list is owned by consuming APIs, and if so it can be + # eagerly deleted after read by the consumer. 
+ self._owned_by_consumer = owned_by_consumer def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]: """Get the metadata for all blocks.""" @@ -121,6 +126,7 @@ def copy(self) -> "LazyBlockList": block_partition_meta_refs=self._block_partition_meta_refs.copy(), cached_metadata=self._cached_metadata, ray_remote_args=self._remote_args.copy(), + owned_by_consumer=self._owned_by_consumer, stats_uuid=self._stats_uuid, ) @@ -159,6 +165,7 @@ def split(self, split_size: int) -> List["LazyBlockList"]: b.tolist(), m.tolist(), c.tolist(), + owned_by_consumer=self._owned_by_consumer, ) ) return output @@ -187,6 +194,7 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]: cur_blocks, cur_blocks_meta, cur_cached_meta, + owned_by_consumer=self._owned_by_consumer, ), ) cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta = [], [], [], [] @@ -198,7 +206,13 @@ def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]: cur_size += size if cur_blocks: output.append( - LazyBlockList(cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta) + LazyBlockList( + cur_tasks, + cur_blocks, + cur_blocks_meta, + cur_cached_meta, + owned_by_consumer=self._owned_by_consumer, + ) ) return output @@ -209,12 +223,14 @@ def divide(self, part_idx: int) -> ("LazyBlockList", "LazyBlockList"): self._block_partition_refs[:part_idx], self._block_partition_meta_refs[:part_idx], self._cached_metadata[:part_idx], + owned_by_consumer=self._owned_by_consumer, ) right = LazyBlockList( self._tasks[part_idx:], self._block_partition_refs[part_idx:], self._block_partition_meta_refs[part_idx:], self._cached_metadata[part_idx:], + owned_by_consumer=self._owned_by_consumer, ) return left, right @@ -281,7 +297,7 @@ def _get_blocks_with_metadata( def compute_to_blocklist(self) -> BlockList: """Launch all tasks and return a concrete BlockList.""" blocks, metadata = self._get_blocks_with_metadata() - return BlockList(blocks, metadata) + return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer) def compute_first_block(self): """Kick off computation for the first block in the list. @@ -432,6 +448,7 @@ def randomize_block_order(self, seed: Optional[int] = None) -> "LazyBlockList": block_partition_meta_refs=block_partition_meta_refs, cached_metadata=cached_metadata, ray_remote_args=self._remote_args.copy(), + owned_by_consumer=self._owned_by_consumer, stats_uuid=self._stats_uuid, ) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 65cd45be994e..d08a783ac18d 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -85,13 +85,22 @@ class ExecutionPlan: # execution, any future executions will only have to execute the "after the # snapshot" subchain, using the snapshot as the input to that subchain. - def __init__(self, in_blocks: BlockList, stats: DatasetStats, dataset_uuid=None): + def __init__( + self, + in_blocks: BlockList, + stats: DatasetStats, + dataset_uuid=None, + *, + run_by_consumer: bool, + ): """Create a plan with no transformation stages. Args: in_blocks: Base list of blocks. stats: Stats for the base blocks. dataset_uuid: Dataset's UUID. + run_by_consumer: Whether this plan is invoked to run by the consumption + APIs (e.g. .iter_batches()). 
""" self._in_blocks = in_blocks self._in_stats = stats @@ -108,6 +117,19 @@ def __init__(self, in_blocks: BlockList, stats: DatasetStats, dataset_uuid=None) if not stats.dataset_uuid: stats.dataset_uuid = self._dataset_uuid + self._run_by_consumer = run_by_consumer + + def __repr__(self) -> str: + return ( + f"ExecutionPlan(" + f"dataset_uuid={self._dataset_uuid}, " + f"run_by_consumer={self._run_by_consumer}, " + f"in_blocks={self._in_blocks}, " + f"stages_before_snapshot={self._stages_before_snapshot}, " + f"stages_after_snapshot={self._stages_after_snapshot}, " + f"snapshot_blocks={self._snapshot_blocks})" + ) + def with_stage(self, stage: "Stage") -> "ExecutionPlan": """Return a copy of this plan with the given stage appended. @@ -130,7 +152,9 @@ def copy(self) -> "ExecutionPlan": Returns: A shallow copy of this execution plan. """ - plan_copy = ExecutionPlan(self._in_blocks, self._in_stats) + plan_copy = ExecutionPlan( + self._in_blocks, self._in_stats, run_by_consumer=self._run_by_consumer + ) if self._snapshot_blocks is not None: # Copy over the existing snapshot. plan_copy._snapshot_blocks = self._snapshot_blocks @@ -157,7 +181,10 @@ def deep_copy(self, preserve_uuid: bool = False) -> "ExecutionPlan": if isinstance(in_blocks, BlockList): in_blocks = in_blocks.copy() plan_copy = ExecutionPlan( - in_blocks, copy.copy(self._in_stats), dataset_uuid=dataset_uuid + in_blocks, + copy.copy(self._in_stats), + dataset_uuid=dataset_uuid, + run_by_consumer=self._run_by_consumer, ) if self._snapshot_blocks: # Copy over the existing snapshot. @@ -278,7 +305,9 @@ def execute( else: clear_input_blocks = False stats_builder = stats.child_builder(stage.name) - blocks, stage_info = stage(blocks, clear_input_blocks) + blocks, stage_info = stage( + blocks, clear_input_blocks, self._run_by_consumer + ) if stage_info: stats = stats_builder.build_multistage(stage_info) else: @@ -618,12 +647,18 @@ def block_fn( ) def __call__( - self, blocks: BlockList, clear_input_blocks: bool + self, blocks: BlockList, clear_input_blocks: bool, run_by_consumer: bool ) -> Tuple[BlockList, dict]: compute = get_compute(self.compute) assert ( self.fn_constructor_args is None and self.fn_constructor_kwargs is None ) or isinstance(compute, ActorPoolStrategy) + + if blocks._owned_by_consumer: + assert ( + run_by_consumer + ), "Blocks owned by consumer can only be consumed by consumer" + blocks = compute._apply( self.block_fn, self.ray_remote_args, @@ -637,6 +672,7 @@ def __call__( fn_constructor_kwargs=self.fn_constructor_kwargs, ) assert isinstance(blocks, BlockList), blocks + blocks._owned_by_consumer = run_by_consumer return blocks, {} @@ -707,12 +743,27 @@ def block_udf(block: Block) -> Iterable[Block]: ) def __call__( - self, blocks: BlockList, clear_input_blocks: bool + self, blocks: BlockList, clear_input_blocks: bool, run_by_consumer: bool ) -> Tuple[BlockList, dict]: + from ray.data._internal.stage_impl import RandomizeBlocksStage + + in_blocks_owned_by_consumer = blocks._owned_by_consumer + if in_blocks_owned_by_consumer: + assert ( + run_by_consumer + ), "Blocks owned by consumer can only be consumed by consumer" blocks, stage_info = self.fn( blocks, clear_input_blocks, self.block_udf, self.ray_remote_args ) assert isinstance(blocks, BlockList), blocks + + # RandomizeBlocksStage is an in-place transformation, so the ownership + # of blocks doesn't change. 
+ if isinstance(self, RandomizeBlocksStage): + blocks._owned_by_consumer = in_blocks_owned_by_consumer + else: + blocks._owned_by_consumer = run_by_consumer + return blocks, stage_info @@ -756,7 +807,9 @@ def _rewrite_read_stage( for read_task in in_blocks._tasks: blocks.append(ray.put(read_task._read_fn)) metadata.append(read_task.get_metadata()) - block_list = BlockList(blocks, metadata) + block_list = BlockList( + blocks, metadata, owned_by_consumer=in_blocks._owned_by_consumer + ) def block_fn(read_fn: Callable[[], Iterator[Block]]) -> Iterator[Block]: for block in read_fn(): @@ -773,7 +826,12 @@ def block_fn(read_fn: Callable[[], Iterator[Block]]) -> Iterator[Block]: stages = stages[1:] name += "->randomize_block_order" - stage = OneToOneStage(name, block_fn, "tasks", remote_args) + stage = OneToOneStage( + name, + block_fn, + "tasks", + remote_args, + ) stats = DatasetStats(stages={}, parent=None) stages.insert(0, stage) return block_list, stats, stages diff --git a/python/ray/data/_internal/push_based_shuffle.py b/python/ray/data/_internal/push_based_shuffle.py index 70da7cbfc9be..902c2b0466de 100644 --- a/python/ray/data/_internal/push_based_shuffle.py +++ b/python/ray/data/_internal/push_based_shuffle.py @@ -380,6 +380,7 @@ def execute( # during map-merge stage, by limiting how many partitions can be # processed concurrently. input_blocks_list = input_blocks.get_blocks() + owned_by_consumer = input_blocks._owned_by_consumer # Preemptively clear the blocks list since we will incrementally delete # the last remaining references as we submit the dependent map tasks # during the map-merge stage. @@ -515,7 +516,14 @@ def merge(*args, **kwargs): "reduce": reduce_stage_metadata, } - return BlockList(list(new_blocks), list(reduce_stage_metadata)), stats + return ( + BlockList( + list(new_blocks), + list(reduce_stage_metadata), + owned_by_consumer=owned_by_consumer, + ), + stats, + ) @staticmethod def _map_partition( diff --git a/python/ray/data/_internal/shuffle.py b/python/ray/data/_internal/shuffle.py index 192adf4dc948..505d79fe5c72 100644 --- a/python/ray/data/_internal/shuffle.py +++ b/python/ray/data/_internal/shuffle.py @@ -92,6 +92,8 @@ def execute( shuffle_map_metadata.append(refs[-1]) shuffle_map_out[i] = refs[:-1] + in_blocks_owned_by_consumer = input_blocks._owned_by_consumer + # Eagerly delete the input block references in order to eagerly release # the blocks' memory. del input_blocks_list @@ -120,4 +122,11 @@ def execute( "reduce": new_metadata, } - return BlockList(list(new_blocks), list(new_metadata)), stats + return ( + BlockList( + list(new_blocks), + list(new_metadata), + owned_by_consumer=in_blocks_owned_by_consumer, + ), + stats, + ) diff --git a/python/ray/data/_internal/stage_impl.py b/python/ray/data/_internal/stage_impl.py index c764dcf3579e..4477f9098c66 100644 --- a/python/ray/data/_internal/stage_impl.py +++ b/python/ray/data/_internal/stage_impl.py @@ -163,7 +163,9 @@ def do_zip(block1: Block, block2: Block) -> (Block, BlockMetadata): # TODO(ekl) it might be nice to have a progress bar here. 
metadata = ray.get(metadata) - blocks = BlockList(blocks, metadata) + blocks = BlockList( + blocks, metadata, owned_by_consumer=block_list._owned_by_consumer + ) return blocks, {} super().__init__("zip", None, do_zip_all) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 340d445e6885..7bb21560e691 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -147,6 +147,10 @@ class Dataset(Generic[T]): >>> # Save dataset back to external storage system. >>> ds.write_csv("s3//bucket/output") # doctest: +SKIP + Datasets has two kinds of operations: transformation, which takes in Datasets and + outputs a new Dataset (e.g. :py:meth:`.map_batches()`); and consumption, which + produces values (not a Dataset) as output (e.g. :py:meth:`.iter_batches()`). + Datasets supports parallel processing at scale: transformations such as :py:meth:`.map_batches()`, aggregations such as :py:meth:`.min()`/:py:meth:`.max()`/:py:meth:`.mean()`, grouping via @@ -1058,6 +1062,7 @@ def equalize(splits: List[Dataset[T]], num_splits: int) -> List[Dataset[T]]: block_refs, metadata = zip(*blocks.get_blocks_with_metadata()) metadata_mapping = {b: m for b, m in zip(block_refs, metadata)} + owned_by_consumer = blocks._owned_by_consumer if locality_hints is None: ds = equalize( @@ -1065,9 +1070,12 @@ def equalize(splits: List[Dataset[T]], num_splits: int) -> List[Dataset[T]]: Dataset( ExecutionPlan( BlockList( - list(blocks), [metadata_mapping[b] for b in blocks] + list(blocks), + [metadata_mapping[b] for b in blocks], + owned_by_consumer=owned_by_consumer, ), stats, + run_by_consumer=owned_by_consumer, ), self._epoch, self._lazy, @@ -1178,8 +1186,10 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]: BlockList( allocation_per_actor[actor], [metadata_mapping[b] for b in allocation_per_actor[actor]], + owned_by_consumer=owned_by_consumer, ), stats, + run_by_consumer=owned_by_consumer, ), self._epoch, self._lazy, @@ -1223,7 +1233,8 @@ def split_at_indices(self, indices: List[int]) -> List["Dataset[T]"]: if indices[0] < 0: raise ValueError("indices must be positive") start_time = time.perf_counter() - blocks_with_metadata = self._plan.execute().get_blocks_with_metadata() + block_list = self._plan.execute() + blocks_with_metadata = block_list.get_blocks_with_metadata() blocks, metadata = _split_at_indices(blocks_with_metadata, indices) split_duration = time.perf_counter() - start_time parent_stats = self._plan.stats() @@ -1234,8 +1245,11 @@ def split_at_indices(self, indices: List[int]) -> List["Dataset[T]"]: splits.append( Dataset( ExecutionPlan( - BlockList(bs, ms), + BlockList( + bs, ms, owned_by_consumer=block_list._owned_by_consumer + ), stats, + run_by_consumer=block_list._owned_by_consumer, ), self._epoch, self._lazy, @@ -1329,6 +1343,7 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": start_time = time.perf_counter() + owned_by_consumer = self._plan.execute()._owned_by_consumer datasets = [self] + list(other) bls = [] has_nonlazy = False @@ -1347,7 +1362,7 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": bs, ms = bl._blocks, bl._metadata blocks.extend(bs) metadata.extend(ms) - blocklist = BlockList(blocks, metadata) + blocklist = BlockList(blocks, metadata, owned_by_consumer=owned_by_consumer) else: tasks: List[ReadTask] = [] block_partition_refs: List[ObjectRef[BlockPartition]] = [] @@ -1357,7 +1372,10 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": block_partition_refs.extend(bl._block_partition_refs)
block_partition_meta_refs.extend(bl._block_partition_meta_refs) blocklist = LazyBlockList( - tasks, block_partition_refs, block_partition_meta_refs + tasks, + block_partition_refs, + block_partition_meta_refs, + owned_by_consumer=owned_by_consumer, ) epochs = [ds._get_epoch() for ds in datasets] @@ -1378,7 +1396,7 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]": ) dataset_stats.time_total_s = time.perf_counter() - start_time return Dataset( - ExecutionPlan(blocklist, dataset_stats), + ExecutionPlan(blocklist, dataset_stats, run_by_consumer=owned_by_consumer), max_epoch, self._lazy, ) @@ -3037,7 +3055,9 @@ def __next__(self) -> "Dataset[T]": def gen(): ds = Dataset( - ExecutionPlan(blocks, outer_stats, dataset_uuid=uuid), + ExecutionPlan( + blocks, outer_stats, dataset_uuid=uuid, run_by_consumer=True + ), epoch, lazy=False, ) @@ -3151,7 +3171,9 @@ def __next__(self) -> "Dataset[T]": def gen(): ds = Dataset( - ExecutionPlan(blocks, outer_stats), self._epoch, lazy=True + ExecutionPlan(blocks, outer_stats, run_by_consumer=True), + self._epoch, + lazy=True, ) return ds @@ -3400,7 +3422,8 @@ def _split( self, index: int, return_right_half: bool ) -> ("Dataset[T]", "Dataset[T]"): start_time = time.perf_counter() - blocks_with_metadata = self._plan.execute().get_blocks_with_metadata() + block_list = self._plan.execute() + blocks_with_metadata = block_list.get_blocks_with_metadata() left_blocks, left_metadata, right_blocks, right_metadata = _split_at_index( blocks_with_metadata, index, @@ -3424,8 +3447,13 @@ def _split( left_dataset_stats.time_total_s = split_duration left = Dataset( ExecutionPlan( - BlockList(left_blocks, left_metadata), + BlockList( + left_blocks, + left_metadata, + owned_by_consumer=block_list._owned_by_consumer, + ), left_dataset_stats, + run_by_consumer=block_list._owned_by_consumer, ), self._epoch, self._lazy, @@ -3448,8 +3476,13 @@ def _split( right_dataset_stats.time_total_s = split_duration right = Dataset( ExecutionPlan( - BlockList(right_blocks, right_metadata), + BlockList( + right_blocks, + right_metadata, + owned_by_consumer=block_list._owned_by_consumer, + ), right_dataset_stats, + run_by_consumer=block_list._owned_by_consumer, ), self._epoch, self._lazy, @@ -3459,10 +3492,21 @@ def _split( return left, right def _divide(self, block_idx: int) -> ("Dataset[T]", "Dataset[T]"): - left, right = self._plan.execute().divide(block_idx) - l_ds = Dataset(ExecutionPlan(left, self._plan.stats()), self._epoch, self._lazy) + block_list = self._plan.execute() + left, right = block_list.divide(block_idx) + l_ds = Dataset( + ExecutionPlan( + left, self._plan.stats(), run_by_consumer=block_list._owned_by_consumer + ), + self._epoch, + self._lazy, + ) r_ds = Dataset( - ExecutionPlan(right, self._plan.stats()), self._epoch, self._lazy + ExecutionPlan( + right, self._plan.stats(), run_by_consumer=block_list._owned_by_consumer + ), + self._epoch, + self._lazy, ) return l_ds, r_ds diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index b545be1bc330..8cca433d1731 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -69,7 +69,6 @@ class DatasetPipeline(Generic[T]): def __init__( self, base_iterable: Iterable[Callable[[], Dataset[T]]], - base_datasets_can_be_cleared: bool, stages: List[Callable[[Dataset[Any]], Dataset[Any]]] = None, length: int = None, progress_bars: bool = progress_bar._enabled, @@ -82,11 +81,6 @@ def __init__( ``DatasetPipeline.from_iterable()`` methods to construct a pipeline. 
""" self._base_iterable = base_iterable - # Whether the base datasets (from _base_iterable) can be cleared - # after read. For example, if the base datasets are generated by - # the other pipeline, they can be safely cleared after read, ensured - # by the DatasetPipeline API semantics that it can only be read once. - self._base_datasets_can_be_cleared = base_datasets_can_be_cleared self._stages = stages or [] self._optimized_stages = None self._length = length @@ -188,11 +182,17 @@ def iter_batches( if self._executed[0]: raise RuntimeError("Pipeline cannot be read multiple times.") time_start = time.perf_counter() + if self._first_dataset is not None: + blocks_owned_by_consumer = ( + self._first_dataset._plan.execute()._owned_by_consumer + ) + else: + blocks_owned_by_consumer = self._peek()._plan.execute()._owned_by_consumer yield from batch_blocks( self._iter_blocks(), self._stats, prefetch_blocks=prefetch_blocks, - clear_block_after_read=self._can_clear_output_blocks_after_read(), + clear_block_after_read=blocks_owned_by_consumer, batch_size=batch_size, batch_format=batch_format, drop_last=drop_last, @@ -356,7 +356,6 @@ def __next__(self): # overwhelm the console. DatasetPipeline( SplitIterator(idx, coordinator), - self._can_clear_output_blocks_after_read(), length=self._length, progress_bars=False, ) @@ -432,7 +431,6 @@ def __iter__(self): return DatasetPipeline( WindowIterable(self.iter_datasets()), - self._can_clear_output_blocks_after_read(), length=length, ) @@ -515,7 +513,6 @@ def __iter__(self): return DatasetPipeline( RepeatIterable(iter(self._base_iterable)), - self._base_datasets_can_be_cleared, stages=self._stages.copy(), length=length, ) @@ -1035,7 +1032,6 @@ def foreach_window( raise RuntimeError("Pipeline cannot be read multiple times.") return DatasetPipeline( self._base_iterable, - self._base_datasets_can_be_cleared, self._stages + [fn], self._length, self._progress_bars, @@ -1093,7 +1089,11 @@ def _optimize_stages(self): # This dummy dataset will be used to get a set of optimized stages. dummy_ds = Dataset( - ExecutionPlan(BlockList([], []), DatasetStats(stages={}, parent=None)), + ExecutionPlan( + BlockList([], [], owned_by_consumer=True), + DatasetStats(stages={}, parent=None), + run_by_consumer=True, + ), 0, True, ) @@ -1106,10 +1106,13 @@ def _optimize_stages(self): # These optimized stages will be executed by the PipelineExecutor. optimized_stages = [] for stage in stages: + + def add_stage(ds, stage): + ds._plan._run_by_consumer = True + return ds._plan.with_stage(stage) + optimized_stages.append( - lambda ds, stage=stage: Dataset( - ds._plan.with_stage(stage), ds._epoch, True - ) + lambda ds, stage=stage: Dataset(add_stage(ds, stage), ds._epoch, True) ) self._optimized_stages = optimized_stages @@ -1120,17 +1123,6 @@ def _peek(self) -> Dataset[T]: self._first_dataset = next(self._dataset_iter) return self._first_dataset - def _can_clear_output_blocks_after_read(self) -> bool: - # 1) If this pipeline actually performed transformations (i.e. the self._stages - # isn't empty), the output blocks are created by this pipeline and are safe - # to clear right after read, because we know they will never be accessed again, - # given that DatasetPipeline can be read at most once. - # 2) If there is no transformation performed (i.e. self._stages is empty), this - # pipeline will simply access the base datasets (from self._base_iterable), - # so whether we can clear the output blocks depends on the - # self._base_datasets_can_be_cleared. 
- return len(self._stages) > 0 or self._base_datasets_can_be_cleared - def _write_each_dataset(self, write_fn: Callable[[Dataset[T]], None]) -> None: """Write output for each dataset. diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index b405dc0ce9e8..1710cbb55577 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -109,8 +109,9 @@ def from_items(items: List[Any], *, parallelism: int = -1) -> Dataset[Any]: return Dataset( ExecutionPlan( - BlockList(blocks, metadata), + BlockList(blocks, metadata, owned_by_consumer=False), DatasetStats(stages={"from_items": metadata}, parent=None), + run_by_consumer=False, ), 0, False, @@ -304,12 +305,14 @@ def read_datasource( ): ray_remote_args["scheduling_strategy"] = "SPREAD" - block_list = LazyBlockList(read_tasks, ray_remote_args=ray_remote_args) + block_list = LazyBlockList( + read_tasks, ray_remote_args=ray_remote_args, owned_by_consumer=False + ) block_list.compute_first_block() block_list.ensure_metadata_for_first_block() return Dataset( - ExecutionPlan(block_list, block_list.stats()), + ExecutionPlan(block_list, block_list.stats(), run_by_consumer=False), 0, False, ) @@ -889,8 +892,9 @@ def from_pandas_refs( metadata = ray.get([get_metadata.remote(df) for df in dfs]) return Dataset( ExecutionPlan( - BlockList(dfs, metadata), + BlockList(dfs, metadata, owned_by_consumer=False), DatasetStats(stages={"from_pandas_refs": metadata}, parent=None), + run_by_consumer=False, ), 0, False, @@ -903,8 +907,9 @@ def from_pandas_refs( metadata = ray.get(metadata) return Dataset( ExecutionPlan( - BlockList(blocks, metadata), + BlockList(blocks, metadata, owned_by_consumer=False), DatasetStats(stages={"from_pandas_refs": metadata}, parent=None), + run_by_consumer=False, ), 0, False, @@ -961,8 +966,9 @@ def from_numpy_refs( metadata = ray.get(metadata) return Dataset( ExecutionPlan( - BlockList(blocks, metadata), + BlockList(blocks, metadata, owned_by_consumer=False), DatasetStats(stages={"from_numpy_refs": metadata}, parent=None), + run_by_consumer=False, ), 0, False, @@ -1012,8 +1018,9 @@ def from_arrow_refs( metadata = ray.get([get_metadata.remote(t) for t in tables]) return Dataset( ExecutionPlan( - BlockList(tables, metadata), + BlockList(tables, metadata, owned_by_consumer=False), DatasetStats(stages={"from_arrow_refs": metadata}, parent=None), + run_by_consumer=False, ), 0, False, diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index f87aabb3a442..014a5cc02293 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -4427,6 +4427,32 @@ def flaky_mapper(x): ).take() +def test_split_is_not_disruptive(ray_start_regular): + ds = ( + ray.data.range(100, parallelism=10).map_batches(lambda x: x).experimental_lazy() + ) + + def verify_integrity(splits): + for dss in splits: + for batch in dss.iter_batches(): + pass + for batch in ds.iter_batches(): + pass + + # No block splitting involved: split 10 even blocks into 2 groups. + verify_integrity(ds.split(2, equal=True)) + # Block splitting involved: split 10 even blocks into 3 groups. + verify_integrity(ds.split(3, equal=True)) + + # Same as above, but with transforms applied after converting to lazy. + verify_integrity(ds.map_batches(lambda x: x).split(2, equal=True)) + verify_integrity(ds.map_batches(lambda x: x).split(3, equal=True)) + + # Same as above, but with in-place transforms applied after converting to lazy.
+ verify_integrity(ds.randomize_block_order().split(2, equal=True)) + verify_integrity(ds.randomize_block_order().split(3, equal=True)) + + def test_datasource(ray_start_regular): source = ray.data.datasource.RandomIntRowDatasource() assert len(ray.data.read_datasource(source, n=10, num_columns=2).take()) == 10 diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index bf93a5411349..39b57515e9f1 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -627,68 +627,128 @@ def test_randomize_block_order_each_window(ray_start_regular_shared): assert pipe.take() == [0, 1, 4, 5, 2, 3, 6, 7, 10, 11, 8, 9] -def test_preserve_whether_base_datasets_can_be_cleared(ray_start_regular_shared): - ds = ray.data.from_items([1, 3, 4, 5]) - pipe = ds.repeat() - # pipe is directly consuming blocks of dataset, so cannot clear base dataset. - assert not pipe._base_datasets_can_be_cleared - # pipe has no transformation, so output blocks are blocks of dataset, so cannot - # be cleared. - assert not pipe._can_clear_output_blocks_after_read() - - pipe = ds.repeat().map_batches(lambda x: x) - assert not pipe._base_datasets_can_be_cleared - # pipe now has transformation, so output blocks are created on-the-fly and can - # be cleared. - assert pipe._can_clear_output_blocks_after_read() - - # rewindow(): collapse previous stages when creating new DatasetPipeline. - pipe = ds.repeat().map_batches(lambda x: x).rewindow(blocks_per_window=1) - assert len(pipe._stages) == 0 - # The base datasets are generated by a pipeline prior to rewindow, so we - # can clear them. - assert pipe._base_datasets_can_be_cleared - # When base datasets can be cleared, we can surely clear output blocks from this - # pipeline. - assert pipe._can_clear_output_blocks_after_read() - - # split(): Similar to rewindow, pipeline splitting will also collapse previous - # stages, so we expect same. - p1, p2 = ds.repeat().map_batches(lambda x: x).split(2) - assert len(p1._stages) == 0 - assert len(p2._stages) == 0 - assert p1._base_datasets_can_be_cleared - assert p2._base_datasets_can_be_cleared - assert p1._can_clear_output_blocks_after_read() - assert p2._can_clear_output_blocks_after_read() - - # foreach_window(): will preserve the _base_datasets_can_be_cleared. - p1 = ds.repeat() - p2 = p1.foreach_window(lambda x: x) - assert p1._base_datasets_can_be_cleared == p2._base_datasets_can_be_cleared - assert not p2._base_datasets_can_be_cleared - p1 = ds.repeat().map_batches(lambda x: x).rewindow(blocks_per_window=1) - p2 = p1.foreach_window(lambda x: x) - assert p1._base_datasets_can_be_cleared == p2._base_datasets_can_be_cleared - assert p2._base_datasets_can_be_cleared - - # repeat(): will preserve the _base_datasets_can_be_cleared. 
- p1 = ds.window(blocks_per_window=1) - p2 = p1.repeat() - assert p1._base_datasets_can_be_cleared == p2._base_datasets_can_be_cleared - assert not p2._base_datasets_can_be_cleared - p1 = ds.window(blocks_per_window=1).map_batches(lambda x: x) - p2 = p1.repeat() - assert p1._base_datasets_can_be_cleared == p2._base_datasets_can_be_cleared - assert not p2._base_datasets_can_be_cleared - - -def test_add_and_drop_columns(ray_start_regular_shared): +def test_drop_columns(ray_start_regular_shared): df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) ds = ray.data.from_pandas(df) pipe = ds.repeat() - pipe = pipe.add_column("col4", lambda _: 1) - assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3, "col4": 1}] + assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] + + +def test_in_place_transformation_doesnt_clear_objects(ray_start_regular_shared): + ds = ray.data.from_items([1, 2, 3, 4, 5, 6]) + + def verify_integrity(p): + # The pipeline's output blocks are from the original dataset (i.e. not + # created by the pipeline itself), so those blocks must not be cleared -- + # verified below by re-reading the dataset. + for b in p.iter_batches(): + pass + # Verify the integrity of the blocks of the original dataset. + assert ds.take_all() == [1, 2, 3, 4, 5, 6] + + verify_integrity(ds.repeat(10).randomize_block_order_each_window()) + verify_integrity( + ds.repeat(10) + .randomize_block_order_each_window() + .randomize_block_order_each_window() + ) + # Mix in-place and non-in-place transforms. + verify_integrity( + ds.repeat(10).map_batches(lambda x: x).randomize_block_order_each_window() + ) + verify_integrity( + ds.repeat(10).randomize_block_order_each_window().map_batches(lambda x: x) + ) + + +def test_in_place_transformation_split_doesnt_clear_objects(ray_start_regular_shared): + ds = ray.data.from_items([1, 2, 3, 4, 5, 6], parallelism=3) + + @ray.remote + def consume(p): + for batch in p.iter_batches(): + pass + + def verify_integrity(p): + # Dividing 3 blocks ([1, 2], [3, 4] and [5, 6]) into 2 equal splits must + # split one block. Since the blocks are not created by the pipeline + # (randomize_block_order_each_window() is in-place and doesn't create + # new blocks), the block splitting must not clear the input block -- + # verified below by re-reading the dataset. + splits = p.split(2, equal=True) + ray.get([consume.remote(p) for p in splits]) + # Verify the integrity of the blocks of the original dataset. + assert ds.take_all() == [1, 2, 3, 4, 5, 6] + + verify_integrity(ds.repeat(10).randomize_block_order_each_window()) + verify_integrity( + ds.repeat(10) + .randomize_block_order_each_window() + .randomize_block_order_each_window() + ) + # TODO(https://github.com/ray-project/ray/issues/26766): re-enable this after the + # bug is fixed. + # verify_integrity( + # ds.repeat(10).randomize_block_order_each_window().rewindow(blocks_per_window=1) + # ) + # Mix in-place and non-in-place transforms.
+ verify_integrity( + ds.repeat(10) + .randomize_block_order_each_window() + .randomize_block_order_each_window() + .map_batches(lambda x: x) + ) + verify_integrity( + ds.repeat(10) + .map_batches(lambda x: x) + .randomize_block_order_each_window() + .randomize_block_order_each_window() + ) + + +def test_if_blocks_owned_by_consumer(ray_start_regular_shared): + ds = ray.data.from_items([1, 2, 3, 4, 5, 6], parallelism=3) + assert not ds._plan.execute()._owned_by_consumer + assert not ds.randomize_block_order()._plan.execute()._owned_by_consumer + assert not ds.map_batches(lambda x: x)._plan.execute()._owned_by_consumer + + def verify_blocks(pipe, owned_by_consumer): + for ds in pipe.iter_datasets(): + assert ds._plan.execute()._owned_by_consumer == owned_by_consumer + + verify_blocks(ds.repeat(1), False) + verify_blocks(ds.repeat(1).randomize_block_order_each_window(), False) + verify_blocks(ds.repeat(1).randomize_block_order_each_window().repeat(2), False) + verify_blocks( + ds.repeat(1).randomize_block_order_each_window().map_batches(lambda x: x), True + ) + verify_blocks(ds.repeat(1).map_batches(lambda x: x), True) + verify_blocks(ds.repeat(1).map(lambda x: x), True) + verify_blocks(ds.repeat(1).filter(lambda x: x > 3), True) + verify_blocks(ds.repeat(1).sort_each_window(), True) + verify_blocks(ds.repeat(1).random_shuffle_each_window(), True) + verify_blocks(ds.repeat(1).repartition_each_window(2), True) + verify_blocks(ds.repeat(1).rewindow(blocks_per_window=1), False) + verify_blocks(ds.repeat(1).rewindow(blocks_per_window=1).repeat(2), False) + verify_blocks( + ds.repeat(1).map_batches(lambda x: x).rewindow(blocks_per_window=1), True + ) + verify_blocks( + ds.repeat(1).rewindow(blocks_per_window=1).map_batches(lambda x: x), True + ) + + @ray.remote + def consume(pipe, owned_by_consumer): + verify_blocks(pipe, owned_by_consumer) + + splits = ds.repeat(1).split(2) + ray.get([consume.remote(splits[0], False), consume.remote(splits[1], False)]) + + splits = ds.repeat(1).randomize_block_order_each_window().split(2) + ray.get([consume.remote(splits[0], False), consume.remote(splits[1], False)]) + + splits = ds.repeat(1).map_batches(lambda x: x).split(2) + ray.get([consume.remote(splits[0], True), consume.remote(splits[1], True)]) if __name__ == "__main__":