diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index 4791341c3ec8..143f7dec8d30 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -61,7 +61,7 @@ If you already have a Parquet dataset with columns containing serialized tensors # Read the Parquet files into a new Dataset, with the serialized tensors # automatically cast to our tensor column extension type. ds = ray.data.read_parquet( - path, _tensor_column_schema={"two": (np.int, (2, 2, 2))}) + path, tensor_column_schema={"two": (np.int, (2, 2, 2))}) # Internally, this column is represented with our Arrow tensor extension # type. @@ -107,7 +107,7 @@ If your serialized tensors don't fit the above constraints (e.g. they're stored # -> one: int64 # two: extension> -Please note that the ``_tensor_column_schema`` and ``_block_udf`` parameters are both experimental developer APIs and may break in future versions. +Please note that the ``tensor_column_schema`` and ``_block_udf`` parameters are both experimental developer APIs and may break in future versions. Working with tensor column datasets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -245,6 +245,6 @@ Limitations This feature currently comes with a few known limitations that we are either actively working on addressing or have already implemented workarounds for. * All tensors in a tensor column currently must be the same shape. Please let us know if you require heterogeneous tensor shape for your tensor column! Tracking issue is `here `__. - * Automatic casting via specifying an override Arrow schema when reading Parquet is blocked by Arrow supporting custom ExtensionType casting kernels. See `issue `__. An explicit ``_tensor_column_schema`` parameter has been added for :func:`read_parquet() ` as a stopgap solution. + * Automatic casting via specifying an override Arrow schema when reading Parquet is blocked by Arrow supporting custom ExtensionType casting kernels. See `issue `__. An explicit ``tensor_column_schema`` parameter has been added for :func:`read_parquet() ` as a stopgap solution. * Ingesting tables with tensor columns into pytorch via ``ds.to_torch()`` is blocked by pytorch supporting tensor creation from objects that implement the `__array__` interface. See `issue `__. Workarounds are being `investigated `__. * Ingesting tables with tensor columns into TensorFlow via ``ds.to_tf()`` is blocked by a Pandas fix for properly interpreting extension arrays in ``DataFrame.values`` being released. See `PR `__. Workarounds are being `investigated `__. 
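The workflow described in the doc above (serialized fixed-shape tensors in a Parquet column, cast on read via ``tensor_column_schema``) can be exercised end to end roughly as follows. This is a hedged sketch, not part of the diff: the path and the ``one``/``two`` column names are illustrative, and the write-then-read round trip assumes the standard ``from_pandas``/``write_parquet`` APIs.

```python
import numpy as np
import pandas as pd
import ray

# Illustrative location; any Parquet-compatible path works.
path = "/tmp/tensor_column_demo"

# Serialize each fixed-shape (2, 2, 2) tensor to raw C-contiguous bytes,
# the layout tensor_column_schema expects (e.g. via ndarray.tobytes()).
tensors = [np.arange(8, dtype=np.int64).reshape(2, 2, 2) + i for i in range(4)]
df = pd.DataFrame(
    {"one": list(range(4)), "two": [t.tobytes() for t in tensors]}
)
ray.data.from_pandas(df).write_parquet(path)

# Read the files back, casting the serialized column to the tensor
# extension type via the renamed (still experimental) parameter.
ds = ray.data.read_parquet(
    path, tensor_column_schema={"two": (np.int64, (2, 2, 2))}
)
print(ds.schema())
```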
diff --git a/doc/source/data/examples/big_data_ingestion.ipynb b/doc/source/data/examples/big_data_ingestion.ipynb index 717ae09a153c..a5df6fa29d05 100644 --- a/doc/source/data/examples/big_data_ingestion.ipynb +++ b/doc/source/data/examples/big_data_ingestion.ipynb @@ -270,17 +270,14 @@ "def create_large_shuffle_pipeline(\n", " data_size_bytes: int, num_epochs: int, num_columns: int, num_shards: int\n", ") -> List[DatasetPipeline]:\n", - " # _spread_resource_prefix is used to ensure tasks are evenly spread to all\n", - " # CPU nodes.\n", " return (\n", " ray.data.read_datasource(\n", " RandomIntRowDatasource(),\n", " n=data_size_bytes // 8 // num_columns,\n", " num_columns=num_columns,\n", - " _spread_resource_prefix=\"node:\",\n", " )\n", " .repeat(num_epochs)\n", - " .random_shuffle_each_window(_spread_resource_prefix=\"node:\")\n", + " .random_shuffle_each_window()\n", " .split(num_shards, equal=True)\n", " )" ] diff --git a/doc/source/ray-core/_examples/datasets_train/datasets_train.py b/doc/source/ray-core/_examples/datasets_train/datasets_train.py index b06d3aa3540d..1a4660d96a88 100644 --- a/doc/source/ray-core/_examples/datasets_train/datasets_train.py +++ b/doc/source/ray-core/_examples/datasets_train/datasets_train.py @@ -121,9 +121,7 @@ def create_data_chunk(n, d, seed, include_label=False): def read_dataset(path: str) -> ray.data.Dataset: print(f"reading data from {path}") - return ray.data.read_parquet(path, _spread_resource_prefix="node:").random_shuffle( - _spread_resource_prefix="node:" - ) + return ray.data.read_parquet(path).random_shuffle() class DataPreprocessor: @@ -596,9 +594,7 @@ def train_func(config): DROPOUT_PROB = 0.2 # Random global shuffle - train_dataset_pipeline = train_dataset.repeat().random_shuffle_each_window( - _spread_resource_prefix="node:" - ) + train_dataset_pipeline = train_dataset.repeat().random_shuffle_each_window() del train_dataset datasets = {"train_dataset": train_dataset_pipeline, "test_dataset": test_dataset} diff --git a/python/ray/data/aggregate.py b/python/ray/data/aggregate.py index 0e1693b43e3c..22d289234498 100644 --- a/python/ray/data/aggregate.py +++ b/python/ray/data/aggregate.py @@ -14,7 +14,7 @@ from ray.data import Dataset -@PublicAPI(stability="beta") +@PublicAPI class AggregateFn(object): def __init__( self, @@ -64,7 +64,7 @@ def _validate(self, ds: "Dataset") -> None: _validate_key_fn(ds, self._key_fn) -@PublicAPI(stability="beta") +@PublicAPI class Count(AggregateFn): """Defines count aggregation.""" @@ -77,7 +77,7 @@ def __init__(self): ) -@PublicAPI(stability="beta") +@PublicAPI class Sum(_AggregateOnKeyBase): """Defines sum aggregation.""" @@ -94,7 +94,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): ) -@PublicAPI(stability="beta") +@PublicAPI class Min(_AggregateOnKeyBase): """Defines min aggregation.""" @@ -111,7 +111,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): ) -@PublicAPI(stability="beta") +@PublicAPI class Max(_AggregateOnKeyBase): """Defines max aggregation.""" @@ -128,7 +128,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): ) -@PublicAPI(stability="beta") +@PublicAPI class Mean(_AggregateOnKeyBase): """Defines mean aggregation.""" @@ -149,7 +149,7 @@ def __init__(self, on: Optional[KeyFn] = None, ignore_nulls: bool = True): ) -@PublicAPI(stability="beta") +@PublicAPI class Std(_AggregateOnKeyBase): """Defines standard deviation aggregation. 
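The aggregation classes promoted to plain ``@PublicAPI`` above are normally used through ``Dataset.groupby()``. A brief sketch of that usage (not part of this diff; ``add_column``, ``groupby`` and ``aggregate`` are the existing Datasets APIs, and the three-way bucketing is purely illustrative):

```python
import ray
from ray.data.aggregate import Count, Sum, Mean, Std

# range_arrow() yields a tabular dataset with a single "value" column.
ds = ray.data.range_arrow(100)

# Bucket the rows into three groups and compute several aggregations
# per group in one pass.
grouped = ds.add_column("group", lambda df: df["value"] % 3).groupby("group")
result = grouped.aggregate(Count(), Sum("value"), Mean("value"), Std("value"))
print(result.to_pandas())
```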
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index f33b532fdc14..37476e076cbf 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -85,7 +85,7 @@ _epoch_warned = False -@PublicAPI(stability="beta") +@PublicAPI class Dataset(Generic[T]): """Implements a distributed Arrow dataset. @@ -167,7 +167,7 @@ def map( ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - fn = cache_wrapper(fn) + fn = cache_wrapper(fn, compute) context = DatasetContext.get_current() def transform(block: Block) -> Iterable[Block]: @@ -240,7 +240,7 @@ def map_batches( import pyarrow as pa import pandas as pd - fn = cache_wrapper(fn) + fn = cache_wrapper(fn, compute) context = DatasetContext.get_current() def transform(block: Block) -> Iterable[Block]: @@ -370,7 +370,7 @@ def flat_map( ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - fn = cache_wrapper(fn) + fn = cache_wrapper(fn, compute) context = DatasetContext.get_current() def transform(block: Block) -> Iterable[Block]: @@ -417,7 +417,7 @@ def filter( ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ - fn = cache_wrapper(fn) + fn = cache_wrapper(fn, compute) context = DatasetContext.get_current() def transform(block: Block) -> Iterable[Block]: @@ -504,8 +504,6 @@ def random_shuffle( *, seed: Optional[int] = None, num_blocks: Optional[int] = None, - _spread_resource_prefix: Optional[str] = None, - _move: bool = False, # TODO: deprecate. ) -> "Dataset[T]": """Randomly shuffle the elements of this dataset. @@ -534,7 +532,7 @@ def do_shuffle(block_list, clear_input_blocks: bool, block_udf, remote_args): num_blocks = block_list.executed_num_blocks() # Blocking. if num_blocks == 0: return block_list, {} - if _move or clear_input_blocks: + if clear_input_blocks: blocks = block_list.copy() block_list.clear() else: @@ -545,7 +543,6 @@ def do_shuffle(block_list, clear_input_blocks: bool, block_udf, remote_args): num_blocks, random_shuffle=True, random_seed=seed, - _spread_resource_prefix=_spread_resource_prefix, map_ray_remote_args=remote_args, reduce_ray_remote_args=remote_args, ) @@ -2523,11 +2520,6 @@ def __iter__(self): ) return pipe - def pipeline(self, *, parallelism: int = 10) -> "DatasetPipeline[T]": - raise DeprecationWarning( - "Use .window(blocks_per_window=n) instead of " ".pipeline(parallelism=n)" - ) - def window( self, *, @@ -2670,21 +2662,6 @@ def __iter__(self): ) return pipe - @DeveloperAPI - def get_internal_block_refs(self) -> List[ObjectRef[Block]]: - """Get a list of references to the underlying blocks of this dataset. - - This function can be used for zero-copy access to the data. It blocks - until the underlying blocks are computed. - - Time complexity: O(1) - - Returns: - A list of references to this dataset's blocks. - """ - return self._plan.execute().get_blocks() - - @DeveloperAPI def fully_executed(self) -> "Dataset[T]": """Force full evaluation of the blocks of this dataset. @@ -2709,11 +2686,24 @@ def fully_executed(self) -> "Dataset[T]": ds._set_uuid(self._get_uuid()) return ds - @DeveloperAPI def stats(self) -> str: """Returns a string containing execution timing information.""" return self._plan.stats().summary_string() + @DeveloperAPI + def get_internal_block_refs(self) -> List[ObjectRef[Block]]: + """Get a list of references to the underlying blocks of this dataset. + + This function can be used for zero-copy access to the data. It blocks + until the underlying blocks are computed. 
+ + Time complexity: O(1) + + Returns: + A list of references to this dataset's blocks. + """ + return self._plan.execute().get_blocks() + def _experimental_lazy(self) -> "Dataset[T]": """Enable lazy evaluation (experimental).""" self._lazy = True diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 7197f7c8cafe..e28e26cfca25 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -35,13 +35,13 @@ logger = logging.getLogger(__name__) # Operations that can be naively applied per dataset row in the pipeline. -PER_DATASET_OPS = ["map", "map_batches", "add_column", "flat_map", "filter"] +_PER_DATASET_OPS = ["map", "map_batches", "add_column", "flat_map", "filter"] # Operations that apply to each dataset holistically in the pipeline. -HOLISTIC_PER_DATASET_OPS = ["repartition", "random_shuffle", "sort"] +_HOLISTIC_PER_DATASET_OPS = ["repartition", "random_shuffle", "sort"] # Similar to above but we should force evaluation immediately. -PER_DATASET_OUTPUT_OPS = [ +_PER_DATASET_OUTPUT_OPS = [ "write_json", "write_csv", "write_parquet", @@ -49,10 +49,10 @@ ] # Operations that operate over the stream of output batches from the pipeline. -OUTPUT_ITER_OPS = ["take", "take_all", "show", "to_tf", "to_torch"] +_OUTPUT_ITER_OPS = ["take", "take_all", "show", "to_tf", "to_torch"] -@PublicAPI(stability="beta") +@PublicAPI class DatasetPipeline(Generic[T]): """Implements a pipeline of Datasets. @@ -475,7 +475,9 @@ def __iter__(self): length=length, ) - def schema(self) -> Union[type, "pyarrow.lib.Schema"]: + def schema( + self, fetch_if_missing: bool = False + ) -> Union[type, "pyarrow.lib.Schema"]: """Return the schema of the dataset pipeline. For datasets of Arrow records, this will return the Arrow schema. @@ -483,11 +485,16 @@ def schema(self) -> Union[type, "pyarrow.lib.Schema"]: Time complexity: O(1) + Args: + fetch_if_missing: If True, synchronously fetch the schema if it's + not known. Default is False, where None is returned if the + schema is not known. + Returns: The Python type or Arrow schema of the records, or None if the schema is not known. """ - return next(self.iter_datasets()).schema() + return next(self.iter_datasets()).schema(fetch_if_missing=fetch_if_missing) def count(self) -> int: """Count the number of records in the dataset pipeline. @@ -670,12 +677,6 @@ def foreach_window( _executed=self._executed, ) - def foreach_dataset(self, *a, **kw) -> None: - raise DeprecationWarning( - "`foreach_dataset` has been renamed to `foreach_window`." - ) - - @DeveloperAPI def stats(self, exclude_first_window: bool = True) -> str: """Returns a string containing execution timing information. 
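For context on the renamed ``_HOLISTIC_PER_DATASET_OPS`` list: each of those operations is exposed on ``DatasetPipeline`` as an ``*_each_window`` variant, which is what the updated ingestion examples earlier in this diff now call without the spread-prefix argument. A minimal sketch (the sizes are illustrative; ``repeat``, ``random_shuffle_each_window`` and ``iter_rows`` are existing pipeline APIs):

```python
import ray

# Two epochs over the same data, globally shuffled once per window/epoch.
pipe = ray.data.range(1000).repeat(2).random_shuffle_each_window()

# The pipeline streams 2000 rows in total across the two shuffled epochs.
assert sum(1 for _ in pipe.iter_rows()) == 2000
```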
@@ -743,7 +744,7 @@ def _optimize_stages(self): self._optimized_stages = optimized_stages -for method in PER_DATASET_OPS: +for method in _PER_DATASET_OPS: def make_impl(method): delegate = getattr(Dataset, method) @@ -766,7 +767,7 @@ def impl(self, *args, **kwargs) -> "DatasetPipeline[U]": setattr(DatasetPipeline, method, make_impl(method)) -for method in HOLISTIC_PER_DATASET_OPS: +for method in _HOLISTIC_PER_DATASET_OPS: def make_impl(method): delegate = getattr(Dataset, method) @@ -798,7 +799,7 @@ def impl(*a, **kw): setattr(DatasetPipeline, method, deprecation_warning(method)) setattr(DatasetPipeline, method + "_each_window", make_impl(method)) -for method in PER_DATASET_OUTPUT_OPS: +for method in _PER_DATASET_OUTPUT_OPS: def make_impl(method): delegate = getattr(Dataset, method) @@ -822,7 +823,7 @@ def impl(self, *args, **kwargs): setattr(DatasetPipeline, method, make_impl(method)) -for method in OUTPUT_ITER_OPS: +for method in _OUTPUT_ITER_OPS: def make_impl(method): delegate = getattr(Dataset, method) diff --git a/python/ray/data/grouped_dataset.py b/python/ray/data/grouped_dataset.py index 673128d41679..57064f5837d1 100644 --- a/python/ray/data/grouped_dataset.py +++ b/python/ray/data/grouped_dataset.py @@ -15,7 +15,7 @@ from ray.data.block import Block, BlockAccessor, BlockMetadata, T, U, KeyType -@PublicAPI(stability="beta") +@PublicAPI class GroupedDataset(Generic[T]): """Represents a grouped dataset created by calling ``Dataset.groupby()``. diff --git a/python/ray/data/impl/compute.py b/python/ray/data/impl/compute.py index 97c54f22a7fc..1f6b42166414 100644 --- a/python/ray/data/impl/compute.py +++ b/python/ray/data/impl/compute.py @@ -1,7 +1,7 @@ from typing import TypeVar, Any, Union, Callable, List, Tuple, Optional import ray -from ray.util.annotations import PublicAPI +from ray.util.annotations import PublicAPI, DeveloperAPI from ray.data.block import ( Block, BlockAccessor, @@ -22,43 +22,13 @@ CallableClass = type +@DeveloperAPI class ComputeStrategy: def _apply(self, fn: Any, blocks: BlockList, clear_input_blocks: bool) -> BlockList: raise NotImplementedError -def _map_block_split(block: Block, fn: Any, input_files: List[str]) -> BlockPartition: - output = [] - stats = BlockExecStats.builder() - for new_block in fn(block): - accessor = BlockAccessor.for_block(new_block) - new_meta = BlockMetadata( - num_rows=accessor.num_rows(), - size_bytes=accessor.size_bytes(), - schema=accessor.schema(), - input_files=input_files, - exec_stats=stats.build(), - ) - owner = DatasetContext.get_current().block_owner - output.append((ray.put(new_block, _owner=owner), new_meta)) - stats = BlockExecStats.builder() - return output - - -def _map_block_nosplit( - block: Block, fn: Any, input_files: List[str] -) -> Tuple[Block, BlockMetadata]: - stats = BlockExecStats.builder() - builder = DelegatingBlockBuilder() - for new_block in fn(block): - builder.add_block(new_block) - new_block = builder.build() - accessor = BlockAccessor.for_block(new_block) - return new_block, accessor.get_metadata( - input_files=input_files, exec_stats=stats.build() - ) - - +@DeveloperAPI class TaskPoolStrategy(ComputeStrategy): def _apply( self, @@ -249,7 +219,8 @@ def map_block_nosplit( def cache_wrapper( - fn: Union[CallableClass, Callable[[Any], Any]] + fn: Union[CallableClass, Callable[[Any], Any]], + compute: Optional[Union[str, ComputeStrategy]], ) -> Callable[[Any], Any]: """Implements caching of stateful callables. 
@@ -261,6 +232,13 @@ def cache_wrapper( """ if isinstance(fn, CallableClass): + if compute is None: + raise ValueError( + "``compute`` must be specified when using a callable class. " + 'For example, use ``compute="actors"`` or ' + "``compute=ActorPoolStrategy(min, max)``." + ) + def _fn(item: Any) -> Any: if ray.data._cached_fn is None or ray.data._cached_cls != fn: ray.data._cached_cls = fn @@ -281,3 +259,35 @@ def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy: return compute_spec else: raise ValueError("compute must be one of [`tasks`, `actors`, ComputeStrategy]") + + +def _map_block_split(block: Block, fn: Any, input_files: List[str]) -> BlockPartition: + output = [] + stats = BlockExecStats.builder() + for new_block in fn(block): + accessor = BlockAccessor.for_block(new_block) + new_meta = BlockMetadata( + num_rows=accessor.num_rows(), + size_bytes=accessor.size_bytes(), + schema=accessor.schema(), + input_files=input_files, + exec_stats=stats.build(), + ) + owner = DatasetContext.get_current().block_owner + output.append((ray.put(new_block, _owner=owner), new_meta)) + stats = BlockExecStats.builder() + return output + + +def _map_block_nosplit( + block: Block, fn: Any, input_files: List[str] +) -> Tuple[Block, BlockMetadata]: + stats = BlockExecStats.builder() + builder = DelegatingBlockBuilder() + for new_block in fn(block): + builder.add_block(new_block) + new_block = builder.build() + accessor = BlockAccessor.for_block(new_block) + return new_block, accessor.get_metadata( + input_files=input_files, exec_stats=stats.build() + ) diff --git a/python/ray/data/impl/shuffle.py b/python/ray/data/impl/shuffle.py index 57563f3da75e..f56d616f7bea 100644 --- a/python/ray/data/impl/shuffle.py +++ b/python/ray/data/impl/shuffle.py @@ -1,4 +1,3 @@ -import itertools import math from typing import TypeVar, List, Optional, Dict, Any, Tuple, Union, Callable, Iterable @@ -10,7 +9,6 @@ from ray.data.impl.block_list import BlockList from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder from ray.data.impl.remote_fn import cached_remote_fn -from ray.data.impl.util import _get_spread_resources_iter T = TypeVar("T") @@ -24,7 +22,6 @@ def simple_shuffle( random_seed: Optional[int] = None, map_ray_remote_args: Optional[Dict[str, Any]] = None, reduce_ray_remote_args: Optional[Dict[str, Any]] = None, - _spread_resource_prefix: Optional[str] = None ) -> Tuple[BlockList, Dict[str, List[BlockMetadata]]]: input_blocks = input_blocks.get_blocks() if map_ray_remote_args is None: @@ -35,19 +32,6 @@ def simple_shuffle( reduce_ray_remote_args = reduce_ray_remote_args.copy() reduce_ray_remote_args["scheduling_strategy"] = "SPREAD" input_num_blocks = len(input_blocks) - if _spread_resource_prefix is not None: - # Use given spread resource prefix for round-robin resource-based - # scheduling. - nodes = ray.nodes() - map_resource_iter = _get_spread_resources_iter( - nodes, _spread_resource_prefix, map_ray_remote_args - ) - reduce_resource_iter = _get_spread_resources_iter( - nodes, _spread_resource_prefix, reduce_ray_remote_args - ) - else: - # If no spread resource prefix given, yield an empty dictionary. 
- map_resource_iter, reduce_resource_iter = itertools.tee(itertools.repeat({}), 2) shuffle_map = cached_remote_fn(_shuffle_map) shuffle_reduce = cached_remote_fn(_shuffle_reduce) @@ -58,7 +42,6 @@ def simple_shuffle( shuffle_map.options( **map_ray_remote_args, num_returns=1 + output_num_blocks, - resources=next(map_resource_iter) ).remote(block, block_udf, i, output_num_blocks, random_shuffle, random_seed) for i, block in enumerate(input_blocks) ] @@ -85,7 +68,6 @@ def simple_shuffle( shuffle_reduce.options( **reduce_ray_remote_args, num_returns=2, - resources=next(reduce_resource_iter) ).remote(*[shuffle_map_out[i][j] for i in range(input_num_blocks)]) for j in range(output_num_blocks) ] diff --git a/python/ray/data/impl/util.py b/python/ray/data/impl/util.py index 88f9e6e22c78..690b79bba82b 100644 --- a/python/ray/data/impl/util.py +++ b/python/ray/data/impl/util.py @@ -1,11 +1,7 @@ -import itertools import logging -from typing import List, Dict, Any, Union +from typing import Union from types import ModuleType -from ray.remote_function import DEFAULT_REMOTE_FUNCTION_CPUS -import ray.ray_constants as ray_constants - logger = logging.getLogger(__name__) MIN_PYARROW_VERSION = (4, 0, 1) @@ -55,70 +51,3 @@ def _check_pyarrow_version(): ) else: _VERSION_VALIDATED = True - - -def _get_spread_resources_iter( - nodes: List[Dict[str, Any]], - spread_resource_prefix: str, - ray_remote_args: Dict[str, Any], -): - """Returns a round-robin iterator over resources that match the given - prefix and that coexist on nodes with the resource requests given in the - provided remote args (along with the task resource request defaults). - """ - # Extract the resource labels from the remote args. - resource_request_labels = _get_resource_request_labels(ray_remote_args) - # Filter on the prefix and the requested resources. - spread_resource_labels = _filtered_resources( - nodes, - include_prefix=spread_resource_prefix, - include_colocated_with=resource_request_labels, - ) - if not spread_resource_labels: - # No spreadable resource labels available, raise an error. - raise ValueError( - "No resources both match the provided prefix " - f"{spread_resource_prefix} and are colocated with resources " - f"{resource_request_labels}." - ) - # Return a round-robin resource iterator over the spread labels. - return itertools.cycle([{label: 0.001} for label in spread_resource_labels]) - - -def _get_resource_request_labels(ray_remote_args: Dict[str, Any]): - """Extracts the resource labels from the given remote args, filling in - task resource request defaults. - """ - resource_request_labels = set(ray_remote_args.get("resources", {}).keys()) - if ray_remote_args.get("num_cpus", DEFAULT_REMOTE_FUNCTION_CPUS) > 0: - resource_request_labels.add("CPU") - if "num_gpus" in ray_remote_args: - resource_request_labels.add("GPU") - try: - accelerator_type = ray_remote_args["accelerator_type"] - except KeyError: - pass - else: - resource_request_labels.add( - f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" f"{accelerator_type}" - ) - return resource_request_labels - - -def _filtered_resources( - nodes: List[Dict[str, Any]], include_prefix: str, include_colocated_with: List[str] -): - """Filters cluster resource labels based on the given prefix and the - given resource colocation constraints. - - Returns a list of unique, sorted resource labels. 
- """ - resources = [ - resource - for node in nodes - if set(include_colocated_with) <= set(node["Resources"].keys()) - for resource in node["Resources"].keys() - if resource.startswith(include_prefix) - ] - # Ensure stable ordering of unique resources. - return sorted(set(resources)) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 0419f2d2b188..981d4a19dd5c 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1,4 +1,3 @@ -import itertools import os import logging from typing import ( @@ -58,14 +57,14 @@ from ray.data.impl.plan import ExecutionPlan from ray.data.impl.remote_fn import cached_remote_fn from ray.data.impl.stats import DatasetStats, get_or_create_stats_actor -from ray.data.impl.util import _get_spread_resources_iter, _lazy_import_pyarrow_dataset +from ray.data.impl.util import _lazy_import_pyarrow_dataset T = TypeVar("T") logger = logging.getLogger(__name__) -@PublicAPI(stability="beta") +@PublicAPI def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]: """Create a dataset from a list of local Python objects. @@ -109,7 +108,7 @@ def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]: ) -@PublicAPI(stability="beta") +@PublicAPI def range(n: int, *, parallelism: int = 200) -> Dataset[int]: """Create a dataset from a range of integers [0..n). @@ -129,7 +128,7 @@ def range(n: int, *, parallelism: int = 200) -> Dataset[int]: ) -@PublicAPI(stability="beta") +@PublicAPI def range_arrow(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]: """Create an Arrow dataset from a range of integers [0..n). @@ -153,7 +152,7 @@ def range_arrow(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]: ) -@PublicAPI(stability="beta") +@PublicAPI def range_tensor( n: int, *, shape: Tuple = (1,), parallelism: int = 200 ) -> Dataset[ArrowRow]: @@ -184,13 +183,12 @@ def range_tensor( ) -@PublicAPI(stability="beta") +@PublicAPI def read_datasource( datasource: Datasource[T], *, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, - _spread_resource_prefix: Optional[str] = None, **read_args, ) -> Dataset[T]: """Read a dataset from a custom data source. @@ -263,30 +261,14 @@ def remote_read(i: int, task: ReadTask, stats_actor) -> MaybeBlockPartition: ray_remote_args["scheduling_strategy"] = "SPREAD" remote_read = cached_remote_fn(remote_read) - if _spread_resource_prefix is not None: - if context.optimize_fuse_stages: - logger.warning( - "_spread_resource_prefix has no effect when optimize_fuse_stages " - "is enabled. Tasks are spread by default." - ) - # Use given spread resource prefix for round-robin resource-based - # scheduling. - nodes = ray.nodes() - resource_iter = _get_spread_resources_iter( - nodes, _spread_resource_prefix, ray_remote_args - ) - else: - # If no spread resource prefix given, yield an empty dictionary. 
- resource_iter = itertools.repeat({}) - calls: List[Callable[[], ObjectRef[MaybeBlockPartition]]] = [] metadata: List[BlockPartitionMetadata] = [] for i, task in enumerate(read_tasks): calls.append( - lambda i=i, task=task, resources=next(resource_iter): remote_read.options( - **ray_remote_args, resources=resources - ).remote(i, task, stats_actor) + lambda i=i, task=task: remote_read.options(**ray_remote_args).remote( + i, task, stats_actor + ) ) metadata.append(task.get_metadata()) @@ -312,7 +294,7 @@ def remote_read(i: int, task: ReadTask, stats_actor) -> MaybeBlockPartition: ) -@PublicAPI(stability="beta") +@PublicAPI def read_parquet( paths: Union[str, List[str]], *, @@ -320,7 +302,7 @@ def read_parquet( columns: Optional[List[str]] = None, parallelism: int = 200, ray_remote_args: Dict[str, Any] = None, - _tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None, + tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None, **arrow_parquet_args, ) -> Dataset[ArrowRow]: """Create an Arrow dataset from parquet files. @@ -339,7 +321,7 @@ def read_parquet( parallelism: The requested parallelism of the read. Parallelism may be limited by the number of files of the dataset. ray_remote_args: kwargs passed to ray.remote in the read tasks. - _tensor_column_schema: A dict of column name --> tensor dtype and shape + tensor_column_schema: A dict of column name --> tensor dtype and shape mappings for converting a Parquet column containing serialized tensors (ndarrays) as their elements to our tensor column extension type. This assumes that the tensors were serialized in the raw @@ -350,13 +332,13 @@ def read_parquet( Returns: Dataset holding Arrow records read from the specified paths. """ - if _tensor_column_schema is not None: + if tensor_column_schema is not None: existing_block_udf = arrow_parquet_args.pop("_block_udf", None) def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table": from ray.data.extensions import ArrowTensorArray - for tensor_col_name, (dtype, shape) in _tensor_column_schema.items(): + for tensor_col_name, (dtype, shape) in tensor_column_schema.items(): # NOTE(Clark): We use NumPy to consolidate these potentially # non-contiguous buffers, and to do buffer bookkeeping in # general. @@ -390,7 +372,7 @@ def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table": ) -@PublicAPI(stability="beta") +@PublicAPI def read_json( paths: Union[str, List[str]], *, @@ -437,7 +419,7 @@ def read_json( ) -@PublicAPI(stability="beta") +@PublicAPI def read_csv( paths: Union[str, List[str]], *, @@ -484,7 +466,7 @@ def read_csv( ) -@PublicAPI(stability="beta") +@PublicAPI def read_text( paths: Union[str, List[str]], *, @@ -533,7 +515,7 @@ def to_text(s): ).flat_map(to_text) -@PublicAPI(stability="beta") +@PublicAPI def read_numpy( paths: Union[str, List[str]], *, @@ -577,7 +559,7 @@ def read_numpy( ) -@PublicAPI(stability="beta") +@PublicAPI def read_binary_files( paths: Union[str, List[str]], *, @@ -623,7 +605,7 @@ def read_binary_files( ) -@PublicAPI(stability="beta") +@PublicAPI def from_dask(df: "dask.DataFrame") -> Dataset[ArrowRow]: """Create a dataset from a Dask DataFrame. @@ -656,7 +638,7 @@ def to_ref(df): ) -@PublicAPI(stability="beta") +@PublicAPI def from_mars(df: "mars.DataFrame") -> Dataset[ArrowRow]: """Create a dataset from a MARS dataframe. 
@@ -669,7 +651,7 @@ def from_mars(df: "mars.DataFrame") -> Dataset[ArrowRow]: raise NotImplementedError # P1 -@PublicAPI(stability="beta") +@PublicAPI def from_modin(df: "modin.DataFrame") -> Dataset[ArrowRow]: """Create a dataset from a Modin dataframe. @@ -685,7 +667,7 @@ def from_modin(df: "modin.DataFrame") -> Dataset[ArrowRow]: return from_pandas_refs(parts) -@PublicAPI(stability="beta") +@PublicAPI def from_pandas( dfs: Union["pandas.DataFrame", List["pandas.DataFrame"]] ) -> Dataset[ArrowRow]: @@ -779,7 +761,7 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]: ) -@PublicAPI(stability="beta") +@PublicAPI def from_arrow( tables: Union["pyarrow.Table", bytes, List[Union["pyarrow.Table", bytes]]] ) -> Dataset[ArrowRow]: @@ -830,7 +812,7 @@ def from_arrow_refs( ) -@PublicAPI(stability="beta") +@PublicAPI def from_spark( df: "pyspark.sql.DataFrame", *, parallelism: Optional[int] = None ) -> Dataset[ArrowRow]: diff --git a/python/ray/data/row.py b/python/ray/data/row.py index 7fe53c7516fb..965b781aa584 100644 --- a/python/ray/data/row.py +++ b/python/ray/data/row.py @@ -4,7 +4,7 @@ from ray.util.annotations import PublicAPI -@PublicAPI(stability="beta") +@PublicAPI class TableRow(Mapping): """ A dict-like row of a tabular ``Dataset``. diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 0649b37c43d6..6cc61c033cc7 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -128,6 +128,10 @@ def __call__(self, x): self.num_reuses += 1 return r + # Need to specify compute explicitly. + with pytest.raises(ValueError): + ds.map(StatefulFn).take() + # map task_reuse = ds.map(StatefulFn, compute="tasks").take() assert sorted(task_reuse) == list(range(10)), task_reuse @@ -770,8 +774,8 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet( str(tmp_path), + tensor_column_schema={tensor_col_name: (arr.dtype, inner_shape)}, _block_udf=_block_udf, - _tensor_column_schema={tensor_col_name: (arr.dtype, inner_shape)}, ) assert isinstance(ds.schema().field_by_name(tensor_col_name).type, ArrowTensorType) @@ -3075,13 +3079,13 @@ def range(n, parallelism=200): # Test move. ds = range(100, parallelism=2) - r1 = ds.random_shuffle(_move=True).take(999) + r1 = ds.random_shuffle().take(999) if pipelined: with pytest.raises(RuntimeError): ds = ds.map(lambda x: x).take(999) else: ds = ds.map(lambda x: x).take(999) - r2 = range(100).random_shuffle(_move=True).take(999) + r2 = range(100).random_shuffle().take(999) assert r1 != r2, (r1, r2) # Test empty dataset. 
@@ -3091,8 +3095,7 @@ def range(n, parallelism=200): assert r1.take() == ds.take() -@pytest.mark.parametrize("use_spread_resource_prefix", [False, True]) -def test_random_shuffle_spread(ray_start_cluster, use_spread_resource_prefix): +def test_random_shuffle_spread(ray_start_cluster): cluster = ray_start_cluster cluster.add_node( resources={"bar:1": 100}, @@ -3111,9 +3114,7 @@ def get_node_id(): node1_id = ray.get(get_node_id.options(resources={"bar:1": 1}).remote()) node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote()) - ds = ray.data.range(100, parallelism=2).random_shuffle( - _spread_resource_prefix=("bar:" if use_spread_resource_prefix else None) - ) + ds = ray.data.range(100, parallelism=2).random_shuffle() blocks = ds.get_internal_block_refs() ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) @@ -3123,8 +3124,7 @@ def get_node_id(): assert set(locations) == {node1_id, node2_id} -@pytest.mark.parametrize("use_spread_resource_prefix", [False, True]) -def test_parquet_read_spread(ray_start_cluster, tmp_path, use_spread_resource_prefix): +def test_parquet_read_spread(ray_start_cluster, tmp_path): cluster = ray_start_cluster cluster.add_node( resources={"bar:1": 100}, @@ -3151,51 +3151,7 @@ def get_node_id(): path2 = os.path.join(data_path, "test2.parquet") df2.to_parquet(path2) - ds = ray.data.read_parquet( - data_path, - _spread_resource_prefix=("bar:" if use_spread_resource_prefix else None), - ) - - # Force reads. - blocks = ds.get_internal_block_refs() - assert len(blocks) == 2 - - ray.wait(blocks, num_returns=len(blocks), fetch_local=False) - location_data = ray.experimental.get_object_locations(blocks) - locations = [] - for block in blocks: - locations.extend(location_data[block]["node_ids"]) - assert set(locations) == {node1_id, node2_id} - - -def test_parquet_read_spread_no_cpus(ray_start_cluster, tmp_path): - cluster = ray_start_cluster - cluster.add_node( - resources={"foo": 100}, _system_config={"max_direct_call_object_size": 0} - ) - cluster.add_node(resources={"bar:1": 100}) - cluster.add_node(resources={"bar:2": 100}, num_cpus=0) - - ray.init(cluster.address) - - @ray.remote(num_cpus=0) - def get_node_id(): - return ray.get_runtime_context().node_id.hex() - - node1_id = ray.get(get_node_id.options(resources={"bar:1": 1}).remote()) - node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote()) - - data_path = str(tmp_path) - df1 = pd.DataFrame({"one": list(range(100)), "two": list(range(100, 200))}) - path1 = os.path.join(data_path, "test1.parquet") - df1.to_parquet(path1) - df2 = pd.DataFrame({"one": list(range(300, 400)), "two": list(range(400, 500))}) - path2 = os.path.join(data_path, "test2.parquet") - df2.to_parquet(path2) - - ds = ray.data.read_parquet( - data_path, ray_remote_args={"num_cpus": 0}, _spread_resource_prefix="bar:" - ) + ds = ray.data.read_parquet(data_path) # Force reads. blocks = ds.get_internal_block_refs()
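As the new ``pytest.raises(ValueError)`` assertion above exercises, threading ``compute`` into ``cache_wrapper`` makes the compute strategy mandatory for callable-class UDFs, while plain functions keep the default task-based execution. A sketch of the resulting calling convention (the ``AddOffset`` class is hypothetical):

```python
import ray

class AddOffset:
    def __init__(self):
        # Expensive state (e.g. a model) is constructed once per worker
        # and reused across rows.
        self.offset = 1

    def __call__(self, x):
        return x + self.offset

ds = ray.data.range(10)

# ds.map(AddOffset) would now raise ValueError; callable classes must name
# a compute strategy, e.g. "actors" (or an ActorPoolStrategy instance).
out = ds.map(AddOffset, compute="actors").take()
print(out)  # [1, 2, ..., 10]
```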