From 7d66eff2c589f59cae7afd9ed1af4a4a5daa539d Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 6 Jan 2023 12:54:54 -0800 Subject: [PATCH] [Datasets] Enable lazy execution by default (#31286) This PR is to enable lazy execution by default. See https://github.com/ray-project/enhancements/pull/19 for motivation. The change includes: * Change `Dataset` constructor: `Dataset.__init__(lazy: bool = True)`. Also remove `defer_execution` field, as it's no longer needed. * `read_api.py:read_datasource()` returns a lazy `Dataset` with computing the first input block. * Add `ds.fully_executed()` calls to required unit tests, to make sure they are passing. TODO: - [x] Fix all unit tests - [x] https://github.com/ray-project/ray/pull/31459 - [x] https://github.com/ray-project/ray/pull/31460 - [ ] Remove the behavior to eagerly compute first block for read - [ ] https://github.com/ray-project/ray/issues/31417 - [ ] Update documentation Signed-off-by: tmynn --- .../datasets_train/datasets_train.py | 2 +- python/ray/data/_internal/plan.py | 5 +++ python/ray/data/dataset.py | 30 ++++++++--------- python/ray/data/read_api.py | 6 ++-- python/ray/data/tests/test_dataset.py | 32 +++++++++++++------ python/ray/data/tests/test_dataset_parquet.py | 5 ++- .../data/tests/test_dynamic_block_split.py | 4 +++ python/ray/data/tests/test_optimize.py | 15 ++------- python/ray/data/tests/test_stats.py | 6 ++-- python/ray/train/batch_predictor.py | 6 +++- 10 files changed, 64 insertions(+), 47 deletions(-) diff --git a/doc/source/ray-core/_examples/datasets_train/datasets_train.py b/doc/source/ray-core/_examples/datasets_train/datasets_train.py index 111e4d81a8dd..990e77c09527 100644 --- a/doc/source/ray-core/_examples/datasets_train/datasets_train.py +++ b/doc/source/ray-core/_examples/datasets_train/datasets_train.py @@ -580,7 +580,7 @@ def train_func(config): read_dataset(data_path) ) - num_columns = len(train_dataset.schema().names) + num_columns = len(train_dataset.schema(fetch_if_missing=True).names) # remove label column. num_features = num_columns - 1 diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 7d12acccfb5c..76a744db78a5 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -251,6 +251,11 @@ def schema( self.execute() else: return None + elif self._in_blocks is not None and self._snapshot_blocks is None: + # If the plan only has input blocks, we execute it, so snapshot has output. + # This applies to newly created dataset. For example, initial dataset from + # read, and output datasets of Dataset.split(). + self.execute() # Snapshot is now guaranteed to be the output of the final stage or None. blocks = self._snapshot_blocks if not blocks: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 9798c51ee51e..e39fea4fe204 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -177,19 +177,19 @@ class Dataset(Generic[T]): >>> ds = ray.data.range(1000) >>> # Transform in parallel with map_batches(). >>> ds.map_batches(lambda batch: [v * 2 for v in batch]) - Dataset(num_blocks=..., num_rows=1000, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Compute max. >>> ds.max() 999 >>> # Group the data. >>> ds.groupby(lambda x: x % 3).count() - Dataset(num_blocks=..., num_rows=3, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Shuffle this dataset randomly. >>> ds.random_shuffle() - Dataset(num_blocks=..., num_rows=1000, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Sort it back in order. >>> ds.sort() - Dataset(num_blocks=..., num_rows=1000, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors without incurring a copy. Datasets support @@ -202,9 +202,7 @@ def __init__( self, plan: ExecutionPlan, epoch: int, - lazy: bool, - *, - defer_execution: bool = False, + lazy: bool = True, ): """Construct a Dataset (internal API). @@ -219,7 +217,7 @@ def __init__( self._epoch = epoch self._lazy = lazy - if not lazy and not defer_execution: + if not lazy: self._plan.execute(allow_clear_input_blocks=False) @staticmethod @@ -243,7 +241,7 @@ def map( >>> # Transform python objects. >>> ds = ray.data.range(1000) >>> ds.map(lambda x: x * 2) - Dataset(num_blocks=..., num_rows=1000, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Transform Arrow records. >>> ds = ray.data.from_items( ... [{"value": i} for i in range(1000)]) @@ -804,7 +802,7 @@ def flat_map( >>> import ray >>> ds = ray.data.range(1000) >>> ds.flat_map(lambda x: [x, x ** 2, x ** 3]) - Dataset(num_blocks=..., num_rows=3000, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) Time complexity: O(dataset size / parallelism) @@ -872,7 +870,7 @@ def filter( >>> import ray >>> ds = ray.data.range(100) >>> ds.filter(lambda x: x % 2 == 0) - Dataset(num_blocks=..., num_rows=50, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) Time complexity: O(dataset size / parallelism) @@ -966,10 +964,10 @@ def random_shuffle( >>> ds = ray.data.range(100) >>> # Shuffle this dataset randomly. >>> ds.random_shuffle() - Dataset(num_blocks=..., num_rows=100, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Shuffle this dataset with a fixed random seed. >>> ds.random_shuffle(seed=12345) - Dataset(num_blocks=..., num_rows=100, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) Time complexity: O(dataset size / parallelism) @@ -1012,7 +1010,7 @@ def randomize_block_order( """ plan = self._plan.with_stage(RandomizeBlocksStage(seed)) - return Dataset(plan, self._epoch, self._lazy, defer_execution=True) + return Dataset(plan, self._epoch, self._lazy) def random_sample( self, fraction: float, *, seed: Optional[int] = None @@ -1533,7 +1531,7 @@ def groupby(self, key: Optional[KeyFn]) -> "GroupedDataset[T]": >>> import ray >>> # Group by a key function and aggregate. >>> ray.data.range(100).groupby(lambda x: x % 3).count() - Dataset(num_blocks=..., num_rows=3, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Group by an Arrow table column and aggregate. >>> ray.data.from_items([ ... {"A": x % 3, "B": x} for x in range(100)]).groupby( @@ -1933,7 +1931,7 @@ def sort( >>> # Sort using the entire record as the key. >>> ds = ray.data.range(100) >>> ds.sort() - Dataset(num_blocks=..., num_rows=100, schema=) + Dataset(num_blocks=..., num_rows=..., schema=...) >>> # Sort by a single column in descending order. >>> ds = ray.data.from_items( ... [{"value": i} for i in range(1000)]) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 65a75914b2bd..1bcf7264a58a 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -339,9 +339,9 @@ def read_datasource( block_list.ensure_metadata_for_first_block() return Dataset( - ExecutionPlan(block_list, block_list.stats(), run_by_consumer=False), - 0, - False, + plan=ExecutionPlan(block_list, block_list.stats(), run_by_consumer=False), + epoch=0, + lazy=True, ) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 3aac96fdaf2f..d9b991b5f8a4 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -337,10 +337,10 @@ def test_zip(ray_start_regular_shared): ds1 = ray.data.range(5, parallelism=5) ds2 = ray.data.range(5, parallelism=5).map(lambda x: x + 1) ds = ds1.zip(ds2) - assert ds.schema() == tuple + assert ds.schema(fetch_if_missing=True) == tuple assert ds.take() == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)] with pytest.raises(ValueError): - ds.zip(ray.data.range(3)) + ds.zip(ray.data.range(3)).fully_executed() def test_zip_pandas(ray_start_regular_shared): @@ -366,8 +366,8 @@ def test_zip_arrow(ray_start_regular_shared): lambda r: {"a": r["value"] + 1, "b": r["value"] + 2} ) ds = ds1.zip(ds2) - assert "{id: int64, a: int64, b: int64}" in str(ds) assert ds.count() == 5 + assert "{id: int64, a: int64, b: int64}" in str(ds) result = [r.as_pydict() for r in ds.take()] assert result[0] == {"id": 0, "a": 1, "b": 2} @@ -749,6 +749,7 @@ def test_tensors_sort(ray_start_regular_shared): def test_tensors_inferred_from_map(ray_start_regular_shared): # Test map. ds = ray.data.range(10, parallelism=10).map(lambda _: np.ones((4, 4))) + ds.fully_executed() assert str(ds) == ( "Dataset(num_blocks=10, num_rows=10, " "schema={__value__: ArrowTensorType(shape=(4, 4), dtype=double)})" @@ -758,6 +759,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(16, parallelism=4).map_batches( lambda _: np.ones((3, 4, 4)), batch_size=2 ) + ds.fully_executed() assert str(ds) == ( "Dataset(num_blocks=4, num_rows=24, " "schema={__value__: ArrowTensorType(shape=(4, 4), dtype=double)})" @@ -767,6 +769,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(10, parallelism=10).flat_map( lambda _: [np.ones((4, 4)), np.ones((4, 4))] ) + ds.fully_executed() assert str(ds) == ( "Dataset(num_blocks=10, num_rows=20, " "schema={__value__: ArrowTensorType(shape=(4, 4), dtype=double)})" @@ -776,6 +779,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ds = ray.data.range(16, parallelism=4).map_batches( lambda _: pd.DataFrame({"a": [np.ones((4, 4))] * 3}), batch_size=2 ) + ds.fully_executed() assert str(ds) == ( "Dataset(num_blocks=4, num_rows=24, " "schema={a: TensorDtype(shape=(4, 4), dtype=float64)})" @@ -785,6 +789,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): lambda _: pd.DataFrame({"a": [np.ones((2, 2)), np.ones((3, 3))]}), batch_size=2, ) + ds.fully_executed() assert str(ds) == ( "Dataset(num_blocks=4, num_rows=16, " "schema={a: TensorDtype(shape=(None, None), dtype=float64)})" @@ -1456,16 +1461,19 @@ def test_empty_dataset(ray_start_regular_shared): ds = ray.data.range(1) ds = ds.filter(lambda x: x > 1) + ds.fully_executed() assert str(ds) == "Dataset(num_blocks=1, num_rows=0, schema=Unknown schema)" # Test map on empty dataset. ds = ray.data.from_items([]) ds = ds.map(lambda x: x) + ds.fully_executed() assert ds.count() == 0 # Test filter on empty dataset. ds = ray.data.from_items([]) ds = ds.filter(lambda: True) + ds.fully_executed() assert ds.count() == 0 @@ -1473,7 +1481,9 @@ def test_schema(ray_start_regular_shared): ds = ray.data.range(10, parallelism=10) ds2 = ray.data.range_table(10, parallelism=10) ds3 = ds2.repartition(5) + ds3.fully_executed() ds4 = ds3.map(lambda x: {"a": "hi", "b": 1.0}).limit(5).repartition(1) + ds4.fully_executed() assert str(ds) == "Dataset(num_blocks=10, num_rows=10, schema=)" assert str(ds2) == "Dataset(num_blocks=10, num_rows=10, schema={value: int64})" assert str(ds3) == "Dataset(num_blocks=5, num_rows=10, schema={value: int64})" @@ -2284,7 +2294,7 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): ] # Test dropping non-existent column with pytest.raises(KeyError): - ds.drop_columns(["dummy_col", "col1", "col2"]) + ds.drop_columns(["dummy_col", "col1", "col2"]).fully_executed() def test_select_columns(ray_start_regular_shared): @@ -2315,13 +2325,13 @@ def test_select_columns(ray_start_regular_shared): ] # Test selecting a column that is not in the dataset schema with pytest.raises(KeyError): - each_ds.select_columns(cols=["col1", "col2", "dummy_col"]) + each_ds.select_columns(cols=["col1", "col2", "dummy_col"]).fully_executed() # Test simple ds3 = ray.data.range(10) assert ds3.dataset_format() == "simple" with pytest.raises(ValueError): - ds3.select_columns(cols=[]) + ds3.select_columns(cols=[]).fully_executed() def test_map_batches_basic(ray_start_regular_shared, tmp_path): @@ -2684,11 +2694,13 @@ def mutate(df): ds = ray.data.range_table(num_rows, parallelism=num_blocks).repartition(num_blocks) # Convert to Pandas blocks. ds = ds.map_batches(lambda df: df, batch_format="pandas", batch_size=None) + ds.fully_executed() # Apply UDF that mutates the batches, which should fail since the batch is # read-only. with pytest.raises(ValueError, match="tried to mutate a zero-copy read-only batch"): - ds.map_batches(mutate, batch_size=batch_size, zero_copy_batch=True) + ds = ds.map_batches(mutate, batch_size=batch_size, zero_copy_batch=True) + ds.fully_executed() BLOCK_BUNDLING_TEST_CASES = [ @@ -2710,10 +2722,12 @@ def test_map_batches_block_bundling_auto( # Blocks should be bundled up to the batch size. ds1 = ds.map_batches(lambda x: x, batch_size=batch_size) + ds1.fully_executed() assert ds1.num_blocks() == math.ceil(num_blocks / max(batch_size // block_size, 1)) # Blocks should not be bundled up when batch_size is not specified. ds2 = ds.map_batches(lambda x: x) + ds2.fully_executed() assert ds2.num_blocks() == num_blocks @@ -2796,7 +2810,7 @@ def good_fn(row): ds = ray.data.range(10, parallelism=1) error_message = "Current row has different columns compared to previous rows." with pytest.raises(ValueError) as e: - ds.map(bad_fn) + ds.map(bad_fn).fully_executed() assert error_message in str(e.value) ds_map = ds.map(good_fn) assert ds_map.take() == [{"a": "hello1", "b": "hello2"} for _ in range(10)] @@ -5364,7 +5378,7 @@ def f(x): compute_strategy = ray.data.ActorPoolStrategy() ray.data.range(10, parallelism=10).map_batches( f, batch_size=1, compute=compute_strategy - ) + ).fully_executed() expected_max_num_workers = math.ceil( num_cpus * (1 / compute_strategy.ready_to_total_workers_ratio) ) diff --git a/python/ray/data/tests/test_dataset_parquet.py b/python/ray/data/tests/test_dataset_parquet.py index 09bffcf5a908..6bf861e416ed 100644 --- a/python/ray/data/tests/test_dataset_parquet.py +++ b/python/ray/data/tests/test_dataset_parquet.py @@ -194,7 +194,10 @@ def prefetch_file_metadata(self, pieces): # Expect precomputed row counts and block sizes to be missing. assert ds._meta_count() is None - assert ds._plan._snapshot_blocks.size_bytes() == -1 + assert ( + ds._plan._snapshot_blocks is None + or ds._plan._snapshot_blocks.size_bytes() == -1 + ) # Expect to lazily compute all metadata correctly. assert ds._plan.execute()._num_computed() == 1 diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py index ad5aa1646850..f290807df435 100644 --- a/python/ray/data/tests/test_dynamic_block_split.py +++ b/python/ray/data/tests/test_dynamic_block_split.py @@ -82,10 +82,13 @@ def test_dataset( assert ds.size_bytes() >= 0.7 * block_size * num_blocks * num_tasks map_ds = ds.map_batches(lambda x: x) + map_ds.fully_executed() assert map_ds.num_blocks() == num_tasks map_ds = ds.map_batches(lambda x: x, batch_size=num_blocks * num_tasks) + map_ds.fully_executed() assert map_ds.num_blocks() == 1 map_ds = ds.map(lambda x: x) + map_ds.fully_executed() assert map_ds.num_blocks() == num_blocks * num_tasks ds_list = ds.split(5) @@ -109,6 +112,7 @@ def test_dataset( assert ds.groupby("one").count().count() == num_blocks * num_tasks new_ds = ds.zip(ds) + new_ds.fully_executed() assert new_ds.num_blocks() == num_blocks * num_tasks assert len(ds.take(5)) == 5 diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index a374ee564e31..fee16b1bcc12 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -63,6 +63,7 @@ def test_memory_sanity(shutdown_only): info = ray.init(num_cpus=1, object_store_memory=500e6) ds = ray.data.range(10) ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8)) + ds.fully_executed() meminfo = memory_summary(info.address_info["address"], stats_only=True) # Sanity check spilling is happening as expected. @@ -291,23 +292,11 @@ def _assert_has_stages(stages, stage_names): def test_stage_linking(ray_start_regular_shared): - # NOTE: This tests the internals of `ExecutionPlan`, which is bad practice. Remove - # this test once we have proper unit testing of `ExecutionPlan`. - # Test eager dataset. - ds = ray.data.range(10) - assert len(ds._plan._stages_before_snapshot) == 0 - assert len(ds._plan._stages_after_snapshot) == 0 - assert len(ds._plan._last_optimized_stages) == 0 - ds = ds.map(lambda x: x + 1) - _assert_has_stages(ds._plan._stages_before_snapshot, ["map"]) - assert len(ds._plan._stages_after_snapshot) == 0 - _assert_has_stages(ds._plan._last_optimized_stages, ["read->map"]) - # Test lazy dataset. ds = ray.data.range(10).lazy() assert len(ds._plan._stages_before_snapshot) == 0 assert len(ds._plan._stages_after_snapshot) == 0 - assert len(ds._plan._last_optimized_stages) == 0 + assert ds._plan._last_optimized_stages is None ds = ds.map(lambda x: x + 1) assert len(ds._plan._stages_before_snapshot) == 0 _assert_has_stages(ds._plan._stages_after_snapshot, ["map"]) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 1af089a25c75..6f3ed68b1d20 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -31,7 +31,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): ) with patch.object(logger, "info") as mock_logger: ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(lambda x: x) + ds = ds.map_batches(lambda x: x).fully_executed() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args @@ -47,7 +47,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): """ ) - ds = ds.map(lambda x: x) + ds = ds.map(lambda x: x).fully_executed() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args assert ( @@ -239,7 +239,7 @@ def test_dataset_pipeline_stats_basic(ray_start_regular_shared, enable_auto_log_ ) with patch.object(logger, "info") as mock_logger: ds = ray.data.range(1000, parallelism=10) - ds = ds.map_batches(lambda x: x) + ds = ds.map_batches(lambda x: x).fully_executed() if enable_auto_log_stats: logger_args, logger_kwargs = mock_logger.call_args diff --git a/python/ray/train/batch_predictor.py b/python/ray/train/batch_predictor.py index 56672e46b238..302adc67e5fc 100644 --- a/python/ray/train/batch_predictor.py +++ b/python/ray/train/batch_predictor.py @@ -99,7 +99,7 @@ def predict( separate_gpu_stage: bool = True, ray_remote_args: Optional[Dict[str, Any]] = None, **predict_kwargs, - ) -> ray.data.Dataset: + ) -> Union[ray.data.Dataset, ray.data.DatasetPipeline]: """Run batch scoring on a Dataset. Args: @@ -309,6 +309,10 @@ def __call__(self, input_batch: DataBatchType) -> DataBatchType: **ray_remote_args, ) + if isinstance(prediction_results, ray.data.Dataset): + # Force execution because Dataset uses lazy execution by default. + prediction_results.fully_executed() + return prediction_results def predict_pipelined(