From 329302310c249c85cd926b5335e7a019b26ffb70 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Mon, 26 Sep 2022 18:46:50 -0700 Subject: [PATCH 01/15] create abstract get_model in checkpoint and update sklearn checkpoint Signed-off-by: Michael Mui --- python/ray/air/checkpoint.py | 13 +++++++++++++ python/ray/train/sklearn/sklearn_checkpoint.py | 2 +- python/ray/train/sklearn/sklearn_predictor.py | 2 +- python/ray/train/tests/test_sklearn_trainer.py | 4 ++-- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/python/ray/air/checkpoint.py b/python/ray/air/checkpoint.py index 1f4cb0738012..6673d57e8fdd 100644 --- a/python/ray/air/checkpoint.py +++ b/python/ray/air/checkpoint.py @@ -12,6 +12,9 @@ import uuid import warnings +import abc +from abc import abstractmethod + import ray from ray import cloudpickle as pickle from ray.air._internal.checkpointing import load_preprocessor_from_dir @@ -635,6 +638,16 @@ def __fspath__(self): "Use `Checkpoint.to_directory()` or `Checkpoint.as_directory()` instead." ) + @abstractmethod + def get_model(self, model): + """ + Retrieve the framework-specific model stored in this checkpoint. + + Returns: + A framework-specific model. + """ + raise NotImplementedError + def get_preprocessor(self) -> Optional["Preprocessor"]: """Return the saved preprocessor, if one exists.""" diff --git a/python/ray/train/sklearn/sklearn_checkpoint.py b/python/ray/train/sklearn/sklearn_checkpoint.py index 79607796e1b2..9886d6f47cfa 100644 --- a/python/ray/train/sklearn/sklearn_checkpoint.py +++ b/python/ray/train/sklearn/sklearn_checkpoint.py @@ -64,7 +64,7 @@ def from_estimator( return checkpoint - def get_estimator(self) -> BaseEstimator: + def get_model(self) -> BaseEstimator: """Retrieve the ``Estimator`` stored in this checkpoint.""" with self.as_directory() as checkpoint_path: estimator_path = os.path.join(checkpoint_path, MODEL_KEY) diff --git a/python/ray/train/sklearn/sklearn_predictor.py b/python/ray/train/sklearn/sklearn_predictor.py index 6bedec0715cb..545ac9316e4c 100644 --- a/python/ray/train/sklearn/sklearn_predictor.py +++ b/python/ray/train/sklearn/sklearn_predictor.py @@ -55,7 +55,7 @@ def from_checkpoint(cls, checkpoint: Checkpoint) -> "SklearnPredictor": ``SklearnTrainer`` run. 
""" checkpoint = SklearnCheckpoint.from_checkpoint(checkpoint) - estimator = checkpoint.get_estimator() + estimator = checkpoint.get_model() preprocessor = checkpoint.get_preprocessor() return cls(estimator=estimator, preprocessor=preprocessor) diff --git a/python/ray/train/tests/test_sklearn_trainer.py b/python/ray/train/tests/test_sklearn_trainer.py index c6b3a62c6170..250fd8b04b1d 100644 --- a/python/ray/train/tests/test_sklearn_trainer.py +++ b/python/ray/train/tests/test_sklearn_trainer.py @@ -93,7 +93,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": result = trainer.fit() checkpoint = SklearnCheckpoint.from_checkpoint(result.checkpoint) - model = checkpoint.get_estimator() + model = checkpoint.get_model() assert model.n_jobs == 1 @@ -129,7 +129,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": checkpoint = SklearnCheckpoint.from_checkpoint(resume_from) - model = checkpoint.get_estimator() + model = checkpoint.get_model() preprocessor = checkpoint.get_preprocessor() assert hasattr(model, "feature_importances_") assert preprocessor.is_same From f5887981788a479e46d9be2570f9005d3df9a12a Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Tue, 4 Oct 2022 14:55:34 -0700 Subject: [PATCH 02/15] add dataset select_columns and unit tests Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 45 +++++++++++++++++++++++++++ python/ray/data/tests/test_dataset.py | 20 ++++++++++++ 2 files changed, 65 insertions(+) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 2e3348ba29b0..8d601afc0e7b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -602,6 +602,51 @@ def drop_columns( lambda batch: batch.drop(columns=cols), compute=compute, **ray_remote_args ) + def select_columns( + self, + columns: List[str], + *, + compute: Union[str, ComputeStrategy] = None, + **ray_remote_args, + ) -> "Dataset[T]": + """Select one or more columns from the dataset. + + This is a blocking operation. + + Examples: + >>> import ray + >>> ds = ray.data.range_table(100) + >>> # Add a new column equal to value * 2. + >>> ds = ds.add_column( + ... "new_col", lambda df: df["value"] * 2) + >>> # Select only the "new_col" column. + >>> ds = ds.select_columns(["new_col"]) + >>> ds + Dataset(num_blocks=17, num_rows=100, schema={new_col: int64}) + + + Time complexity: O(dataset size / parallelism) + + Args: + columns: Names of the columns to select. Columns not included in this + will be filtered out. + compute: The compute strategy, either "tasks" (default) to use Ray + tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. + ray_remote_args: Additional resource requirements to request from + ray (e.g., num_gpus=1 to request GPUs for the map tasks). + """ + if ray_remote_args.get("batch_format") == "numpy": + raise TypeError( + "Unable to create a block accessor for block type `numpy`. 
" + "Remove `batch_format` or change it to `default`.") + + # dedup since Arrow/PandasBlock's `select` does not handle dup columns + unique_columns = list(set(columns)) + return self.map_batches( + lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), + compute=compute, **ray_remote_args + ) + def flat_map( self, fn: RowUDF[T, U], diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index d1dc91bf28cf..4e381cdba254 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2224,6 +2224,26 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): ds.drop_columns(["dummy_col", "col1", "col2"]) +def test_select_columns(ray_start_regular_shared): + df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) + # Test pandas and base cases + ds = ray.data.from_pandas(df) + assert ds._dataset_format() == "pandas" + assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [{'col1': 1, 'col2': 2, "col3": 3}] + assert ds.select_columns(columns=["col1", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + assert ds.select_columns(columns=[]).take(1) == [{}] + assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + + # Test arrow + ds = ds.select_columns(columns=["col1", "col2"], batch_format="pyarrow") + assert ds._dataset_format() == "arrow" + assert ds.take(1) == [{'col1': 1, 'col2': 2}] + + # Test `batch_format` validation + with pytest.raises(TypeError): + ds.select_columns(columns=["col1", "col2"], batch_format="numpy") + + def test_map_batches_basic(ray_start_regular_shared, tmp_path): # Test input validation ds = ray.data.range(5) From 1d1a69805ee1d7f31f9ab3d948bdad04d1e1c160 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 5 Oct 2022 01:38:13 -0700 Subject: [PATCH 03/15] address comments Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 23 ++++++++----------- python/ray/data/tests/test_dataset.py | 4 ---- python/ray/experimental/state/custom_types.py | 1 + .../ray/train/sklearn/sklearn_checkpoint.py | 2 +- python/ray/train/sklearn/sklearn_predictor.py | 2 +- .../ray/train/tests/test_sklearn_trainer.py | 4 ++-- 6 files changed, 14 insertions(+), 22 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 8d601afc0e7b..a288ef51db89 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -611,18 +611,18 @@ def select_columns( ) -> "Dataset[T]": """Select one or more columns from the dataset. - This is a blocking operation. + Columns passed in will be de-duped since ArrowBlock and PandasBlock + `select` does not explicitly handle duplicated columns. Examples: >>> import ray - >>> ds = ray.data.range_table(100) - >>> # Add a new column equal to value * 2. - >>> ds = ds.add_column( - ... "new_col", lambda df: df["value"] * 2) - >>> # Select only the "new_col" column. - >>> ds = ds.select_columns(["new_col"]) + >>> # Create a dataset with 3 columns + >>> ds = ray.data.from_items([{"col1": i, "col2": i+1, "col3": i+2} + ... for i in range(10)]) + >>> # Select only "col1" and "col2" columns. 
+ >>> ds = ds.select_columns(["col1", "col2"]) >>> ds - Dataset(num_blocks=17, num_rows=100, schema={new_col: int64}) + Dataset(num_blocks=10, num_rows=10, schema={col1: int64, col2: int64}) Time complexity: O(dataset size / parallelism) @@ -635,12 +635,7 @@ def select_columns( ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - if ray_remote_args.get("batch_format") == "numpy": - raise TypeError( - "Unable to create a block accessor for block type `numpy`. " - "Remove `batch_format` or change it to `default`.") - - # dedup since Arrow/PandasBlock's `select` does not handle dup columns + # dedup the input columns used for selection unique_columns = list(set(columns)) return self.map_batches( lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 4e381cdba254..46ba355bbc80 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2239,10 +2239,6 @@ def test_select_columns(ray_start_regular_shared): assert ds._dataset_format() == "arrow" assert ds.take(1) == [{'col1': 1, 'col2': 2}] - # Test `batch_format` validation - with pytest.raises(TypeError): - ds.select_columns(columns=["col1", "col2"], batch_format="numpy") - def test_map_batches_basic(ray_start_regular_shared, tmp_path): # Test input validation diff --git a/python/ray/experimental/state/custom_types.py b/python/ray/experimental/state/custom_types.py index 7cd54f352ed8..71158b6aa953 100644 --- a/python/ray/experimental/state/custom_types.py +++ b/python/ray/experimental/state/custom_types.py @@ -82,6 +82,7 @@ def validate_protobuf_enum(grpc_enum, custom_enum): # doesn't include any values in that case. if len(enum_vals) > 0: assert enum_vals == set(custom_enum) + pass # Do the enum validation here. diff --git a/python/ray/train/sklearn/sklearn_checkpoint.py b/python/ray/train/sklearn/sklearn_checkpoint.py index 9886d6f47cfa..79607796e1b2 100644 --- a/python/ray/train/sklearn/sklearn_checkpoint.py +++ b/python/ray/train/sklearn/sklearn_checkpoint.py @@ -64,7 +64,7 @@ def from_estimator( return checkpoint - def get_model(self) -> BaseEstimator: + def get_estimator(self) -> BaseEstimator: """Retrieve the ``Estimator`` stored in this checkpoint.""" with self.as_directory() as checkpoint_path: estimator_path = os.path.join(checkpoint_path, MODEL_KEY) diff --git a/python/ray/train/sklearn/sklearn_predictor.py b/python/ray/train/sklearn/sklearn_predictor.py index 545ac9316e4c..6bedec0715cb 100644 --- a/python/ray/train/sklearn/sklearn_predictor.py +++ b/python/ray/train/sklearn/sklearn_predictor.py @@ -55,7 +55,7 @@ def from_checkpoint(cls, checkpoint: Checkpoint) -> "SklearnPredictor": ``SklearnTrainer`` run. 
""" checkpoint = SklearnCheckpoint.from_checkpoint(checkpoint) - estimator = checkpoint.get_model() + estimator = checkpoint.get_estimator() preprocessor = checkpoint.get_preprocessor() return cls(estimator=estimator, preprocessor=preprocessor) diff --git a/python/ray/train/tests/test_sklearn_trainer.py b/python/ray/train/tests/test_sklearn_trainer.py index 250fd8b04b1d..c6b3a62c6170 100644 --- a/python/ray/train/tests/test_sklearn_trainer.py +++ b/python/ray/train/tests/test_sklearn_trainer.py @@ -93,7 +93,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": result = trainer.fit() checkpoint = SklearnCheckpoint.from_checkpoint(result.checkpoint) - model = checkpoint.get_model() + model = checkpoint.get_estimator() assert model.n_jobs == 1 @@ -129,7 +129,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": checkpoint = SklearnCheckpoint.from_checkpoint(resume_from) - model = checkpoint.get_model() + model = checkpoint.get_estimator() preprocessor = checkpoint.get_preprocessor() assert hasattr(model, "feature_importances_") assert preprocessor.is_same From 32323effbd8b1b4d5439a5f5a631b3019fa05be4 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 5 Oct 2022 01:42:03 -0700 Subject: [PATCH 04/15] fix rebase Signed-off-by: Michael Mui --- python/ray/air/checkpoint.py | 13 ------------- python/ray/experimental/state/custom_types.py | 1 - 2 files changed, 14 deletions(-) diff --git a/python/ray/air/checkpoint.py b/python/ray/air/checkpoint.py index 6673d57e8fdd..1f4cb0738012 100644 --- a/python/ray/air/checkpoint.py +++ b/python/ray/air/checkpoint.py @@ -12,9 +12,6 @@ import uuid import warnings -import abc -from abc import abstractmethod - import ray from ray import cloudpickle as pickle from ray.air._internal.checkpointing import load_preprocessor_from_dir @@ -638,16 +635,6 @@ def __fspath__(self): "Use `Checkpoint.to_directory()` or `Checkpoint.as_directory()` instead." ) - @abstractmethod - def get_model(self, model): - """ - Retrieve the framework-specific model stored in this checkpoint. - - Returns: - A framework-specific model. - """ - raise NotImplementedError - def get_preprocessor(self) -> Optional["Preprocessor"]: """Return the saved preprocessor, if one exists.""" diff --git a/python/ray/experimental/state/custom_types.py b/python/ray/experimental/state/custom_types.py index 71158b6aa953..7cd54f352ed8 100644 --- a/python/ray/experimental/state/custom_types.py +++ b/python/ray/experimental/state/custom_types.py @@ -82,7 +82,6 @@ def validate_protobuf_enum(grpc_enum, custom_enum): # doesn't include any values in that case. if len(enum_vals) > 0: assert enum_vals == set(custom_enum) - pass # Do the enum validation here. 
From 4cce85f872d18a47e4528c174f6adf51a836873a Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 5 Oct 2022 01:45:14 -0700 Subject: [PATCH 05/15] fix lint changes after scripts/format.sh Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 7 ++++--- python/ray/data/tests/test_dataset.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index a288ef51db89..427ef04d95b9 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -638,9 +638,10 @@ def select_columns( # dedup the input columns used for selection unique_columns = list(set(columns)) return self.map_batches( - lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), - compute=compute, **ray_remote_args - ) + lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), + compute=compute, + **ray_remote_args, + ) def flat_map( self, diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 46ba355bbc80..a6795a0a9c64 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2229,15 +2229,21 @@ def test_select_columns(ray_start_regular_shared): # Test pandas and base cases ds = ray.data.from_pandas(df) assert ds._dataset_format() == "pandas" - assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [{'col1': 1, 'col2': 2, "col3": 3}] - assert ds.select_columns(columns=["col1", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ + {"col1": 1, "col2": 2, "col3": 3} + ] + assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] # Test arrow ds = ds.select_columns(columns=["col1", "col2"], batch_format="pyarrow") assert ds._dataset_format() == "arrow" - assert ds.take(1) == [{'col1': 1, 'col2': 2}] + assert ds.take(1) == [{"col1": 1, "col2": 2}] def test_map_batches_basic(ray_start_regular_shared, tmp_path): From 115fde4a182e0ee70267af30b504da74e33ec006 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 12 Oct 2022 03:37:39 +0100 Subject: [PATCH 06/15] add schema validation checks and more unit tests Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 35 ++++++++++++++++++++----- python/ray/data/tests/test_dataset.py | 37 +++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 427ef04d95b9..dbada340920a 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -611,8 +611,7 @@ def select_columns( ) -> "Dataset[T]": """Select one or more columns from the dataset. - Columns passed in will be de-duped since ArrowBlock and PandasBlock - `select` does not explicitly handle duplicated columns. + Note that all input columns need to be in the schema of the dataset. Examples: >>> import ray @@ -628,17 +627,39 @@ def select_columns( Time complexity: O(dataset size / parallelism) Args: - columns: Names of the columns to select. Columns not included in this - will be filtered out. + columns: Names of the columns to select. Columns that are not + included in this list will be filtered out. 
compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - # dedup the input columns used for selection - unique_columns = list(set(columns)) + + import pyarrow as pa + + schema = self.schema() + assert isinstance(schema, (type, PandasBlockSchema, pa.Schema)) + + # check to make sure all input columns are in the dataset schema + if isinstance(schema, PandasBlockSchema): + dataset_cols = schema.names + elif isinstance(schema, pa.Schema): + dataset_cols = [field.name for field in schema] + else: + raise ValueError( + "We currently only support select by column names. " + "Datasets with `simple` schema are not supported." + ) + + extra_cols = [col for col in columns if col not in dataset_cols] + if extra_cols: + raise ValueError( + "The `columns` passed in have to be in the schema of the dataset. " + f"Please remove {extra_cols} from your input `columns`." + ) + return self.map_batches( - lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), + lambda batch: BlockAccessor.for_block(batch).select(columns=columns), compute=compute, **ray_remote_args, ) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index a6795a0a9c64..742a8698e510 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2225,25 +2225,52 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): def test_select_columns(ray_start_regular_shared): + + # Test pandas df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) - # Test pandas and base cases ds = ray.data.from_pandas(df) assert ds._dataset_format() == "pandas" + assert ds.select_columns(columns=[]).take(1) == [{}] assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ {"col1": 1, "col2": 2, "col3": 3} ] assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ {"col1": 1, "col2": 2} ] - assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [ + assert ds.select_columns(columns=["col2", "col1"]).take(1) == [ {"col1": 1, "col2": 2} ] + assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ + "col1", + "col2", + "col2", + ] + with pytest.raises(ValueError): + ds.select_columns(columns=["col1", "col2", "dummy_col"]) # Test arrow - ds = ds.select_columns(columns=["col1", "col2"], batch_format="pyarrow") + ds = ds.map_batches(lambda pa: pa, batch_size=1, batch_format="pyarrow") assert ds._dataset_format() == "arrow" - assert ds.take(1) == [{"col1": 1, "col2": 2}] + assert ds.select_columns(columns=[]).take(1) == [{}] + assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ + {"col1": 1, "col2": 2, "col3": 3} + ] + assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] + assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ + "col1", + "col2", + "col2", + ] + with pytest.raises(ValueError): + ds.select_columns(columns=["col1", "col2", "dummy_col"]) + + # Test simple + ds = ray.data.range(10) + assert ds._dataset_format() == "simple" + with pytest.raises(ValueError): + ds.select_columns(columns=[]) def test_map_batches_basic(ray_start_regular_shared, tmp_path): From cebb21591a5439b8cfd0becbf08f54a2cbb43b8e Mon Sep 17 00:00:00 2001 From: Michael Mui 
Date: Fri, 14 Oct 2022 02:59:49 +0200 Subject: [PATCH 07/15] remove schema validation and let library error surface, add api to dataset_pipeline, and update docs Signed-off-by: Michael Mui --- doc/source/data/api/dataset.rst | 3 + doc/source/data/api/dataset_pipeline.rst | 3 + python/ray/data/dataset.py | 36 ++-------- python/ray/data/dataset_pipeline.py | 13 ++++ python/ray/data/tests/test_dataset.py | 72 ++++++++----------- .../ray/data/tests/test_dataset_pipeline.py | 16 +++++ 6 files changed, 71 insertions(+), 72 deletions(-) diff --git a/doc/source/data/api/dataset.rst b/doc/source/data/api/dataset.rst index 69777c953e90..ead18c8854c8 100644 --- a/doc/source/data/api/dataset.rst +++ b/doc/source/data/api/dataset.rst @@ -16,6 +16,7 @@ Dataset API ray.data.Dataset.filter ray.data.Dataset.add_column ray.data.Dataset.drop_columns + ray.data.Dataset.select_columns ray.data.Dataset.random_sample ray.data.Dataset.limit @@ -144,6 +145,8 @@ Basic Transformations .. automethod:: ray.data.Dataset.drop_columns +.. automethod:: ray.data.Dataset.select_columns + .. automethod:: ray.data.Dataset.random_sample .. automethod:: ray.data.Dataset.limit diff --git a/doc/source/data/api/dataset_pipeline.rst b/doc/source/data/api/dataset_pipeline.rst index a82bd488aac7..92a98f60207c 100644 --- a/doc/source/data/api/dataset_pipeline.rst +++ b/doc/source/data/api/dataset_pipeline.rst @@ -17,6 +17,7 @@ DatasetPipeline API ray.data.DatasetPipeline.filter ray.data.DatasetPipeline.add_column ray.data.DatasetPipeline.drop_columns + ray.data.DatasetPipeline.select_columns **Sorting, Shuffling, Repartitioning** @@ -98,6 +99,8 @@ Basic transformations .. automethod:: ray.data.DatasetPipeline.drop_columns +.. automethod:: ray.data.DatasetPipeline.select_columns + Sorting, Shuffling, Repartitioning ---------------------------------- diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index dbada340920a..196b58ba495d 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -604,14 +604,14 @@ def drop_columns( def select_columns( self, - columns: List[str], + cols: List[str], *, compute: Union[str, ComputeStrategy] = None, **ray_remote_args, ) -> "Dataset[T]": """Select one or more columns from the dataset. - Note that all input columns need to be in the schema of the dataset. + All input columns used to select need to be in the schema of the dataset. Examples: >>> import ray @@ -619,7 +619,7 @@ def select_columns( >>> ds = ray.data.from_items([{"col1": i, "col2": i+1, "col3": i+2} ... for i in range(10)]) >>> # Select only "col1" and "col2" columns. - >>> ds = ds.select_columns(["col1", "col2"]) + >>> ds = ds.select_columns(cols=["col1", "col2"]) >>> ds Dataset(num_blocks=10, num_rows=10, schema={col1: int64, col2: int64}) @@ -627,39 +627,15 @@ def select_columns( Time complexity: O(dataset size / parallelism) Args: - columns: Names of the columns to select. Columns that are not - included in this list will be filtered out. + cols: Names of the columns to select. If any name is not included in the + dataset schema, an exception will be raised. compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). 
""" - - import pyarrow as pa - - schema = self.schema() - assert isinstance(schema, (type, PandasBlockSchema, pa.Schema)) - - # check to make sure all input columns are in the dataset schema - if isinstance(schema, PandasBlockSchema): - dataset_cols = schema.names - elif isinstance(schema, pa.Schema): - dataset_cols = [field.name for field in schema] - else: - raise ValueError( - "We currently only support select by column names. " - "Datasets with `simple` schema are not supported." - ) - - extra_cols = [col for col in columns if col not in dataset_cols] - if extra_cols: - raise ValueError( - "The `columns` passed in have to be in the schema of the dataset. " - f"Please remove {extra_cols} from your input `columns`." - ) - return self.map_batches( - lambda batch: BlockAccessor.for_block(batch).select(columns=columns), + lambda batch: BlockAccessor.for_block(batch).select(columns=cols), compute=compute, **ray_remote_args, ) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 059c10f73cf0..acd359f51d82 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -810,6 +810,19 @@ def drop_columns( lambda ds: ds.drop_columns(cols, compute=compute, **ray_remote_args) ) + def select_columns( + self, + cols: List[str], + *, + compute: Optional[str] = None, + **ray_remote_args, + ) -> "DatasetPipeline[U]": + """Apply :py:meth:`Dataset.select_columns ` to + each dataset/window in this pipeline.""" + return self.foreach_window( + lambda ds: ds.select_columns(cols, compute=compute, **ray_remote_args) + ) + def repartition_each_window( self, num_blocks: int, *, shuffle: bool = False ) -> "DatasetPipeline[U]": diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 742a8698e510..fb6d58645336 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2225,52 +2225,40 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): def test_select_columns(ray_start_regular_shared): - - # Test pandas + # Test pandas and arrow df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) - ds = ray.data.from_pandas(df) - assert ds._dataset_format() == "pandas" - assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ - {"col1": 1, "col2": 2, "col3": 3} - ] - assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ - {"col1": 1, "col2": 2} - ] - assert ds.select_columns(columns=["col2", "col1"]).take(1) == [ - {"col1": 1, "col2": 2} - ] - assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ - "col1", - "col2", - "col2", - ] - with pytest.raises(ValueError): - ds.select_columns(columns=["col1", "col2", "dummy_col"]) - - # Test arrow - ds = ds.map_batches(lambda pa: pa, batch_size=1, batch_format="pyarrow") - assert ds._dataset_format() == "arrow" - assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ - {"col1": 1, "col2": 2, "col3": 3} - ] - assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ - {"col1": 1, "col2": 2} - ] - assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ - "col1", - "col2", - "col2", - ] - with pytest.raises(ValueError): - ds.select_columns(columns=["col1", "col2", "dummy_col"]) + ds1 = ray.data.from_pandas(df) + assert ds1._dataset_format() == "pandas" + + ds2 = ds1.map_batches(lambda pa: pa, batch_size=1, 
batch_format="pyarrow") + assert ds2._dataset_format() == "arrow" + + for each_ds in [ds1, ds2]: + assert each_ds.select_columns(cols=[]).take(1) == [{}] + assert each_ds.select_columns(cols=["col1", "col2", "col3"]).take(1) == [ + {"col1": 1, "col2": 2, "col3": 3} + ] + assert each_ds.select_columns(cols=["col1", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] + assert each_ds.select_columns(cols=["col2", "col1"]).take(1) == [ + {"col1": 1, "col2": 2} + ] + # Test selecting columns with duplicates + assert each_ds.select_columns(cols=["col1", "col2", "col2"]).schema().names == [ + "col1", + "col2", + "col2", + ] + # Test selecting a column that is not in the dataset schema + with pytest.raises(KeyError): + each_ds.select_columns(cols=["col1", "col2", "dummy_col"]) # Test simple - ds = ray.data.range(10) - assert ds._dataset_format() == "simple" + ds3 = ray.data.range(10) + assert ds3._dataset_format() == "simple" with pytest.raises(ValueError): - ds.select_columns(columns=[]) + ds3.select_columns(cols=[]) def test_map_batches_basic(ray_start_regular_shared, tmp_path): diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index c33c1602a691..9d26be0b4ecf 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -627,6 +627,15 @@ def test_randomize_block_order_each_window(ray_start_regular_shared): assert pipe.take() == [0, 1, 4, 5, 2, 3, 6, 7, 10, 11, 8, 9] +def test_add_column(ray_start_regular_shared): + df = pd.DataFrame({"col1": [1, 2, 3]}) + ds = ray.data.from_pandas(df) + pipe = ds.repeat() + assert pipe.add_column("col2", lambda x: x["col1"] + 1).take(1) == [ + {"col1": 1, "col2": 2} + ] + + def test_drop_columns(ray_start_regular_shared): df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) ds = ray.data.from_pandas(df) @@ -634,6 +643,13 @@ def test_drop_columns(ray_start_regular_shared): assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] +def test_select_columns(ray_start_regular_shared): + df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) + ds = ray.data.from_pandas(df) + pipe = ds.repeat() + assert pipe.select_columns(["col2", "col3"]).take(1) == [{"col2": 2, "col3": 3}] + + def test_in_place_transformation_doesnt_clear_objects(ray_start_regular_shared): ds = ray.data.from_items([1, 2, 3, 4, 5, 6]) From c8421c639ab07553d678ace8fc8ffc8188f47470 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Mon, 26 Sep 2022 18:46:50 -0700 Subject: [PATCH 08/15] create abstract get_model in checkpoint and update sklearn checkpoint Signed-off-by: Michael Mui --- python/ray/air/checkpoint.py | 13 +++++++++++++ python/ray/train/sklearn/sklearn_checkpoint.py | 2 +- python/ray/train/sklearn/sklearn_predictor.py | 2 +- python/ray/train/tests/test_sklearn_trainer.py | 4 ++-- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/python/ray/air/checkpoint.py b/python/ray/air/checkpoint.py index 9ef8a88780a4..b369ebd8db36 100644 --- a/python/ray/air/checkpoint.py +++ b/python/ray/air/checkpoint.py @@ -13,6 +13,9 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Tuple, Type, Union +import abc +from abc import abstractmethod + import ray from ray import cloudpickle as pickle from ray.air._internal.checkpointing import load_preprocessor_from_dir @@ -754,6 +757,16 @@ def __fspath__(self): "Use `Checkpoint.to_directory()` or `Checkpoint.as_directory()` instead." 
) + @abstractmethod + def get_model(self, model): + """ + Retrieve the framework-specific model stored in this checkpoint. + + Returns: + A framework-specific model. + """ + raise NotImplementedError + def get_preprocessor(self) -> Optional["Preprocessor"]: """Return the saved preprocessor, if one exists.""" diff --git a/python/ray/train/sklearn/sklearn_checkpoint.py b/python/ray/train/sklearn/sklearn_checkpoint.py index 79607796e1b2..9886d6f47cfa 100644 --- a/python/ray/train/sklearn/sklearn_checkpoint.py +++ b/python/ray/train/sklearn/sklearn_checkpoint.py @@ -64,7 +64,7 @@ def from_estimator( return checkpoint - def get_estimator(self) -> BaseEstimator: + def get_model(self) -> BaseEstimator: """Retrieve the ``Estimator`` stored in this checkpoint.""" with self.as_directory() as checkpoint_path: estimator_path = os.path.join(checkpoint_path, MODEL_KEY) diff --git a/python/ray/train/sklearn/sklearn_predictor.py b/python/ray/train/sklearn/sklearn_predictor.py index 6bedec0715cb..545ac9316e4c 100644 --- a/python/ray/train/sklearn/sklearn_predictor.py +++ b/python/ray/train/sklearn/sklearn_predictor.py @@ -55,7 +55,7 @@ def from_checkpoint(cls, checkpoint: Checkpoint) -> "SklearnPredictor": ``SklearnTrainer`` run. """ checkpoint = SklearnCheckpoint.from_checkpoint(checkpoint) - estimator = checkpoint.get_estimator() + estimator = checkpoint.get_model() preprocessor = checkpoint.get_preprocessor() return cls(estimator=estimator, preprocessor=preprocessor) diff --git a/python/ray/train/tests/test_sklearn_trainer.py b/python/ray/train/tests/test_sklearn_trainer.py index c6b3a62c6170..250fd8b04b1d 100644 --- a/python/ray/train/tests/test_sklearn_trainer.py +++ b/python/ray/train/tests/test_sklearn_trainer.py @@ -93,7 +93,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": result = trainer.fit() checkpoint = SklearnCheckpoint.from_checkpoint(result.checkpoint) - model = checkpoint.get_estimator() + model = checkpoint.get_model() assert model.n_jobs == 1 @@ -129,7 +129,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": checkpoint = SklearnCheckpoint.from_checkpoint(resume_from) - model = checkpoint.get_estimator() + model = checkpoint.get_model() preprocessor = checkpoint.get_preprocessor() assert hasattr(model, "feature_importances_") assert preprocessor.is_same From f7a53dfaf0c85f804c5d4746cc1bb840e7f254cd Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Tue, 4 Oct 2022 14:55:34 -0700 Subject: [PATCH 09/15] add dataset select_columns and unit tests Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 45 +++++++++++++++++++++++++++ python/ray/data/tests/test_dataset.py | 20 ++++++++++++ 2 files changed, 65 insertions(+) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 064894f4a35b..4852309fe0ae 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -670,6 +670,51 @@ def drop_columns( lambda batch: batch.drop(columns=cols), compute=compute, **ray_remote_args ) + def select_columns( + self, + columns: List[str], + *, + compute: Union[str, ComputeStrategy] = None, + **ray_remote_args, + ) -> "Dataset[T]": + """Select one or more columns from the dataset. + + This is a blocking operation. + + Examples: + >>> import ray + >>> ds = ray.data.range_table(100) + >>> # Add a new column equal to value * 2. + >>> ds = ds.add_column( + ... "new_col", lambda df: df["value"] * 2) + >>> # Select only the "new_col" column. 
+ >>> ds = ds.select_columns(["new_col"]) + >>> ds + Dataset(num_blocks=17, num_rows=100, schema={new_col: int64}) + + + Time complexity: O(dataset size / parallelism) + + Args: + columns: Names of the columns to select. Columns not included in this + will be filtered out. + compute: The compute strategy, either "tasks" (default) to use Ray + tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. + ray_remote_args: Additional resource requirements to request from + ray (e.g., num_gpus=1 to request GPUs for the map tasks). + """ + if ray_remote_args.get("batch_format") == "numpy": + raise TypeError( + "Unable to create a block accessor for block type `numpy`. " + "Remove `batch_format` or change it to `default`.") + + # dedup since Arrow/PandasBlock's `select` does not handle dup columns + unique_columns = list(set(columns)) + return self.map_batches( + lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), + compute=compute, **ray_remote_args + ) + def flat_map( self, fn: RowUDF[T, U], diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index f8d1f50bc30a..b9bfb3eb9a6a 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2217,6 +2217,26 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): ds.drop_columns(["dummy_col", "col1", "col2"]) +def test_select_columns(ray_start_regular_shared): + df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) + # Test pandas and base cases + ds = ray.data.from_pandas(df) + assert ds._dataset_format() == "pandas" + assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [{'col1': 1, 'col2': 2, "col3": 3}] + assert ds.select_columns(columns=["col1", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + assert ds.select_columns(columns=[]).take(1) == [{}] + assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + + # Test arrow + ds = ds.select_columns(columns=["col1", "col2"], batch_format="pyarrow") + assert ds._dataset_format() == "arrow" + assert ds.take(1) == [{'col1': 1, 'col2': 2}] + + # Test `batch_format` validation + with pytest.raises(TypeError): + ds.select_columns(columns=["col1", "col2"], batch_format="numpy") + + def test_map_batches_basic(ray_start_regular_shared, tmp_path): # Test input validation ds = ray.data.range(5) From 9730552833069722d44e55da179039cf63bae7b1 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 5 Oct 2022 01:38:13 -0700 Subject: [PATCH 10/15] address comments Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 23 ++++++++----------- python/ray/data/tests/test_dataset.py | 4 ---- python/ray/experimental/state/custom_types.py | 1 + .../ray/train/sklearn/sklearn_checkpoint.py | 2 +- python/ray/train/sklearn/sklearn_predictor.py | 2 +- .../ray/train/tests/test_sklearn_trainer.py | 4 ++-- 6 files changed, 14 insertions(+), 22 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 4852309fe0ae..8f6af5c239ad 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -679,18 +679,18 @@ def select_columns( ) -> "Dataset[T]": """Select one or more columns from the dataset. - This is a blocking operation. + Columns passed in will be de-duped since ArrowBlock and PandasBlock + `select` does not explicitly handle duplicated columns. Examples: >>> import ray - >>> ds = ray.data.range_table(100) - >>> # Add a new column equal to value * 2. - >>> ds = ds.add_column( - ... 
"new_col", lambda df: df["value"] * 2) - >>> # Select only the "new_col" column. - >>> ds = ds.select_columns(["new_col"]) + >>> # Create a dataset with 3 columns + >>> ds = ray.data.from_items([{"col1": i, "col2": i+1, "col3": i+2} + ... for i in range(10)]) + >>> # Select only "col1" and "col2" columns. + >>> ds = ds.select_columns(["col1", "col2"]) >>> ds - Dataset(num_blocks=17, num_rows=100, schema={new_col: int64}) + Dataset(num_blocks=10, num_rows=10, schema={col1: int64, col2: int64}) Time complexity: O(dataset size / parallelism) @@ -703,12 +703,7 @@ def select_columns( ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - if ray_remote_args.get("batch_format") == "numpy": - raise TypeError( - "Unable to create a block accessor for block type `numpy`. " - "Remove `batch_format` or change it to `default`.") - - # dedup since Arrow/PandasBlock's `select` does not handle dup columns + # dedup the input columns used for selection unique_columns = list(set(columns)) return self.map_batches( lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index b9bfb3eb9a6a..a5429fb8c9d0 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2232,10 +2232,6 @@ def test_select_columns(ray_start_regular_shared): assert ds._dataset_format() == "arrow" assert ds.take(1) == [{'col1': 1, 'col2': 2}] - # Test `batch_format` validation - with pytest.raises(TypeError): - ds.select_columns(columns=["col1", "col2"], batch_format="numpy") - def test_map_batches_basic(ray_start_regular_shared, tmp_path): # Test input validation diff --git a/python/ray/experimental/state/custom_types.py b/python/ray/experimental/state/custom_types.py index ced74b80a8f4..db9d59e62fdb 100644 --- a/python/ray/experimental/state/custom_types.py +++ b/python/ray/experimental/state/custom_types.py @@ -83,6 +83,7 @@ def validate_protobuf_enum(grpc_enum, custom_enum): # doesn't include any values in that case. if len(enum_vals) > 0: assert enum_vals == set(custom_enum) + pass # Do the enum validation here. diff --git a/python/ray/train/sklearn/sklearn_checkpoint.py b/python/ray/train/sklearn/sklearn_checkpoint.py index 9886d6f47cfa..79607796e1b2 100644 --- a/python/ray/train/sklearn/sklearn_checkpoint.py +++ b/python/ray/train/sklearn/sklearn_checkpoint.py @@ -64,7 +64,7 @@ def from_estimator( return checkpoint - def get_model(self) -> BaseEstimator: + def get_estimator(self) -> BaseEstimator: """Retrieve the ``Estimator`` stored in this checkpoint.""" with self.as_directory() as checkpoint_path: estimator_path = os.path.join(checkpoint_path, MODEL_KEY) diff --git a/python/ray/train/sklearn/sklearn_predictor.py b/python/ray/train/sklearn/sklearn_predictor.py index 545ac9316e4c..6bedec0715cb 100644 --- a/python/ray/train/sklearn/sklearn_predictor.py +++ b/python/ray/train/sklearn/sklearn_predictor.py @@ -55,7 +55,7 @@ def from_checkpoint(cls, checkpoint: Checkpoint) -> "SklearnPredictor": ``SklearnTrainer`` run. 
""" checkpoint = SklearnCheckpoint.from_checkpoint(checkpoint) - estimator = checkpoint.get_model() + estimator = checkpoint.get_estimator() preprocessor = checkpoint.get_preprocessor() return cls(estimator=estimator, preprocessor=preprocessor) diff --git a/python/ray/train/tests/test_sklearn_trainer.py b/python/ray/train/tests/test_sklearn_trainer.py index 250fd8b04b1d..c6b3a62c6170 100644 --- a/python/ray/train/tests/test_sklearn_trainer.py +++ b/python/ray/train/tests/test_sklearn_trainer.py @@ -93,7 +93,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": result = trainer.fit() checkpoint = SklearnCheckpoint.from_checkpoint(result.checkpoint) - model = checkpoint.get_model() + model = checkpoint.get_estimator() assert model.n_jobs == 1 @@ -129,7 +129,7 @@ def _transform_pandas(self, df: "pd.DataFrame") -> "pd.DataFrame": checkpoint = SklearnCheckpoint.from_checkpoint(resume_from) - model = checkpoint.get_model() + model = checkpoint.get_estimator() preprocessor = checkpoint.get_preprocessor() assert hasattr(model, "feature_importances_") assert preprocessor.is_same From e16c72c097194fa2129ef5274b675769f61fb888 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 5 Oct 2022 01:42:03 -0700 Subject: [PATCH 11/15] fix rebase Signed-off-by: Michael Mui --- python/ray/air/checkpoint.py | 13 ------------- python/ray/experimental/state/custom_types.py | 1 - 2 files changed, 14 deletions(-) diff --git a/python/ray/air/checkpoint.py b/python/ray/air/checkpoint.py index b369ebd8db36..9ef8a88780a4 100644 --- a/python/ray/air/checkpoint.py +++ b/python/ray/air/checkpoint.py @@ -13,9 +13,6 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Tuple, Type, Union -import abc -from abc import abstractmethod - import ray from ray import cloudpickle as pickle from ray.air._internal.checkpointing import load_preprocessor_from_dir @@ -757,16 +754,6 @@ def __fspath__(self): "Use `Checkpoint.to_directory()` or `Checkpoint.as_directory()` instead." ) - @abstractmethod - def get_model(self, model): - """ - Retrieve the framework-specific model stored in this checkpoint. - - Returns: - A framework-specific model. - """ - raise NotImplementedError - def get_preprocessor(self) -> Optional["Preprocessor"]: """Return the saved preprocessor, if one exists.""" diff --git a/python/ray/experimental/state/custom_types.py b/python/ray/experimental/state/custom_types.py index db9d59e62fdb..ced74b80a8f4 100644 --- a/python/ray/experimental/state/custom_types.py +++ b/python/ray/experimental/state/custom_types.py @@ -83,7 +83,6 @@ def validate_protobuf_enum(grpc_enum, custom_enum): # doesn't include any values in that case. if len(enum_vals) > 0: assert enum_vals == set(custom_enum) - pass # Do the enum validation here. 
From cfc825c407e03039d25da592b4177ce980538eee Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 5 Oct 2022 01:45:14 -0700 Subject: [PATCH 12/15] fix lint changes after scripts/format.sh Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 7 ++++--- python/ray/data/tests/test_dataset.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 8f6af5c239ad..3bf87cea1783 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -706,9 +706,10 @@ def select_columns( # dedup the input columns used for selection unique_columns = list(set(columns)) return self.map_batches( - lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), - compute=compute, **ray_remote_args - ) + lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), + compute=compute, + **ray_remote_args, + ) def flat_map( self, diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index a5429fb8c9d0..81760977e046 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2222,15 +2222,21 @@ def test_select_columns(ray_start_regular_shared): # Test pandas and base cases ds = ray.data.from_pandas(df) assert ds._dataset_format() == "pandas" - assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [{'col1': 1, 'col2': 2, "col3": 3}] - assert ds.select_columns(columns=["col1", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ + {"col1": 1, "col2": 2, "col3": 3} + ] + assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [{'col1': 1, 'col2': 2}] + assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] # Test arrow ds = ds.select_columns(columns=["col1", "col2"], batch_format="pyarrow") assert ds._dataset_format() == "arrow" - assert ds.take(1) == [{'col1': 1, 'col2': 2}] + assert ds.take(1) == [{"col1": 1, "col2": 2}] def test_map_batches_basic(ray_start_regular_shared, tmp_path): From c7f51195d5166fc1669fb5de78468b0cd2ab55d4 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Wed, 12 Oct 2022 03:37:39 +0100 Subject: [PATCH 13/15] add schema validation checks and more unit tests Signed-off-by: Michael Mui --- python/ray/data/dataset.py | 35 ++++++++++++++++++++----- python/ray/data/tests/test_dataset.py | 37 +++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 3bf87cea1783..1f5d1b9a27b4 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -679,8 +679,7 @@ def select_columns( ) -> "Dataset[T]": """Select one or more columns from the dataset. - Columns passed in will be de-duped since ArrowBlock and PandasBlock - `select` does not explicitly handle duplicated columns. + Note that all input columns need to be in the schema of the dataset. Examples: >>> import ray @@ -696,17 +695,39 @@ def select_columns( Time complexity: O(dataset size / parallelism) Args: - columns: Names of the columns to select. Columns not included in this - will be filtered out. + columns: Names of the columns to select. Columns that are not + included in this list will be filtered out. 
compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - # dedup the input columns used for selection - unique_columns = list(set(columns)) + + import pyarrow as pa + + schema = self.schema() + assert isinstance(schema, (type, PandasBlockSchema, pa.Schema)) + + # check to make sure all input columns are in the dataset schema + if isinstance(schema, PandasBlockSchema): + dataset_cols = schema.names + elif isinstance(schema, pa.Schema): + dataset_cols = [field.name for field in schema] + else: + raise ValueError( + "We currently only support select by column names. " + "Datasets with `simple` schema are not supported." + ) + + extra_cols = [col for col in columns if col not in dataset_cols] + if extra_cols: + raise ValueError( + "The `columns` passed in have to be in the schema of the dataset. " + f"Please remove {extra_cols} from your input `columns`." + ) + return self.map_batches( - lambda batch: BlockAccessor.for_block(batch).select(columns=unique_columns), + lambda batch: BlockAccessor.for_block(batch).select(columns=columns), compute=compute, **ray_remote_args, ) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 81760977e046..2a7b0bdfd1b6 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2218,25 +2218,52 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): def test_select_columns(ray_start_regular_shared): + + # Test pandas df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) - # Test pandas and base cases ds = ray.data.from_pandas(df) assert ds._dataset_format() == "pandas" + assert ds.select_columns(columns=[]).take(1) == [{}] assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ {"col1": 1, "col2": 2, "col3": 3} ] assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ {"col1": 1, "col2": 2} ] - assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col2"]).take(1) == [ + assert ds.select_columns(columns=["col2", "col1"]).take(1) == [ {"col1": 1, "col2": 2} ] + assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ + "col1", + "col2", + "col2", + ] + with pytest.raises(ValueError): + ds.select_columns(columns=["col1", "col2", "dummy_col"]) # Test arrow - ds = ds.select_columns(columns=["col1", "col2"], batch_format="pyarrow") + ds = ds.map_batches(lambda pa: pa, batch_size=1, batch_format="pyarrow") assert ds._dataset_format() == "arrow" - assert ds.take(1) == [{"col1": 1, "col2": 2}] + assert ds.select_columns(columns=[]).take(1) == [{}] + assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ + {"col1": 1, "col2": 2, "col3": 3} + ] + assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] + assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ + "col1", + "col2", + "col2", + ] + with pytest.raises(ValueError): + ds.select_columns(columns=["col1", "col2", "dummy_col"]) + + # Test simple + ds = ray.data.range(10) + assert ds._dataset_format() == "simple" + with pytest.raises(ValueError): + ds.select_columns(columns=[]) def test_map_batches_basic(ray_start_regular_shared, tmp_path): From 95ffbe07d561472288c852598c0d71c7058d954b Mon Sep 17 00:00:00 2001 From: Michael Mui 
Date: Fri, 14 Oct 2022 02:59:49 +0200 Subject: [PATCH 14/15] remove schema validation and let library error surface, add api to dataset_pipeline, and update docs Signed-off-by: Michael Mui --- doc/source/data/api/dataset.rst | 3 + doc/source/data/api/dataset_pipeline.rst | 3 + python/ray/data/dataset.py | 36 ++-------- python/ray/data/dataset_pipeline.py | 13 ++++ python/ray/data/tests/test_dataset.py | 72 ++++++++----------- .../ray/data/tests/test_dataset_pipeline.py | 16 +++++ 6 files changed, 71 insertions(+), 72 deletions(-) diff --git a/doc/source/data/api/dataset.rst b/doc/source/data/api/dataset.rst index 69777c953e90..ead18c8854c8 100644 --- a/doc/source/data/api/dataset.rst +++ b/doc/source/data/api/dataset.rst @@ -16,6 +16,7 @@ Dataset API ray.data.Dataset.filter ray.data.Dataset.add_column ray.data.Dataset.drop_columns + ray.data.Dataset.select_columns ray.data.Dataset.random_sample ray.data.Dataset.limit @@ -144,6 +145,8 @@ Basic Transformations .. automethod:: ray.data.Dataset.drop_columns +.. automethod:: ray.data.Dataset.select_columns + .. automethod:: ray.data.Dataset.random_sample .. automethod:: ray.data.Dataset.limit diff --git a/doc/source/data/api/dataset_pipeline.rst b/doc/source/data/api/dataset_pipeline.rst index a82bd488aac7..92a98f60207c 100644 --- a/doc/source/data/api/dataset_pipeline.rst +++ b/doc/source/data/api/dataset_pipeline.rst @@ -17,6 +17,7 @@ DatasetPipeline API ray.data.DatasetPipeline.filter ray.data.DatasetPipeline.add_column ray.data.DatasetPipeline.drop_columns + ray.data.DatasetPipeline.select_columns **Sorting, Shuffling, Repartitioning** @@ -98,6 +99,8 @@ Basic transformations .. automethod:: ray.data.DatasetPipeline.drop_columns +.. automethod:: ray.data.DatasetPipeline.select_columns + Sorting, Shuffling, Repartitioning ---------------------------------- diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 1f5d1b9a27b4..0f2f9e9f4c9d 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -672,14 +672,14 @@ def drop_columns( def select_columns( self, - columns: List[str], + cols: List[str], *, compute: Union[str, ComputeStrategy] = None, **ray_remote_args, ) -> "Dataset[T]": """Select one or more columns from the dataset. - Note that all input columns need to be in the schema of the dataset. + All input columns used to select need to be in the schema of the dataset. Examples: >>> import ray @@ -687,7 +687,7 @@ def select_columns( >>> ds = ray.data.from_items([{"col1": i, "col2": i+1, "col3": i+2} ... for i in range(10)]) >>> # Select only "col1" and "col2" columns. - >>> ds = ds.select_columns(["col1", "col2"]) + >>> ds = ds.select_columns(cols=["col1", "col2"]) >>> ds Dataset(num_blocks=10, num_rows=10, schema={col1: int64, col2: int64}) @@ -695,39 +695,15 @@ def select_columns( Time complexity: O(dataset size / parallelism) Args: - columns: Names of the columns to select. Columns that are not - included in this list will be filtered out. + cols: Names of the columns to select. If any name is not included in the + dataset schema, an exception will be raised. compute: The compute strategy, either "tasks" (default) to use Ray tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). 
""" - - import pyarrow as pa - - schema = self.schema() - assert isinstance(schema, (type, PandasBlockSchema, pa.Schema)) - - # check to make sure all input columns are in the dataset schema - if isinstance(schema, PandasBlockSchema): - dataset_cols = schema.names - elif isinstance(schema, pa.Schema): - dataset_cols = [field.name for field in schema] - else: - raise ValueError( - "We currently only support select by column names. " - "Datasets with `simple` schema are not supported." - ) - - extra_cols = [col for col in columns if col not in dataset_cols] - if extra_cols: - raise ValueError( - "The `columns` passed in have to be in the schema of the dataset. " - f"Please remove {extra_cols} from your input `columns`." - ) - return self.map_batches( - lambda batch: BlockAccessor.for_block(batch).select(columns=columns), + lambda batch: BlockAccessor.for_block(batch).select(columns=cols), compute=compute, **ray_remote_args, ) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 272d3c060476..d5fc95fd25e2 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -810,6 +810,19 @@ def drop_columns( lambda ds: ds.drop_columns(cols, compute=compute, **ray_remote_args) ) + def select_columns( + self, + cols: List[str], + *, + compute: Optional[str] = None, + **ray_remote_args, + ) -> "DatasetPipeline[U]": + """Apply :py:meth:`Dataset.select_columns ` to + each dataset/window in this pipeline.""" + return self.foreach_window( + lambda ds: ds.select_columns(cols, compute=compute, **ray_remote_args) + ) + def repartition_each_window( self, num_blocks: int, *, shuffle: bool = False ) -> "DatasetPipeline[U]": diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 2a7b0bdfd1b6..2005e7eed8f3 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -2218,52 +2218,40 @@ def test_drop_columns(ray_start_regular_shared, tmp_path): def test_select_columns(ray_start_regular_shared): - - # Test pandas + # Test pandas and arrow df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) - ds = ray.data.from_pandas(df) - assert ds._dataset_format() == "pandas" - assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ - {"col1": 1, "col2": 2, "col3": 3} - ] - assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ - {"col1": 1, "col2": 2} - ] - assert ds.select_columns(columns=["col2", "col1"]).take(1) == [ - {"col1": 1, "col2": 2} - ] - assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ - "col1", - "col2", - "col2", - ] - with pytest.raises(ValueError): - ds.select_columns(columns=["col1", "col2", "dummy_col"]) - - # Test arrow - ds = ds.map_batches(lambda pa: pa, batch_size=1, batch_format="pyarrow") - assert ds._dataset_format() == "arrow" - assert ds.select_columns(columns=[]).take(1) == [{}] - assert ds.select_columns(columns=["col1", "col2", "col3"]).take(1) == [ - {"col1": 1, "col2": 2, "col3": 3} - ] - assert ds.select_columns(columns=["col1", "col2"]).take(1) == [ - {"col1": 1, "col2": 2} - ] - assert ds.select_columns(columns=["col1", "col2", "col2"]).schema().names == [ - "col1", - "col2", - "col2", - ] - with pytest.raises(ValueError): - ds.select_columns(columns=["col1", "col2", "dummy_col"]) + ds1 = ray.data.from_pandas(df) + assert ds1._dataset_format() == "pandas" + + ds2 = ds1.map_batches(lambda pa: pa, batch_size=1, 
batch_format="pyarrow") + assert ds2._dataset_format() == "arrow" + + for each_ds in [ds1, ds2]: + assert each_ds.select_columns(cols=[]).take(1) == [{}] + assert each_ds.select_columns(cols=["col1", "col2", "col3"]).take(1) == [ + {"col1": 1, "col2": 2, "col3": 3} + ] + assert each_ds.select_columns(cols=["col1", "col2"]).take(1) == [ + {"col1": 1, "col2": 2} + ] + assert each_ds.select_columns(cols=["col2", "col1"]).take(1) == [ + {"col1": 1, "col2": 2} + ] + # Test selecting columns with duplicates + assert each_ds.select_columns(cols=["col1", "col2", "col2"]).schema().names == [ + "col1", + "col2", + "col2", + ] + # Test selecting a column that is not in the dataset schema + with pytest.raises(KeyError): + each_ds.select_columns(cols=["col1", "col2", "dummy_col"]) # Test simple - ds = ray.data.range(10) - assert ds._dataset_format() == "simple" + ds3 = ray.data.range(10) + assert ds3._dataset_format() == "simple" with pytest.raises(ValueError): - ds.select_columns(columns=[]) + ds3.select_columns(cols=[]) def test_map_batches_basic(ray_start_regular_shared, tmp_path): diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index 202d599c90be..60175df4b7c6 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -627,6 +627,15 @@ def test_randomize_block_order_each_window(ray_start_regular_shared): assert pipe.take() == [0, 1, 4, 5, 2, 3, 6, 7, 10, 11, 8, 9] +def test_add_column(ray_start_regular_shared): + df = pd.DataFrame({"col1": [1, 2, 3]}) + ds = ray.data.from_pandas(df) + pipe = ds.repeat() + assert pipe.add_column("col2", lambda x: x["col1"] + 1).take(1) == [ + {"col1": 1, "col2": 2} + ] + + def test_drop_columns(ray_start_regular_shared): df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) ds = ray.data.from_pandas(df) @@ -634,6 +643,13 @@ def test_drop_columns(ray_start_regular_shared): assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] +def test_select_columns(ray_start_regular_shared): + df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) + ds = ray.data.from_pandas(df) + pipe = ds.repeat() + assert pipe.select_columns(["col2", "col3"]).take(1) == [{"col2": 2, "col3": 3}] + + def test_random_shuffle_each_window_with_custom_resource(ray_start_cluster): ray.shutdown() cluster = ray_start_cluster From 751ee87251a9f27e40255d6eecb4eac72de549f7 Mon Sep 17 00:00:00 2001 From: Michael Mui Date: Tue, 25 Oct 2022 17:39:20 -0700 Subject: [PATCH 15/15] formatting and rebase Signed-off-by: Michael Mui --- python/ray/data/tests/test_dataset_pipeline.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index 60175df4b7c6..7f3895ca3544 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -636,18 +636,18 @@ def test_add_column(ray_start_regular_shared): ] -def test_drop_columns(ray_start_regular_shared): +def test_select_columns(ray_start_regular_shared): df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) ds = ray.data.from_pandas(df) pipe = ds.repeat() - assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] + assert pipe.select_columns(["col2", "col3"]).take(1) == [{"col2": 2, "col3": 3}] -def test_select_columns(ray_start_regular_shared): +def test_drop_columns(ray_start_regular_shared): 
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]}) ds = ray.data.from_pandas(df) pipe = ds.repeat() - assert pipe.select_columns(["col2", "col3"]).take(1) == [{"col2": 2, "col3": 3}] + assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] def test_random_shuffle_each_window_with_custom_resource(ray_start_cluster):
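Finally, a short sketch of the per-window behaviour that the new `DatasetPipeline.select_columns` test above relies on. Not part of the patches; it assumes a build containing this series, and the shown output is indicative.

    import pandas as pd
    import ray

    df = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4], "col3": [3, 4, 5]})

    # select_columns is applied to every window produced by repeat().
    pipe = ray.data.from_pandas(df).repeat(2)
    print(pipe.select_columns(["col2", "col3"]).take(1))  # e.g. [{'col2': 2, 'col3': 3}]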