diff --git a/doc/source/data/api/input_output.rst b/doc/source/data/api/input_output.rst index 1a95297f7a30..c3b5f19f2eb0 100644 --- a/doc/source/data/api/input_output.rst +++ b/doc/source/data/api/input_output.rst @@ -227,6 +227,9 @@ Partitioning API datasource.PathPartitionEncoder datasource.PathPartitionParser datasource.PathPartitionFilter + datasource.FileExtensionFilter + +.. _metadata_provider: MetadataProvider API -------------------- @@ -240,3 +243,16 @@ MetadataProvider API datasource.DefaultFileMetadataProvider datasource.DefaultParquetMetadataProvider datasource.FastFileMetadataProvider + + +.. _block_write_path_provider: + +BlockWritePathProvider API +-------------------------- + +.. autosummary:: + :toctree: doc/ + + datasource.BlockWritePathProvider + datasource.DefaultBlockWritePathProvider + diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index 24668663a080..2c0ce87c3a58 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -18,6 +18,8 @@ If your transformation isn't vectorized, there's no performance benefit. Optimizing reads ---------------- +.. _read_parallelism: + Tuning read parallelism ~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b101a41eeff5..bbab7e00944b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -308,7 +308,7 @@ def map( ... def __call__(self, batch): ... return self.model(batch) >>> # Apply the transform in parallel on GPUs. Since - >>> # compute=ActorPoolStrategy(size=8) the transform will be applied on a + >>> # compute=ActorPoolStrategy(size=8) the transform is applied on a >>> # pool of 8 Ray actors, each allocated 1 GPU by Ray. >>> ds.map(CachedModel, # doctest: +SKIP ... compute=ray.data.ActorPoolStrategy(size=8), @@ -411,9 +411,9 @@ def map_batches( .. note:: The size of the batches provided to ``fn`` may be smaller than the provided ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent to - a given map task. When ``batch_size`` is specified, each map task will be + a given map task. When ``batch_size`` is specified, each map task is sent a single block if the block is equal to or larger than ``batch_size``, - and will be sent a bundle of blocks up to (but not exceeding) + and is sent a bundle of blocks up to (but not exceeding) ``batch_size`` if blocks are smaller than ``batch_size``. Examples: @@ -504,10 +504,10 @@ def map_batches( exactly as is with no additional formatting. zero_copy_batch: Whether ``fn`` should be provided zero-copy, read-only batches. If this is ``True`` and no copy is required for the - ``batch_format`` conversion, the batch will be a zero-copy, read-only + ``batch_format`` conversion, the batch is a zero-copy, read-only view on data in Ray's object store, which can decrease memory utilization and improve performance. If this is ``False``, the batch - will be writable, which will require an extra copy to guarantee. + is writable, which will require an extra copy to guarantee. If ``fn`` mutates its input, this will need to be ``False`` in order to avoid "assignment destination is read-only" or "buffer source array is read-only" errors. Default is ``False``. @@ -667,7 +667,7 @@ def add_column( Args: col: Name of the column to add. If the name already exists, the - column will be overwritten. + column is overwritten. fn: Map function generating the column values given a batch of records in pandas format. 
compute: The compute strategy, either "tasks" (default) to use Ray @@ -715,7 +715,7 @@ def drop_columns( Args: cols: Names of the columns to drop. If any name does not exist, - an exception will be raised. + an exception is raised. compute: The compute strategy, either "tasks" (default) to use Ray tasks, ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool, or ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` for an @@ -763,7 +763,7 @@ def select_columns( Args: cols: Names of the columns to select. If any name is not included in the - dataset schema, an exception will be raised. + dataset schema, an exception is raised. compute: The compute strategy, either "tasks" (default) to use Ray tasks, ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool, or ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` for an @@ -986,7 +986,7 @@ def random_shuffle( Time complexity: O(dataset size / parallelism) Args: - seed: Fix the random seed to use, otherwise one will be chosen + seed: Fix the random seed to use, otherwise one is chosen based on system randomness. num_blocks: The number of output blocks after the shuffle, or None to retain the number of blocks. @@ -1026,7 +1026,7 @@ def randomize_block_order( >>> ds.randomize_block_order(seed=12345) # doctest: +SKIP Args: - seed: Fix the random seed to use, otherwise one will be chosen + seed: Fix the random seed to use, otherwise one is chosen based on system randomness. Returns: @@ -1153,7 +1153,7 @@ def streaming_split( n: Number of output iterators to return. equal: If True, each output iterator will see an exactly equal number of rows, dropping data if necessary. If False, some iterators may see - slightly more or less rows than other, but no data will be dropped. + slightly more or less rows than other, but no data is dropped. locality_hints: Specify the node ids corresponding to each iterator location. Dataset will try to minimize data movement based on the iterator output locations. This list must have length ``n``. You can @@ -1409,8 +1409,8 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]: Args: indices: List of sorted integers which indicate where the dataset - will be split. If an index exceeds the length of the dataset, - an empty dataset will be returned. + is split. If an index exceeds the length of the dataset, + an empty dataset is returned. Returns: The dataset splits. @@ -1471,9 +1471,9 @@ def split_proportionately( and test sets (equivalent to eg. scikit-learn's ``train_test_split``). See also ``Dataset.train_test_split`` for a higher level abstraction. - The indices to split at will be calculated in such a way so that all splits + The indices to split at is calculated in such a way so that all splits always contains at least one element. If that is not possible, - an exception will be raised. + an exception is raised. This is equivalent to caulculating the indices manually and calling ``Dataset.split_at_indices``. @@ -1558,7 +1558,7 @@ def train_test_split( shuffle: Whether or not to globally shuffle the dataset before splitting. Defaults to False. This may be a very expensive operation with large dataset. - seed: Fix the random seed to use for shuffle, otherwise one will be chosen + seed: Fix the random seed to use for shuffle, otherwise one is chosen based on system randomness. Ignored if ``shuffle=False``. Returns: @@ -1814,8 +1814,8 @@ def sum( Args: on: a column name or a list of column names to aggregate. ignore_nulls: Whether to ignore null values. 
If ``True``, null - values will be ignored when computing the sum; if ``False``, - if a null value is encountered, the output will be None. + values are ignored when computing the sum; if ``False``, + if a null value is encountered, the output is None. We consider np.nan, None, and pd.NaT to be null values. Default is ``True``. @@ -1832,7 +1832,7 @@ def sum( containing the column-wise sum of the provided columns. If the dataset is empty, all values are null, or any value is null - AND ``ignore_nulls`` is ``False``, then the output will be None. + AND ``ignore_nulls`` is ``False``, then the output is None. """ ret = self._aggregate_on(Sum, on, ignore_nulls) return self._aggregate_result(ret) @@ -1855,8 +1855,8 @@ def min( Args: on: a column name or a list of column names to aggregate. ignore_nulls: Whether to ignore null values. If ``True``, null - values will be ignored when computing the min; if ``False``, - if a null value is encountered, the output will be None. + values are ignored when computing the min; if ``False``, + if a null value is encountered, the output is None. We consider np.nan, None, and pd.NaT to be null values. Default is ``True``. @@ -1873,7 +1873,7 @@ def min( containing the column-wise min of the provided columns. If the dataset is empty, all values are null, or any value is null - AND ``ignore_nulls`` is ``False``, then the output will be None. + AND ``ignore_nulls`` is ``False``, then the output is None. """ ret = self._aggregate_on(Min, on, ignore_nulls) return self._aggregate_result(ret) @@ -1896,8 +1896,8 @@ def max( Args: on: a column name or a list of column names to aggregate. ignore_nulls: Whether to ignore null values. If ``True``, null - values will be ignored when computing the max; if ``False``, - if a null value is encountered, the output will be None. + values are ignored when computing the max; if ``False``, + if a null value is encountered, the output is None. We consider np.nan, None, and pd.NaT to be null values. Default is ``True``. @@ -1914,7 +1914,7 @@ def max( containing the column-wise max of the provided columns. If the dataset is empty, all values are null, or any value is null - AND ``ignore_nulls`` is ``False``, then the output will be None. + AND ``ignore_nulls`` is ``False``, then the output is None. """ ret = self._aggregate_on(Max, on, ignore_nulls) return self._aggregate_result(ret) @@ -1937,8 +1937,8 @@ def mean( Args: on: a column name or a list of column names to aggregate. ignore_nulls: Whether to ignore null values. If ``True``, null - values will be ignored when computing the mean; if ``False``, - if a null value is encountered, the output will be None. + values are ignored when computing the mean; if ``False``, + if a null value is encountered, the output is None. We consider np.nan, None, and pd.NaT to be null values. Default is ``True``. @@ -1955,7 +1955,7 @@ def mean( containing the column-wise mean of the provided columns. If the dataset is empty, all values are null, or any value is null - AND ``ignore_nulls`` is ``False``, then the output will be None. + AND ``ignore_nulls`` is ``False``, then the output is None. """ ret = self._aggregate_on(Mean, on, ignore_nulls) return self._aggregate_result(ret) @@ -1991,8 +1991,8 @@ def std( ddof: Delta Degrees of Freedom. The divisor used in calculations is ``N - ddof``, where ``N`` represents the number of elements. ignore_nulls: Whether to ignore null values. 
If ``True``, null - values will be ignored when computing the std; if ``False``, - if a null value is encountered, the output will be None. + values are ignored when computing the std; if ``False``, + if a null value is encountered, the output is None. We consider np.nan, None, and pd.NaT to be null values. Default is ``True``. @@ -2009,7 +2009,7 @@ def std( containing the column-wise std of the provided columns. If the dataset is empty, all values are null, or any value is null - AND ``ignore_nulls`` is ``False``, then the output will be None. + AND ``ignore_nulls`` is ``False``, then the output is None. """ ret = self._aggregate_on(Std, on, ignore_nulls, ddof=ddof) return self._aggregate_result(ret) @@ -2052,11 +2052,11 @@ def sort(self, key: Optional[str] = None, descending: bool = False) -> "Dataset" def zip(self, other: "Dataset") -> "Dataset": """Materialize and zip this dataset with the elements of another. - The datasets must have the same number of rows. Their column sets will be + The datasets must have the same number of rows. Their column sets are merged, and any duplicate column names disambiguated with _1, _2, etc. suffixes. .. note:: - The smaller of the two datasets will be repartitioned to align the number + The smaller of the two datasets are repartitioned to align the number of rows per block with the larger dataset. .. note:: @@ -2419,46 +2419,66 @@ def write_parquet( ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args, ) -> None: - """Write the dataset to parquet. + """Writes the :class:`~ray.data.Dataset` to parquet files under the provided ``path``. - This is only supported for datasets convertible to Arrow records. - To control the number of files, use :meth:`Dataset.repartition`. - - Unless a custom block path provider is given, the format of the output - files will be {uuid}_{block_idx}.parquet, where ``uuid`` is an unique - id for the dataset. + The number of files is determined by the number of blocks in the dataset. + To control the number of number of blocks, call + :meth:`~ray.data.Dataset.repartition`. - Examples: + If pyarrow can't represent your data, this method errors. - .. testcode:: - :skipif: True + By default, the format of the output files is ``{uuid}_{block_idx}.parquet``, + where ``uuid`` is a unique + id for the dataset. To modify this behavior, implement a custom + :class:`~ray.data.datasource.BlockWritePathProvider` + and pass it in as the ``block_path_provider`` argument. - import ray - - ds = ray.data.range(100) - ds.write_parquet("s3://bucket/folder/") + Examples: + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.write_parquet("local:///tmp/data/") Time complexity: O(dataset size / parallelism) Args: - path: The path to the destination root directory, where Parquet - files will be written to. - filesystem: The filesystem implementation to write to. - try_create_dir: Try to create all directories in destination path - if True. Does nothing if all directories already exist. + path: The path to the destination root directory, where + parquet files are written to. + filesystem: The pyarrow filesystem implementation to write to. + These filesystems are specified in the + `pyarrow docs `_. + Specify this if you need to provide specific configurations to the + filesystem. By default, the filesystem is automatically selected based + on the scheme of the paths. For example, if the path begins with + ``s3://``, the ``S3FileSystem`` is used. + try_create_dir: If ``True``, attempts to create all directories in the + destination path. 
Does nothing if all directories already + exist. Defaults to ``True``. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_output_stream - block_path_provider: BlockWritePathProvider implementation to - write each dataset block to a custom output path. + `pyarrow.fs.FileSystem.open_output_stream `_, which is used when + opening the file to write to. + block_path_provider: A + :class:`~ray.data.datasource.BlockWritePathProvider` + implementation specifying the filename structure for each output + parquet file. By default, the format of the output files is + ``{uuid}_{block_idx}.parquet``, where ``uuid`` is a unique id for the + dataset. arrow_parquet_args_fn: Callable that returns a dictionary of write - arguments to use when writing each block to a file. Overrides - any duplicate keys from arrow_parquet_args. This should be used - instead of arrow_parquet_args if any of your write arguments - cannot be pickled, or if you'd like to lazily resolve the write + arguments that are provided to `pyarrow.parquet.write_table() `_ + when writing each block to a file. Overrides + any duplicate keys from ``arrow_parquet_args``. Use this argument + instead of ``arrow_parquet_args`` if any of your write arguments + can't pickled, or if you'd like to lazily resolve the write arguments for each dataset block. - ray_remote_args: Kwargs passed to ray.remote in the write tasks. + ray_remote_args: Kwargs passed to :meth:`~ray.remote` in the write tasks. arrow_parquet_args: Options to pass to - pyarrow.parquet.write_table(), which is used to write out each + `pyarrow.parquet.write_table() `_, which is used to write out each block to a file. """ self.write_datasource( @@ -2487,47 +2507,70 @@ def write_json( ray_remote_args: Dict[str, Any] = None, **pandas_json_args, ) -> None: - """Write the dataset to json. + """Writes the :class:`~ray.data.Dataset` to JSON files. - This is only supported for datasets convertible to Arrow records. - To control the number of files, use :meth:`Dataset.repartition`. + The number of files is determined by the number of blocks in the dataset. + To control the number of number of blocks, call + :meth:`~ray.data.Dataset.repartition`. - Unless a custom block path provider is given, the format of the output - files will be {self._uuid}_{block_idx}.json, where ``uuid`` is an - unique id for the dataset. + This method is only supported for datasets with records that are convertible to + pandas dataframes. - Examples: + By default, the format of the output files is ``{uuid}_{block_idx}.json``, + where ``uuid`` is a unique id for the dataset. To modify this behavior, + implement a custom + :class:`~ray.data.file_based_datasource.BlockWritePathProvider` + and pass it in as the ``block_path_provider`` argument. - .. testcode:: - :skipif: True - - import ray + Examples: + Write the dataset as JSON files to a local directory. - ds = ray.data.range(100) - ds.write_json("s3://bucket/folder/") + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.write_json("local:///tmp/data") Time complexity: O(dataset size / parallelism) Args: - path: The path to the destination root directory, where json - files will be written to. - filesystem: The filesystem implementation to write to. - try_create_dir: Try to create all directories in destination path - if True. Does nothing if all directories already exist. + path: The path to the destination root directory, where + the JSON files are written to. + filesystem: The pyarrow filesystem implementation to write to. 
+ These filesystems are specified in the + `pyarrow docs `_. + Specify this if you need to provide specific configurations to the + filesystem. By default, the filesystem is automatically selected based + on the scheme of the paths. For example, if the path begins with + ``s3://``, the ``S3FileSystem`` is used. + try_create_dir: If ``True``, attempts to create all directories in the + destination path. Does nothing if all directories already + exist. Defaults to ``True``. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_output_stream - block_path_provider: BlockWritePathProvider implementation to - write each dataset block to a custom output path. + `pyarrow.fs.FileSystem.open_output_stream `_, which is used when + opening the file to write to. + block_path_provider: A + :class:`~ray.data.datasource.BlockWritePathProvider` + implementation specifying the filename structure for each output + parquet file. By default, the format of the output files is + ``{uuid}_{block_idx}.json``, where ``uuid`` is a unique id for the + dataset. pandas_json_args_fn: Callable that returns a dictionary of write - arguments to use when writing each block to a file. Overrides - any duplicate keys from pandas_json_args. This should be used - instead of pandas_json_args if any of your write arguments - cannot be pickled, or if you'd like to lazily resolve the write + arguments that are provided to + `pandas.DataFrame.to_json() `_ + when writing each block to a file. Overrides + any duplicate keys from ``pandas_json_args``. Use this parameter + instead of ``pandas_json_args`` if any of your write arguments + can't be pickled, or if you'd like to lazily resolve the write arguments for each dataset block. - ray_remote_args: Kwargs passed to ray.remote in the write tasks. - pandas_json_args: These args will be passed to - pandas.DataFrame.to_json(), which we use under the hood to - write out each Dataset block. These + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the write tasks. + pandas_json_args: These args are passed to + `pandas.DataFrame.to_json() `_, + which is used under the hood to write out each + :class:`~ray.data.Dataset` block. These are dict(orient="records", lines=True) by default. """ self.write_datasource( @@ -2556,45 +2599,74 @@ def write_csv( ray_remote_args: Dict[str, Any] = None, **arrow_csv_args, ) -> None: - """Write the dataset to csv. + """Writes the :class:`~ray.data.Dataset` to CSV files. - This is only supported for datasets convertible to Arrow records. - To control the number of files, use :meth:`Dataset.repartition`. + The number of files is determined by the number of blocks in the dataset. + To control the number of number of blocks, call + :meth:`~ray.data.Dataset.repartition`. - Unless a custom block path provider is given, the format of the output - files will be {uuid}_{block_idx}.csv, where ``uuid`` is an unique id - for the dataset. + This method is only supported for datasets with records that are convertible to + pyarrow tables. + + By default, the format of the output files is ``{uuid}_{block_idx}.csv``, + where ``uuid`` is a unique id for the dataset. To modify this behavior, + implement a custom + :class:`~ray.data.datasource.BlockWritePathProvider` + and pass it in as the ``block_path_provider`` argument. Examples: + Write the dataset as CSV files to a local directory. - .. testcode:: - :skipif: True + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.write_csv("local:///tmp/data") - import ray + Write the dataset as CSV files to S3. 
- ds = ray.data.range(100) - ds.write_csv("s3://bucket/folder/") + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.write_csv("s3://bucket/folder/) # doctest: +SKIP Time complexity: O(dataset size / parallelism) Args: - path: The path to the destination root directory, where csv - files will be written to. - filesystem: The filesystem implementation to write to. - try_create_dir: Try to create all directories in destination path - if True. Does nothing if all directories already exist. + path: The path to the destination root directory, where + the CSV files are written to. + filesystem: The pyarrow filesystem implementation to write to. + These filesystems are specified in the + `pyarrow docs `_. + Specify this if you need to provide specific configurations to the + filesystem. By default, the filesystem is automatically selected based + on the scheme of the paths. For example, if the path begins with + ``s3://``, the ``S3FileSystem`` is used. + try_create_dir: If ``True``, attempts to create all directories in the + destination path if ``True``. Does nothing if all directories already + exist. Defaults to ``True``. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_output_stream - block_path_provider: BlockWritePathProvider implementation to - write each dataset block to a custom output path. + `pyarrow.fs.FileSystem.open_output_stream `_, which is used when + opening the file to write to. + block_path_provider: A + :class:`~ray.data.datasource.BlockWritePathProvider` + implementation specifying the filename structure for each output + parquet file. By default, the format of the output files is + ``{uuid}_{block_idx}.csv``, where ``uuid`` is a unique id for the + dataset. arrow_csv_args_fn: Callable that returns a dictionary of write - arguments to use when writing each block to a file. Overrides - any duplicate keys from arrow_csv_args. This should be used - instead of arrow_csv_args if any of your write arguments - cannot be pickled, or if you'd like to lazily resolve the write - arguments for each dataset block. - ray_remote_args: Kwargs passed to ray.remote in the write tasks. - arrow_csv_args: Other CSV write options to pass to pyarrow. + arguments that are provided to `pyarrow.write.write_csv `_ when writing each + block to a file. Overrides any duplicate keys from ``arrow_csv_args``. + Use this argument instead of ``arrow_csv_args`` if any of your write + arguments cannot be pickled, or if you'd like to lazily resolve the + write arguments for each dataset block. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the write tasks. + arrow_csv_args: Options to pass to `pyarrow.write.write_csv `_ + when writing each block to a file. """ self.write_datasource( CSVDatasource(), @@ -2621,11 +2693,12 @@ def write_tfrecords( block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(), ray_remote_args: Dict[str, Any] = None, ) -> None: - """Write the dataset to TFRecord files. + """Write the :class:`~ray.data.Dataset` to TFRecord files. The `TFRecord `_ - files will contain - `tf.train.Example `_ # noqa: E501 + files contain + `tf.train.Example `_ records, with one Example record for each row in the dataset. .. warning:: @@ -2633,36 +2706,52 @@ def write_tfrecords( so this function only supports datasets with these data types, and will error if the dataset contains unsupported types. - This is only supported for datasets convertible to Arrow records. - To control the number of files, use :meth:`Dataset.repartition`. 
- - Unless a custom block path provider is given, the format of the output - files will be {uuid}_{block_idx}.tfrecords, where ``uuid`` is an unique id - for the dataset. + The number of files is determined by the number of blocks in the dataset. + To control the number of number of blocks, call + :meth:`~ray.data.Dataset.repartition`. - Examples: + This method is only supported for datasets with records that are convertible to + pyarrow tables. - .. testcode:: - :skipif: True + By default, the format of the output files is ``{uuid}_{block_idx}.tfrecords``, + where ``uuid`` is a unique id for the dataset. To modify this behavior, + implement a custom + :class:`~ray.data.file_based_datasource.BlockWritePathProvider` + and pass it in as the ``block_path_provider`` argument. - import ray - - ds = ray.data.range(100) - ds.write_tfrecords("s3://bucket/folder/") + Examples: + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.write_tfrecords("local:///tmp/data/") Time complexity: O(dataset size / parallelism) Args: path: The path to the destination root directory, where tfrecords - files will be written to. - filesystem: The filesystem implementation to write to. - try_create_dir: Try to create all directories in destination path - if True. Does nothing if all directories already exist. + files are written to. + filesystem: The pyarrow filesystem implementation to write to. + These filesystems are specified in the + `pyarrow docs `_. + Specify this if you need to provide specific configurations to the + filesystem. By default, the filesystem is automatically selected based + on the scheme of the paths. For example, if the path begins with + ``s3://``, the ``S3FileSystem`` is used. + try_create_dir: If ``True``, attempts to create all directories in the + destination path. Does nothing if all directories already + exist. Defaults to ``True``. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_output_stream - block_path_provider: BlockWritePathProvider implementation to - write each dataset block to a custom output path. - ray_remote_args: Kwargs passed to ray.remote in the write tasks. + `pyarrow.fs.FileSystem.open_output_stream `_, which is used when + opening the file to write to. + block_path_provider: A + :class:`~ray.data.datasource.BlockWritePathProvider` + implementation specifying the filename structure for each output + parquet file. By default, the format of the output files is + ``{uuid}_{block_idx}.tfrecords``, where ``uuid`` is a unique id for the + dataset. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the write tasks. """ @@ -2691,7 +2780,7 @@ def write_webdataset( ray_remote_args: Dict[str, Any] = None, encoder: Optional[Union[bool, str, callable, list]] = True, ) -> None: - """Write the dataset to WebDataset files. + """Writes the dataset to WebDataset files. The `TFRecord `_ files will contain @@ -2707,7 +2796,7 @@ def write_webdataset( To control the number of files, use :meth:`Dataset.repartition`. Unless a custom block path provider is given, the format of the output - files will be {uuid}_{block_idx}.tfrecords, where ``uuid`` is an unique id + files is {uuid}_{block_idx}.tfrecords, where ``uuid`` is a unique id for the dataset. Examples: @@ -2724,10 +2813,11 @@ def write_webdataset( Args: path: The path to the destination root directory, where tfrecords - files will be written to. + files are written to. filesystem: The filesystem implementation to write to. - try_create_dir: Try to create all directories in destination path - if True. 
Does nothing if all directories already exist. + try_create_dir: If ``True``, attempts to create all + directories in the destination path. Does nothing if all directories + already exist. Defaults to ``True``. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_output_stream block_path_provider: BlockWritePathProvider implementation to @@ -2755,54 +2845,64 @@ def write_numpy( self, path: str, *, - column: Optional[str] = None, + column: str, filesystem: Optional["pyarrow.fs.FileSystem"] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(), ray_remote_args: Dict[str, Any] = None, ) -> None: - """Write a tensor column of the dataset to npy files. + """Writes a column of the :class:`~ray.data.Dataset` to .npy files. - This is only supported for datasets convertible to Arrow records that - contain a TensorArray column. To control the number of files, use - :meth:`Dataset.repartition`. + This is only supported for columns in the datasets that can be converted to + NumPy arrays. - Unless a custom block path provider is given, the format of the output - files will be {self._uuid}_{block_idx}.npy, where ``uuid`` is an unique - id for the dataset. + The number of files is determined by the number of blocks in the dataset. + To control the number of number of blocks, call + :meth:`~ray.data.Dataset.repartition`. - Examples: + By default, the format of the output files is ``{uuid}_{block_idx}.npy``, + where ``uuid`` is a unique id for the dataset. To modify this behavior, + implement a custom + :class:`~ray.data.datasource.BlockWritePathProvider` + and pass it in as the ``block_path_provider`` argument. - .. testcode:: - :skipif: True - - import ray - - ds = ray.data.range(100) - ds.write_numpy("s3://bucket/folder/", column="id") + Examples: + >>> import ray + >>> ds = ray.data.range(100) + >>> ds.write_numpy("local:///tmp/data/", column="id") Time complexity: O(dataset size / parallelism) Args: - path: The path to the destination root directory, where npy - files will be written to. - column: The name of the table column that contains the tensor to + path: The path to the destination root directory, where + the npy files are written to. + column: The name of the column that contains the data to be written. - filesystem: The filesystem implementation to write to. - try_create_dir: Try to create all directories in destination path - if True. Does nothing if all directories already exist. + filesystem: The pyarrow filesystem implementation to write to. + These filesystems are specified in the + `pyarrow docs `_. + Specify this if you need to provide specific configurations to the + filesystem. By default, the filesystem is automatically selected based + on the scheme of the paths. For example, if the path begins with + ``s3://``, the ``S3FileSystem`` is used. + try_create_dir: If ``True``, attempts to create all directories in + destination path. Does nothing if all directories already + exist. Defaults to ``True``. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_output_stream - block_path_provider: BlockWritePathProvider implementation to - write each dataset block to a custom output path. - ray_remote_args: Kwargs passed to ray.remote in the write tasks. + `pyarrow.fs.FileSystem.open_output_stream `_, which is used when + opening the file to write to. 
+ block_path_provider: A + :class:`~ray.data.datasource.BlockWritePathProvider` + implementation specifying the filename structure for each output + parquet file. By default, the format of the output files is + ``{uuid}_{block_idx}.npy``, where ``uuid`` is a unique id for the + dataset. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the write tasks. """ - if column is None: - raise ValueError( - "In Ray 2.5, the column must be specified " - "(e.g., `write_numpy(column='data')`)." - ) self.write_datasource( NumpyDatasource(), @@ -2824,22 +2924,24 @@ def write_mongo( collection: str, ray_remote_args: Dict[str, Any] = None, ) -> None: - """Write the dataset to a MongoDB datasource. + """Writes the :class:`~ray.data.Dataset` to a MongoDB database. - This is only supported for datasets convertible to Arrow records. - To control the number of parallel write tasks, use :meth:`Dataset.repartition`` - before calling this method. + This method is only supported for datasets convertible to pyarrow tables. - .. note:: - Currently, this supports only a subset of the pyarrow's types, due to the + The number of parallel writes is determined by the number of blocks in the + dataset. To control the number of number of blocks, call + :meth:`~ray.data.Dataset.repartition`. + + .. warning:: + This method supports only a subset of the PyArrow's types, due to the limitation of pymongoarrow which is used underneath. Writing unsupported - types will fail on type checking. See all the supported types at: + types fails on type checking. See all the supported types at: https://mongo-arrow.readthedocs.io/en/latest/data_types.html. .. note:: - The records will be inserted into MongoDB as new documents. If a record has + The records are inserted into MongoDB as new documents. If a record has the _id field, this _id must be non-existent in MongoDB, otherwise the write - will be rejected and fail (hence preexisting documents are protected from + is rejected and fail (hence preexisting documents are protected from being mutated). It's fine to not have _id field in record and MongoDB will auto generate one at insertion. @@ -2858,14 +2960,19 @@ def write_mongo( ) Args: - uri: The URI to the destination MongoDB where the dataset will be - written to. For the URI format, see details in - https://www.mongodb.com/docs/manual/reference/connection-string/. + uri: The URI to the destination MongoDB where the dataset is + written to. For the URI format, see details in the + `MongoDB docs `_. database: The name of the database. This database must exist otherwise - ValueError will be raised. + a ValueError is raised. collection: The name of the collection in the database. This collection - must exist otherwise ValueError will be raised. - ray_remote_args: Kwargs passed to ray.remote in the write tasks. + must exist otherwise a ValueError is raised. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the write tasks. + + Raises: + ValueError: if ``database`` doesn't exist. + ValueError: if ``collection`` doesn't exist. """ from ray.data.datasource import MongoDatasource @@ -2885,7 +2992,7 @@ def write_datasource( ray_remote_args: Dict[str, Any] = None, **write_args, ) -> None: - """Write the dataset to a custom datasource. + """Writes the dataset to a custom datasource. For an example of how to use this method, see :ref:`Implementing a Custom Datasource `. @@ -3053,7 +3160,7 @@ def iter_batches( Args: prefetch_batches: The number of batches to fetch ahead of the current batch - to fetch. 
If set to greater than 0, a separate threadpool will be used + to fetch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1. You can revert back to the old prefetching behavior that uses `prefetch_blocks` by setting @@ -3068,11 +3175,11 @@ def iter_batches( ``Dict[str, numpy.ndarray]``, or None to return the underlying block exactly as is with no additional formatting. drop_last: Whether to drop the last batch if it's incomplete. - local_shuffle_buffer_size: If non-None, the data will be randomly shuffled + local_shuffle_buffer_size: If non-None, the data is randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer will be drained. + the buffer, the remaining rows in the buffer is drained. local_shuffle_seed: The seed to use for the local random shuffle. Returns: @@ -3129,7 +3236,7 @@ def iter_torch_batches( Args: prefetch_batches: The number of batches to fetch ahead of the current batch - to fetch. If set to greater than 0, a separate threadpool will be used + to fetch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1. You can revert back to the old prefetching behavior that uses `prefetch_blocks` by setting @@ -3138,10 +3245,10 @@ def iter_torch_batches( as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. - dtypes: The Torch dtype(s) for the created tensor(s); if None, the dtype - will be inferred from the tensor data. - device: The device on which the tensor should be placed; if None, the Torch - tensor will be constructed on the CPU. + dtypes: The Torch dtype(s) for the created tensor(s); if ``None``, the dtype + is inferred from the tensor data. + device: The device on which the tensor should be placed; if ``None``, the + torch tensor is constructed on the CPU. collate_fn: A function to convert a Numpy batch to a PyTorch tensor batch. Potential use cases include collating along a dimension other than the first, padding sequences of various lengths, or generally handling @@ -3150,11 +3257,11 @@ def iter_torch_batches( arrays to a batch of PyTorch tensors. This API is still experimental and is subject to change. drop_last: Whether to drop the last batch if it's incomplete. - local_shuffle_buffer_size: If non-None, the data will be randomly shuffled + local_shuffle_buffer_size: If non-None, the data is randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer will be drained. This + the buffer, the remaining rows in the buffer is drained. This buffer size must be greater than or equal to ``batch_size``, and therefore ``batch_size`` must also be specified when using local shuffling. @@ -3213,7 +3320,7 @@ def iter_tf_batches( Args: prefetch_batches: The number of batches to fetch ahead of the current batch - to fetch. If set to greater than 0, a separate threadpool will be used + to fetch. 
If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1. You can revert back to the old prefetching behavior that uses `prefetch_blocks` by setting @@ -3222,14 +3329,14 @@ def iter_tf_batches( as batches (blocks may contain different number of rows). The final batch may include fewer than ``batch_size`` rows if ``drop_last`` is ``False``. Defaults to 256. - dtypes: The TensorFlow dtype(s) for the created tensor(s); if None, the - dtype will be inferred from the tensor data. + dtypes: The TensorFlow dtype(s) for the created tensor(s); if ``None``, the + dtype is inferred from the tensor data. drop_last: Whether to drop the last batch if it's incomplete. - local_shuffle_buffer_size: If non-None, the data will be randomly shuffled + local_shuffle_buffer_size: If non-None, the data is randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer will be drained. This + the buffer, the remaining rows in the buffer is drained. This buffer size must be greater than or equal to ``batch_size``, and therefore ``batch_size`` must also be specified when using local shuffling. @@ -3277,7 +3384,7 @@ def to_torch( It is recommended to use the returned ``IterableDataset`` directly instead of passing it into a torch ``DataLoader``. - Each element in IterableDataset will be a tuple consisting of 2 + Each element in IterableDataset is a tuple consisting of 2 elements. The first item contains the feature tensor(s), and the second item is the label tensor. Those can take on different forms, depending on the specified arguments. @@ -3285,25 +3392,25 @@ def to_torch( For the features tensor (N is the ``batch_size`` and n, m, k are the number of features per tensor): - * If ``feature_columns`` is a ``List[str]``, the features will be + * If ``feature_columns`` is a ``List[str]``, the features is a tensor of shape (N, n), with columns corresponding to ``feature_columns`` - * If ``feature_columns`` is a ``List[List[str]]``, the features will be + * If ``feature_columns`` is a ``List[List[str]]``, the features is a list of tensors of shape [(N, m),...,(N, k)], with columns of each tensor corresponding to the elements of ``feature_columns`` * If ``feature_columns`` is a ``Dict[str, List[str]]``, the features - will be a dict of key-tensor pairs of shape + is a dict of key-tensor pairs of shape {key1: (N, m),..., keyN: (N, k)}, with columns of each tensor corresponding to the value of ``feature_columns`` under the key. - If ``unsqueeze_label_tensor=True`` (default), the label tensor will be - of shape (N, 1). Otherwise, it will be of shape (N,). + If ``unsqueeze_label_tensor=True`` (default), the label tensor is + of shape (N, 1). Otherwise, it is of shape (N,). If ``label_column`` is specified as ``None``, then no column from the - ``Dataset`` will be treated as the label, and the output label tensor - will be ``None``. + ``Dataset`` is treated as the label, and the output label tensor + is ``None``. Note that you probably want to call :meth:`Dataset.split` on this dataset if there are to be multiple Torch workers consuming the data. @@ -3318,19 +3425,19 @@ def to_torch( feature_columns: The names of the columns to use as the features. 
Can be a list of lists or a dict of string-list pairs for multi-tensor output. - If None, then use all columns except the label column as + If ``None``, then use all columns except the label column as the features. label_column_dtype: The torch dtype to - use for the label column. If None, then automatically infer + use for the label column. If ``None``, then automatically infer the dtype. feature_column_dtypes: The dtypes to use for the feature tensors. This should match the format of ``feature_columns``, - or be a single dtype, in which case it will be applied to - all tensors. If None, then automatically infer the dtype. + or be a single dtype, in which case it is applied to + all tensors. If ``None``, then automatically infer the dtype. batch_size: How many samples per batch to yield at a time. Defaults to 1. prefetch_batches: The number of batches to fetch ahead of the current batch - to fetch. If set to greater than 0, a separate threadpool will be used + to fetch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1. You can revert back to the old prefetching behavior that uses `prefetch_blocks` by setting @@ -3338,24 +3445,24 @@ def to_torch( drop_last: Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of the stream is not divisible by the batch - size, then the last batch will be smaller. Defaults to False. - local_shuffle_buffer_size: If non-None, the data will be randomly shuffled + size, then the last batch is smaller. Defaults to False. + local_shuffle_buffer_size: If non-None, the data is randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer will be drained. This + the buffer, the remaining rows in the buffer is drained. This buffer size must be greater than or equal to ``batch_size``, and therefore ``batch_size`` must also be specified when using local shuffling. local_shuffle_seed: The seed to use for the local random shuffle. unsqueeze_label_tensor: If set to True, the label tensor - will be unsqueezed (reshaped to (N, 1)). Otherwise, it will + is unsqueezed (reshaped to (N, 1)). Otherwise, it will be left as is, that is (N, ). In general, regression loss functions expect an unsqueezed tensor, while classification loss functions expect a squeezed one. Defaults to True. unsqueeze_feature_tensors: If set to True, the features tensors - will be unsqueezed (reshaped to (N, 1)) before being concatenated into - the final features tensor. Otherwise, they will be left as is, that is + are unsqueezed (reshaped to (N, 1)) before being concatenated into + the final features tensor. Otherwise, they are left as is, that is (N, ). Defaults to True. Returns: @@ -3454,7 +3561,7 @@ def to_tf( string, the target data is a tensor. If this is a list, the target data is a ``dict`` that maps column names to their tensor representation. prefetch_batches: The number of batches to fetch ahead of the current batch - to fetch. If set to greater than 0, a separate threadpool will be used + to fetch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node, format the batches, and apply the collate_fn. Defaults to 1. 
You can revert back to the old prefetching behavior that uses `prefetch_blocks` by setting @@ -3463,12 +3570,12 @@ def to_tf( drop_last: Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of the stream is not divisible by the batch - size, then the last batch will be smaller. Defaults to False. - local_shuffle_buffer_size: If non-None, the data will be randomly shuffled + size, then the last batch is smaller. Defaults to False. + local_shuffle_buffer_size: If non-None, the data is randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to - the buffer, the remaining rows in the buffer will be drained. This + the buffer, the remaining rows in the buffer is drained. This buffer size must be greater than or equal to ``batch_size``, and therefore ``batch_size`` must also be specified when using local shuffling. @@ -3524,7 +3631,7 @@ def to_dask( iterable of ``(name, dtype)`` can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of ``(name, dtype)`` can be used. - By default, this will be inferred from the underlying Dataset schema, + By default, this is inferred from the underlying Dataset schema, with this argument supplying an optional override. Returns: @@ -3635,7 +3742,7 @@ def to_modin(self) -> "modin.DataFrame": """Convert this dataset into a Modin dataframe. This works by first converting this dataset into a distributed set of - Pandas dataframes (using :meth:`Dataset.to_pandas_refs`). Please see caveats + pandas dataframes (using :meth:`Dataset.to_pandas_refs`). Please see caveats there. Then the individual dataframes are used to create the modin DataFrame using ``modin.distributed.dataframe.pandas.partitions.from_partitions()``. @@ -3676,11 +3783,10 @@ def to_spark(self, spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame" @ConsumptionAPI(pattern="Time complexity:") def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame": - """Convert this dataset into a single Pandas DataFrame. + """Convert this :class:`~ray.data.Dataset` into a single pandas DataFrame. - This is only supported for datasets convertible to Arrow or Pandas - records. An error is raised if the number of records exceeds the - provided limit. Note that you can use :meth:`.limit` on the dataset + This method errors if the number of rows exceeds the + provided ``limit``. You can use :meth:`.limit` on the dataset beforehand to truncate the dataset manually. Examples: @@ -3695,12 +3801,16 @@ def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame": Time complexity: O(dataset size) Args: - limit: The maximum number of records to return. An error will be - raised if the limit is exceeded. + limit: The maximum number of records to return. An error is + raised if the dataset has more rows than this limit. Returns: - A Pandas DataFrame created from this dataset, containing a limited + A pandas DataFrame created from this dataset, containing a limited number of records. + + Raises: + ValueError: if the number of rows in the :class:`~ray.data.Dataset` exceeds + ``limit``. 
""" count = self.count() if count > limit: @@ -3720,17 +3830,26 @@ def to_pandas(self, limit: int = 100000) -> "pandas.DataFrame": @ConsumptionAPI(pattern="Time complexity:") @DeveloperAPI def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]: - """Convert this dataset into a distributed set of Pandas dataframes. + """Converts this :class:`~ray.data.Dataset` into a distributed set of Pandas + dataframes. + + One DataFrame is created for each block in this Dataset. - This is only supported for datasets convertible to Arrow records. This function induces a copy of the data. For zero-copy access to the underlying data, consider using :meth:`Dataset.to_arrow` or :meth:`Dataset.get_internal_block_refs`. + Examples: + >>> import ray + >>> ds = ray.data.range(10, parallelism=2) + >>> refs = ds.to_pandas_refs() + >>> len(refs) + 2 + Time complexity: O(dataset size / parallelism) Returns: - A list of remote Pandas dataframes created from this dataset. + A list of remote pandas DataFrames created from this dataset. """ block_to_df = cached_remote_fn(_block_to_df) @@ -3740,19 +3859,27 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]: def to_numpy_refs( self, *, column: Optional[str] = None ) -> List[ObjectRef[np.ndarray]]: - """Convert this dataset into a distributed set of NumPy ndarrays. + """Converts this :class:`~ray.data.Dataset` into a distributed set of NumPy + ndarrays or dictionary of NumPy ndarrays. This is only supported for datasets convertible to NumPy ndarrays. This function induces a copy of the data. For zero-copy access to the underlying data, consider using :meth:`Dataset.to_arrow` or :meth:`Dataset.get_internal_block_refs`. + Examples: + >>> import ray + >>> ds = ray.data.range(10, parallelism=2) + >>> refs = ds.to_numpy_refs() + >>> len(refs) + 2 + Time complexity: O(dataset size / parallelism) Args: - column: The name of the column to convert to numpy, or None to specify the - entire row. If not specified for Arrow or Pandas blocks, each returned - future will represent a dict of column ndarrays. + column: The name of the column to convert to numpy. If ``None``, all columns + are used. If multiple columns are specified, each returned + future represents a dict of ndarrays. Defaults to None. Returns: A list of remote NumPy ndarrays created from this dataset. @@ -3766,16 +3893,26 @@ def to_numpy_refs( @ConsumptionAPI(pattern="Time complexity:") @DeveloperAPI def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]: - """Convert this dataset into a distributed set of Arrow tables. + """Convert this :class:`~ray.data.Dataset` into a distributed set of PyArrow + tables. - This is only supported for datasets convertible to Arrow records. - This function is zero-copy if the existing data is already in Arrow - format. Otherwise, the data will be converted to Arrow format. + One PyArrow table is created for each block in this Dataset. + + This method is only supported for datasets convertible to PyArrow tables. + This function is zero-copy if the existing data is already in PyArrow + format. Otherwise, the data is converted to PyArrow format. + + Examples: + >>> import ray + >>> ds = ray.data.range(10, parallelism=2) + >>> refs = ds.to_arrow_refs() + >>> len(refs) + 2 Time complexity: O(1) unless conversion is required. Returns: - A list of remote Arrow tables created from this dataset. + A list of remote PyArrow tables created from this dataset. """ import pyarrow as pa @@ -3964,7 +4101,7 @@ def window( length of the pipeline. 
Setting this to infinity effectively disables pipelining. bytes_per_window: Specify the window size in bytes instead of blocks. - This will be treated as an upper bound for the window size, but each + This is treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with ``blocks_per_window``. """ @@ -4182,7 +4319,7 @@ def stats(self) -> str: """Returns a string containing execution timing information. Note that this does not trigger execution, so if the dataset has not yet - executed, an empty string will be returned. + executed, an empty string is returned. Examples: @@ -4284,7 +4421,7 @@ def serialize_lineage(self) -> bytes: futures, to bytes that can be stored and later deserialized, possibly on a different cluster. - Note that this will drop all computed data, and that everything will be + Note that this will drop all computed data, and that everything is recomputed from scratch after deserialization. Use :py:meth:`Dataset.deserialize_lineage` to deserialize the serialized @@ -4498,7 +4635,7 @@ def _repr_mimebundle_(self, **kwargs): """Return a mimebundle with an ipywidget repr and a simple text repr. Depending on the frontend where the data is being displayed, - different mimetypes will be used from this bundle. + different mimetypes are used from this bundle. See https://ipython.readthedocs.io/en/stable/config/integrating.html for information about this method, and https://ipywidgets.readthedocs.io/en/latest/embedding.html @@ -4678,7 +4815,7 @@ class Schema: Attributes: names: List of column names of this Dataset. types: List of Arrow types of the Dataset. Note that the "object" type is - not Arrow compatible and hence will be returned as `object`. + not Arrow compatible and hence is returned as `object`. base_schema: The underlying Arrow or Pandas schema. """ @@ -4776,7 +4913,7 @@ def _sliding_window(iterable: Iterable, n: int): returned. Args: - iterable: The iterable on which the sliding window will be + iterable: The iterable on which the sliding window is created. n: The width of the sliding window. 
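Illustrative sketch for the ``block_path_provider`` argument documented repeatedly above: the hook name ``_get_write_path_for_block`` and its keyword arguments are assumptions based on the Ray 2.x datasource API and may differ between releases, and ``NamedPartitionPathProvider`` is a hypothetical helper, not part of Ray.

import os

import ray
from ray.data.datasource import BlockWritePathProvider


class NamedPartitionPathProvider(BlockWritePathProvider):
    # Emits part-00000.parquet, part-00001.parquet, ... under the root path.
    def _get_write_path_for_block(
        self,
        base_path,
        *,
        filesystem=None,
        dataset_uuid=None,
        block=None,
        block_index=None,
        file_format=None,
    ):
        return os.path.join(base_path, f"part-{block_index:05d}.{file_format}")


ds = ray.data.range(100).repartition(4)  # 4 blocks -> 4 output files
ds.write_parquet(
    "local:///tmp/named-parquet",
    block_path_provider=NamedPartitionPathProvider(),
)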
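The revised ``ignore_nulls`` wording in the ``sum``/``min``/``max``/``mean``/``std`` docstrings can be exercised with a small sketch, assuming nulls propagate exactly as those docstrings describe:

import ray

ds = ray.data.from_items([{"value": 2}, {"value": 4}, {"value": None}])

# Nulls are skipped by default, so only the non-null values are summed.
print(ds.sum("value", ignore_nulls=True))   # expected: 6

# With ignore_nulls=False, encountering a null makes the output None.
print(ds.sum("value", ignore_nulls=False))  # expected: None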
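The ``prefetch_batches`` and ``local_shuffle_buffer_size`` parameters reworded above apply to all of the batch iterators; a minimal usage sketch on ``iter_batches``:

import ray

ds = ray.data.range(1000)

for batch in ds.iter_batches(
    batch_size=100,
    batch_format="numpy",
    prefetch_batches=2,             # format the next 2 batches on a background threadpool
    local_shuffle_buffer_size=250,  # buffer at least 250 rows before yielding a shuffled batch
    local_shuffle_seed=42,
):
    print(batch["id"].shape)  # (100,) for every full batch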
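Similarly, the ``compute=ActorPoolStrategy(...)`` and ``batch_size`` bundling behavior described in the ``map``/``map_batches`` docstrings can be sketched as follows; ``Doubler`` is a hypothetical transform used only for illustration.

import ray


class Doubler:
    def __call__(self, batch):
        batch["id"] = batch["id"] * 2
        return batch


ds = ray.data.range(1000)

# A fixed pool of two actors applies the transform; each map task receives
# either one block of >= 128 rows or a bundle of smaller blocks up to 128 rows.
doubled = ds.map_batches(
    Doubler,
    compute=ray.data.ActorPoolStrategy(size=2),
    batch_size=128,
    batch_format="numpy",
)
print(doubled.take(2))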
diff --git a/python/ray/data/examples/data/different-extensions/data.csv b/python/ray/data/examples/data/different-extensions/data.csv new file mode 100644 index 000000000000..301800ec1f13 --- /dev/null +++ b/python/ray/data/examples/data/different-extensions/data.csv @@ -0,0 +1,2 @@ +a,b +0,1 \ No newline at end of file diff --git a/python/ray/data/examples/data/different-extensions/data.json b/python/ray/data/examples/data/different-extensions/data.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/python/ray/data/examples/data/iris.tfrecords.gz b/python/ray/data/examples/data/iris.tfrecords.gz new file mode 100644 index 000000000000..111a447d1496 Binary files /dev/null and b/python/ray/data/examples/data/iris.tfrecords.gz differ diff --git a/python/ray/data/examples/data/year=2022/month=09/sales.csv b/python/ray/data/examples/data/year=2022/month=09/sales.csv new file mode 100644 index 000000000000..d21ebba95663 --- /dev/null +++ b/python/ray/data/examples/data/year=2022/month=09/sales.csv @@ -0,0 +1,2 @@ +order_number,quantity +10107,30 \ No newline at end of file diff --git a/python/ray/data/examples/data/year=2022/month=09/sales.json b/python/ray/data/examples/data/year=2022/month=09/sales.json new file mode 100644 index 000000000000..0529c0bea537 --- /dev/null +++ b/python/ray/data/examples/data/year=2022/month=09/sales.json @@ -0,0 +1,4 @@ +{ + "order_number": 10107, + "quantity": 30 +} \ No newline at end of file diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index d1649c9c601c..7e9e8b70ec9b 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -102,32 +102,31 @@ def from_items( items: List[Any], *, parallelism: int = -1, - output_arrow_format: bool = True, ) -> MaterializedDataset: """Create a :class:`~ray.data.Dataset` from a list of local Python objects. - Use this method to create small datasets for testing and exploration. + Use this method to create small datasets from data that fits in memory. Examples: - .. testcode:: - - import ray - - ds = ray.data.from_items([1, 2, 3, 4, 5]) - - print(ds.schema()) - - .. testoutput:: - - Column Type - ------ ---- - item int64 + >>> import ray + >>> ds = ray.data.from_items([1, 2, 3, 4, 5]) + >>> ds + MaterializedDataset(num_blocks=..., num_rows=5, schema={item: int64}) + >>> ds.schema() + Column Type + ------ ---- + item int64 Args: items: List of local Python objects. - parallelism: The amount of parallelism to use for the dataset. - Parallelism might be limited by the number of items. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see + :ref:`Tuning read parallelism `. + Parallelism is upper bounded by ``len(items)``. Returns: A :class:`~ray.data.Dataset` holding the items. @@ -188,23 +187,38 @@ def from_items( @PublicAPI def range(n: int, *, parallelism: int = -1) -> Dataset: - """Create a dataset from a range of integers [0..n). + """Creates a :class:`~ray.data.Dataset` from a range of integers [0..n). + + This function allows for easy creation of synthetic datasets for testing or + benchmarking :ref:`Ray Data `. 
Examples: + >>> import ray - >>> ds = ray.data.range(10000) # doctest: +SKIP - >>> ds # doctest: +SKIP + >>> ds = ray.data.range(10000) + >>> ds Dataset(num_blocks=..., num_rows=10000, schema={id: int64}) - >>> ds.map(lambda x: {"id": x["id"] * 2}).take(4) # doctest: +SKIP - [{"id": 0}, {"id": 2}, {"id": 4}, {"id": 6}] + >>> ds.map(lambda row: {"id": row["id"] * 2}).take(4) + [{'id': 0}, {'id': 2}, {'id': 4}, {'id': 6}] Args: n: The upper bound of the range of integers. - parallelism: The amount of parallelism to use for the dataset. - Parallelism may be limited by the number of items. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see + :ref:`Tuning read parallelism `. + Parallelism is upper bounded by n. Returns: - Dataset producing the integers. + A :class:`~ray.data.Dataset` producing the integers from the range 0 to n. + + .. seealso:: + + :meth:`~ray.data.range_tensor` + Call this method for creating synthetic datasets of tensor data. + """ return read_datasource( RangeDatasource(), @@ -222,34 +236,46 @@ def range_table(n: int, *, parallelism: int = -1) -> Dataset: @PublicAPI def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Dataset: - """Create a Tensor stream from a range of integers [0..n). + """Creates a :class:`~ray.data.Dataset` tensors of the provided shape from range + [0...n]. + + This function allows for easy creation of synthetic tensor datasets for testing or + benchmarking :ref:`Ray Data `. Examples: + >>> import ray >>> ds = ray.data.range_tensor(1000, shape=(2, 2)) - >>> ds # doctest: +ELLIPSIS + >>> ds Dataset( num_blocks=..., num_rows=1000, schema={data: numpy.ndarray(shape=(2, 2), dtype=int64)} ) - >>> ds.map_batches(lambda arr: arr * 2).take(2) # doctest: +SKIP - [array([[0, 0], - [0, 0]]), - array([[2, 2], - [2, 2]])] - - This is similar to range_table(), but uses the ArrowTensorArray extension - type. The dataset elements take the form {"data": array(N, shape=shape)}. + >>> ds.map_batches(lambda row: {"data": row["data"] * 2}).take(2) + [{'data': array([[0, 0], + [0, 0]])}, {'data': array([[2, 2], + [2, 2]])}] Args: - n: The upper bound of the range of integer records. - shape: The shape of each record. - parallelism: The amount of parallelism to use for the dataset. - Parallelism may be limited by the number of items. + n: The upper bound of the range of tensor records. + shape: The shape of each tensor in the dataset. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see + :ref:`Tuning read parallelism `. + Parallelism is upper bounded by n. Returns: - Dataset producing the integers as Arrow tensor records. + A :class:`~ray.data.Dataset` producing the tensor data from range 0 to n. + + .. seealso:: + + :meth:`~ray.data.range` + Call this method to create synthetic datasets of integer data. + """ return read_datasource( RangeDatasource(), @@ -276,7 +302,7 @@ def read_datasource( datasource: The datasource to read data from. parallelism: The requested parallelism of the read. 
Parallelism may be limited by the available partitioning of the datasource. If set to -1, - parallelism will be automatically chosen based on the available cluster + parallelism is automatically chosen based on the available cluster resources and estimated in-memory data size. read_args: Additional kwargs to pass to the datasource impl. ray_remote_args: kwargs passed to ray.remote in the read tasks. @@ -373,7 +399,7 @@ def read_datasource( k = math.ceil(requested_parallelism / estimated_num_blocks) logger.info( f"To satisfy the requested parallelism of {requested_parallelism}, " - f"each read task output will be split into {k} smaller blocks." + f"each read task output is split into {k} smaller blocks." ) for r in read_tasks: r._set_additional_split_factor(k) @@ -433,17 +459,19 @@ def read_mongo( ray_remote_args: Dict[str, Any] = None, **mongo_args, ) -> Dataset: - """Create an Arrow dataset from MongoDB. + """Create a :class:`~ray.data.Dataset` from a MongoDB database. The data to read from is specified via the ``uri``, ``database`` and ``collection`` of the MongoDB. The dataset is created from the results of executing ``pipeline`` against the ``collection``. If ``pipeline`` is None, the entire - ``collection`` will be read. + ``collection`` is read. + + .. tip:: - You can check out more details here about these MongoDB concepts: - - URI: https://www.mongodb.com/docs/manual/reference/connection-string/ - - Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/ - - Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/ + For more details about these MongoDB concepts, see the following: + - URI: https://www.mongodb.com/docs/manual/reference/connection-string/ + - Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/ + - Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/ To read the MongoDB in parallel, the execution of the pipeline is run on partitions of the collection, with a Ray read task to handle a partition. Partitions are @@ -465,28 +493,37 @@ def read_mongo( ... ) Args: - uri: The URI of the source MongoDB where the dataset will be - read from. For the URI format, see details in - https://www.mongodb.com/docs/manual/reference/connection-string/. + uri: The URI of the source MongoDB where the dataset is + read from. For the URI format, see details in the `MongoDB docs `_. database: The name of the database hosted in the MongoDB. This database - must exist otherwise ValueError will be raised. + must exist otherwise ValueError is raised. collection: The name of the collection in the database. This collection - must exist otherwise ValueError will be raised. - pipeline: A MongoDB pipeline, which will be executed on the given collection + must exist otherwise ValueError is raised. + pipeline: A `MongoDB pipeline `_, which is executed on the given collection with results used to create Dataset. If None, the entire collection will be read. schema: The schema used to read the collection. If None, it'll be inferred from the results of pipeline. - parallelism: The requested parallelism of the read. If -1, it will be - automatically chosen based on the available cluster resources and estimated - in-memory data size. - ray_remote_args: kwargs passed to ray.remote in the read tasks. - mongo_args: kwargs passed to aggregate_arrow_all() in pymongoarrow in producing + parallelism: The requested parallelism of the read. 
Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + mongo_args: kwargs passed to `aggregate_arrow_all() `_ in pymongoarrow in producing Arrow-formatted results. Returns: - Dataset producing Arrow records from the results of executing the pipeline - on the specified MongoDB collection. + :class:`~ray.data.Dataset` producing rows from the results of executing the pipeline on the specified MongoDB collection. + + Raises: + ValueError: if ``database`` doesn't exist. + ValueError: if ``collection`` doesn't exist. """ return read_datasource( MongoDatasource(), @@ -513,36 +550,49 @@ def read_parquet( meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(), **arrow_parquet_args, ) -> Dataset: - """Create an Arrow dataset from parquet files. + """Creates a :class:`~ray.data.Dataset` from parquet files. + Examples: + Read a file in remote storage. + >>> import ray - >>> # Read a directory of files in remote storage. - >>> ray.data.read_parquet("s3://bucket/path") # doctest: +SKIP + >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet") + >>> ds.schema() + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string + + + Read a directory in remote storage. + >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris-parquet/") - >>> # Read multiple local files. - >>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP + Read multiple local files. + >>> ray.data.read_parquet( + ... ["local:///path/to/file1", "local:///path/to/file2"]) # doctest: +SKIP - >>> # Specify a schema for the parquet file. + Specify a schema for the parquet file. >>> import pyarrow as pa - >>> fields = [("sepal.length", pa.float64()), - ... ("sepal.width", pa.float64()), - ... ("petal.length", pa.float64()), - ... ("petal.width", pa.float64()), + >>> fields = [("sepal.length", pa.float32()), + ... ("sepal.width", pa.float32()), + ... ("petal.length", pa.float32()), + ... ("petal.width", pa.float32()), ... ("variety", pa.string())] - >>> ray.data.read_parquet("example://iris.parquet", + >>> ds = ray.data.read_parquet("example://iris.parquet", ... schema=pa.schema(fields)) - Dataset( - num_blocks=..., - num_rows=150, - schema={ - sepal.length: double, - sepal.width: double, - petal.length: double, - petal.width: double, - variety: string - } - ) + >>> ds.schema() + Column Type + ------ ---- + sepal.length float + sepal.width float + petal.length float + petal.width float + variety string + The Parquet reader also supports projection and filter pushdown, allowing column selection and row filtering to be pushed down to the file scan. @@ -566,31 +616,48 @@ def read_parquet( {'sepal.length': 5.1, 'variety': 'Setosa'} {'sepal.length': 5.4, 'variety': 'Setosa'} - For further arguments you can pass to pyarrow as a keyword argument, see - https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment + For further arguments you can pass to PyArrow as a keyword argument, see the + `PyArrow API reference `_. Args: paths: A single file path or directory, or a list of file paths. Multiple directories are not supported. 
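A short sketch of the projection and filter pushdown mentioned above for ``read_parquet``, reusing the iris columns from the earlier example (the exact filter expression is illustrative):

    >>> import pyarrow.dataset as pa_ds  # doctest: +SKIP
    >>> ds = ray.data.read_parquet(  # doctest: +SKIP
    ...     "example://iris.parquet",
    ...     columns=["sepal.length", "variety"],
    ...     filter=pa_ds.field("sepal.length") > 5.0)
    >>> ds.show(2)  # doctest: +SKIP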
- filesystem: The filesystem implementation to read from. These are specified in - https://arrow.apache.org/docs/python/api/filesystems.html#filesystem-implementations. - columns: A list of column names to read. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the dataset. - ray_remote_args: kwargs passed to ray.remote in the read tasks. - tensor_column_schema: A dict of column name --> tensor dtype and shape + filesystem: The PyArrow filesystem + implementation to read from. These filesystems are specified in the + `pyarrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the ``S3FileSystem`` is + used. + columns: A list of column names to read. Only the specified columns are + read during the file scan. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + records in all the parquet files. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + tensor_column_schema: A dict of column name to PyArrow dtype and shape mappings for converting a Parquet column containing serialized - tensors (ndarrays) as their elements to our tensor column extension - type. This assumes that the tensors were serialized in the raw - NumPy array format in C-contiguous order (e.g. via + tensors (ndarrays) as their elements to PyArrow tensors. This function + assumes that the tensors are serialized in the raw + NumPy array format in C-contiguous order (e.g., via `arr.tobytes()`). - meta_provider: File metadata provider. Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - arrow_parquet_args: Other parquet read options to pass to pyarrow, see - https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases you do not need to set this parameter. + arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full + set of arguments, see the`PyArrow API `_ Returns: - Dataset producing Arrow records read from the specified paths. + :class:`~ray.data.Dataset` producing records read from the specified parquet + files. """ arrow_parquet_args = _resolve_parquet_args( tensor_column_schema, @@ -626,22 +693,27 @@ def read_images( include_paths: bool = False, ignore_missing_paths: bool = False, ) -> Dataset: - """Read images from the specified paths. + """Creates a :class:`~ray.data.Dataset` from image files. 
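As a hedged sketch of the ``tensor_column_schema`` mapping described for ``read_parquet`` above (the bucket path, the ``image`` column name, and the dtype/shape are hypothetical):

    >>> import numpy as np  # doctest: +SKIP
    >>> ds = ray.data.read_parquet(  # doctest: +SKIP
    ...     "s3://bucket/path",
    ...     tensor_column_schema={"image": (np.uint8, (28, 28))})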
Examples: >>> import ray - >>> path = "s3://anonymous@air-example-data-2/movie-image-small-filesize-1GB" - >>> ds = ray.data.read_images(path) # doctest: +SKIP - >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8)}) + >>> path = "s3://anonymous@ray-example-data/batoidea/JPEGImages/" + >>> ds = ray.data.read_images(path) + >>> ds.schema() + Column Type + ------ ---- + image numpy.ndarray(shape=(32, 32, 3), dtype=uint8) If you need image file paths, set ``include_paths=True``. - >>> ds = ray.data.read_images(path, include_paths=True) # doctest: +SKIP - >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=41979, schema={image: numpy.ndarray(ndim=3, dtype=uint8), path: string}) - >>> ds.take(1)[0]["path"] # doctest: +SKIP - 'air-example-data-2/movie-image-small-filesize-1GB/0.jpg' + >>> ds = ray.data.read_images(path, include_paths=True) + >>> ds.schema() + Column Type + ------ ---- + image numpy.ndarray(shape=(32, 32, 3), dtype=uint8) + path string + >>> ds.take(1)[0]["path"] + 'ray-example-data/batoidea/JPEGImages/1.jpeg' If your images are arranged like: @@ -658,24 +730,43 @@ def read_images( >>> import ray >>> from ray.data.datasource.partitioning import Partitioning - >>> root = "example://tiny-imagenet-200/train" + >>> root = "example://image-datasets/dir-partitioned" >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root) - >>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning) # doctest: +SKIP - >>> ds # doctest: +SKIP - Dataset(num_blocks=..., num_rows=94946, schema={image: TensorDtype(shape=(224, 224, 3), dtype=uint8), class: object}) + >>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning) + >>> ds.schema() + Column Type + ------ ---- + image numpy.ndarray(shape=(224, 224, 3), dtype=uint8) + class string Args: - paths: A single file/directory path or a list of file/directory paths. + paths: A single file or directory, or a list of file or directory paths. A list of paths can contain both files and directories. - filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the dataset. - meta_provider: File metadata provider. Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - ray_remote_args: kwargs passed to ray.remote in the read tasks. + filesystem: The pyarrow filesystem + implementation to read from. These filesystems are specified in the + `pyarrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the `S3FileSystem` is used. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + image files. + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this.
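The ``size`` and ``mode`` options described just below can be combined in a single ``read_images`` call; a small sketch (the size and the grayscale mode are illustrative choices):

    >>> ds = ray.data.read_images(  # doctest: +SKIP
    ...     "s3://anonymous@ray-example-data/batoidea/JPEGImages/",
    ...     size=(64, 64),
    ...     mode="L")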
+ ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. arrow_open_file_args: kwargs passed to - ``pyarrow.fs.FileSystem.open_input_file``. - partition_filter: Path-based partition filter, if any. Can be used + `pyarrow.fs.FileSystem.open_input_file `_. + when opening input files to read. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use with a custom callback to read only selected partitions of a dataset. By default, this filters out any file paths whose file extension does not match ``*.png``, ``*.jpg``, ``*.jpeg``, ``*.tiff``, ``*.bmp``, or ``*.gif``. @@ -683,7 +774,8 @@ def read_images( that describes how paths are organized. Defaults to ``None``. size: The desired height and width of loaded images. If unspecified, images retain their original shape. - mode: A `Pillow mode `_ + mode: A `Pillow mode `_ describing the desired type and depth of pixels. If unspecified, image modes are inferred by `Pillow `_. @@ -700,7 +792,7 @@ def read_images( Raises: ValueError: if ``size`` contains non-positive numbers. ValueError: if ``mode`` is unsupported. - """ # noqa: E501 + """ return read_datasource( ImageDatasource(), paths=paths, @@ -734,68 +826,78 @@ def read_parquet_bulk( ), **arrow_parquet_args, ) -> Dataset: - """Create an Arrow dataset from a large number (such as >1K) of parquet files - quickly. + """Create :class:`~ray.data.Dataset` from parquet files without reading metadata. - By default, ONLY file paths should be provided as input (i.e. no directory paths), - and an OSError will be raised if one or more paths point to directories. If your - use-case requires directory paths, then the metadata provider should be changed to - one that supports directory expansion (e.g. ``DefaultFileMetadataProvider``). + Use :meth:`~ray.data.read_parquet` for most cases. - Offers improved performance vs. :func:`read_parquet` due to not using PyArrow's - ``ParquetDataset`` abstraction, whose latency scales linearly with the number of - input files due to collecting all file metadata on a single node. + Use :meth:`~ray.data.read_parquet_bulk` if all the provided paths point to files + and metadata fetching using :meth:`~ray.data.read_parquet` takes too long or the + parquet files do not all have a unified schema. - Also supports a wider variety of input Parquet file types than :func:`read_parquet` - due to not trying to merge and resolve a unified schema for all files. + Performance slowdowns are possible when using this method with parquet files that + are very large. - However, unlike :func:`read_parquet`, this does not offer file metadata resolution - by default, so a custom metadata provider should be provided if your use-case - requires a unified schema, block sizes, row counts, etc. + .. warning:: + + Only provide file paths as input (i.e., no directory paths). An + OSError is raised if one or more paths point to directories. If your + use-case requires directory paths, use :meth:`~ray.data.read_parquet` + instead. Examples: - >>> # Read multiple local files. You should always provide only input file - >>> # paths (i.e. no directory paths) when known to minimize read latency. - >>> ray.data.read_parquet_bulk( # doctest: +SKIP - ... ["/path/to/file1", "/path/to/file2"]) + Read multiple local files. You should always provide only input file paths + (i.e. no directory paths) when known to minimize read latency. - >>> # Read a directory of files in remote storage. 
Caution should be taken - >>> # when providing directory paths, since the time to both check each path - >>> # type and expand its contents may result in greatly increased latency - >>> # and/or request rate throttling from cloud storage service providers. >>> ray.data.read_parquet_bulk( # doctest: +SKIP - ... "s3://bucket/path", - ... meta_provider=DefaultFileMetadataProvider()) + ... ["/path/to/file1", "/path/to/file2"]) Args: - paths: A single file path or a list of file paths. If one or more directories - are provided, then ``meta_provider`` should also be set to an implementation - that supports directory expansion (e.g. ``DefaultFileMetadataProvider``). - filesystem: The filesystem implementation to read from. - columns: A list of column names to read. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the dataset. - ray_remote_args: kwargs passed to ray.remote in the read tasks. + paths: A single file path or a list of file paths. + filesystem: The PyArrow filesystem + implementation to read from. These filesystems are + specified in the + `PyArrow docs `_. + Specify this parameter if you need to provide specific configurations to + the filesystem. By default, the filesystem is automatically selected based + on the scheme of the paths. For example, if the path begins with ``s3://``, + the `S3FileSystem` is used. + columns: A list of column names to read. Only the + specified columns are read during the file scan. + parallelism: The amount of parallelism to use for + the dataset. Defaults to -1, which automatically determines the optimal + parallelism for your configuration. You should not need to manually set + this value in most cases. For details on how the parallelism is + automatically determined and guidance on how to tune it, see + :ref:`Tuning read parallelism `. Parallelism is + upper bounded by the total number of records in all the parquet files. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. arrow_open_file_args: kwargs passed to - ``pyarrow.fs.FileSystem.open_input_file``. - tensor_column_schema: A dict of column name --> tensor dtype and shape + `pyarrow.fs.FileSystem.open_input_file `_. + when opening input files to read. + tensor_column_schema: A dict of column name to PyArrow dtype and shape mappings for converting a Parquet column containing serialized - tensors (ndarrays) as their elements to our tensor column extension - type. This assumes that the tensors were serialized in the raw + tensors (ndarrays) as their elements to PyArrow tensors. This function + assumes that the tensors are serialized in the raw NumPy array format in C-contiguous order (e.g. via - ``arr.tobytes()``). - meta_provider: File metadata provider. Defaults to a fast file metadata - provider that skips file size collection and requires all input paths to be - files. Change to ``DefaultFileMetadataProvider`` or a custom metadata - provider if directory expansion and/or file metadata resolution is required. - partition_filter: Path-based partition filter, if any. Can be used + `arr.tobytes()`). + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use with a custom callback to read only selected partitions of a dataset. 
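A minimal sketch of the file-paths-only usage of ``read_parquet_bulk`` described above (the paths and column names are placeholders):

    >>> ds = ray.data.read_parquet_bulk(  # doctest: +SKIP
    ...     ["s3://bucket/file1.parquet", "s3://bucket/file2.parquet"],
    ...     columns=["x", "y"])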
By default, this filters out any file paths whose file extension does not match "*.parquet*". - arrow_parquet_args: Other parquet read options to pass to pyarrow. + arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full + set of arguments, see + the `PyArrow API `_ Returns: - Dataset producing Arrow records read from the specified paths. + :class:`~ray.data.Dataset` producing records read from the specified paths. """ arrow_parquet_args = _resolve_parquet_args( tensor_column_schema, @@ -831,54 +933,84 @@ def read_json( ignore_missing_paths: bool = False, **arrow_json_args, ) -> Dataset: - """Create an Arrow dataset from json files. + """Creates a :class:`~ray.data.Dataset` from JSON files. Examples: + Read a file in remote storage. + >>> import ray - >>> # Read a directory of files in remote storage. - >>> ray.data.read_json("s3://bucket/path") # doctest: +SKIP + >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/logs.json") + >>> ds.schema() + Column Type + ------ ---- + timestamp timestamp[s] + size int64 - >>> # Read multiple local files. - >>> ray.data.read_json(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP + Read multiple local files. + + >>> ray.data.read_json( # doctest: +SKIP + ... ["local:///path/to/file1", "local:///path/to/file2"]) + + Read multiple directories. - >>> # Read multiple directories. >>> ray.data.read_json( # doctest: +SKIP ... ["s3://bucket/path1", "s3://bucket/path2"]) - By default, ``read_json`` parses - `Hive-style partitions `_ + By default, :meth:`~ray.data.read_json` parses + `Hive-style partitions `_ from file paths. If your data adheres to a different partitioning scheme, set the ``partitioning`` parameter. - >>> ds = ray.data.read_json("example://year=2022/month=09/sales.json") # doctest: +SKIP - >>> ds.take(1) # doctest: +SKIP - [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'} + >>> ds = ray.data.read_json("example://year=2022/month=09/sales.json") + >>> ds.take(1) + [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}] Args: - paths: A single file/directory path or a list of file/directory paths. + paths: A single file or directory, or a list of file or directory paths. A list of paths can contain both files and directories. - filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the dataset. - ray_remote_args: kwargs passed to ray.remote in the read tasks. + filesystem: The PyArrow filesystem + implementation to read from. These filesystems are specified in the + `PyArrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the `S3FileSystem` is used. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + records in all the JSON files. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. 
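For example, a hedged sketch of shaping the ``read_json`` read tasks through ``ray_remote_args`` (the bucket path and resource value are purely illustrative):

    >>> ds = ray.data.read_json(  # doctest: +SKIP
    ...     "s3://bucket/path",
    ...     ray_remote_args={"num_cpus": 0.25})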
arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_input_stream - meta_provider: File metadata provider. Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - partition_filter: Path-based partition filter, if any. Can be used - with a custom callback to read only selected partitions of a dataset. + `pyarrow.fs.FileSystem.open_input_file `_. + when opening input files to read. + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. + Use with a custom callback to read only selected partitions of a + dataset. By default, this filters out any file paths whose file extension does not match "*.json*". - arrow_json_args: Other json read options to pass to pyarrow. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object that describes how paths are organized. By default, this function parses - `Hive-style partitions `_. + `Hive-style partitions `_. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not found. Defaults to False. + arrow_json_args: JSON read options to pass to `pyarrow.json.read_json `_. Returns: - Dataset producing Arrow records read from the specified paths. - """ # noqa: E501 + :class:`~ray.data.Dataset` producing records read from the specified paths. + """ return read_datasource( JSONDatasource(), parallelism=parallelism, @@ -908,82 +1040,125 @@ def read_csv( ignore_missing_paths: bool = False, **arrow_csv_args, ) -> Dataset: - r"""Create an Arrow dataset from csv files. + """Creates a :class:`~ray.data.Dataset` from CSV files. Examples: - >>> import ray - >>> # Read a directory of files in remote storage. - >>> ray.data.read_csv("s3://bucket/path") # doctest: +SKIP + Read a file in remote storage. - >>> # Read multiple local files. - >>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP + >>> import ray + >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv") + >>> ds.schema() + Column Type + ------ ---- + sepal length (cm) double + sepal width (cm) double + petal length (cm) double + petal width (cm) double + target int64 + + Read multiple local files. - >>> # Read multiple directories. >>> ray.data.read_csv( # doctest: +SKIP - ... ["s3://bucket/path1", "s3://bucket/path2"]) + ... ["local:///path/to/file1", "local:///path/to/file2"]) + + Read a directory from remote storage. + + >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris-csv/") + + Read files that use a different delimiter. For more uses of ParseOptions see + https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html # noqa: #501 - >>> # Read files that use a different delimiter. For more uses of ParseOptions see - >>> # https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html # noqa: #501 >>> from pyarrow import csv - >>> parse_options = csv.ParseOptions(delimiter="\t") - >>> ray.data.read_csv( # doctest: +SKIP + >>> parse_options = csv.ParseOptions(delimiter="\\t") + >>> ds = ray.data.read_csv( ... "example://iris.tsv", ... parse_options=parse_options) - - >>> # Convert a date column with a custom format from a CSV file. 
- >>> # For more uses of ConvertOptions see - >>> # https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html # noqa: #501 + >>> ds.schema() + Column Type + ------ ---- + sepal.length double + sepal.width double + petal.length double + petal.width double + variety string + + Convert a date column with a custom format from a CSV file. For more uses of ConvertOptions see https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html # noqa: #501 + >>> from pyarrow import csv >>> convert_options = csv.ConvertOptions( ... timestamp_parsers=["%m/%d/%Y"]) - >>> ray.data.read_csv( # doctest: +SKIP - ... "example://dow_jones_index.csv", + >>> ds = ray.data.read_csv( + ... "example://dow_jones.csv", ... convert_options=convert_options) - By default, ``read_csv`` parses - `Hive-style partitions `_ + By default, :meth:`~ray.data.read_csv` parses + `Hive-style partitions `_ from file paths. If your data adheres to a different partitioning scheme, set the ``partitioning`` parameter. - >>> ds = ray.data.read_csv("example://year=2022/month=09/sales.csv") # doctest: +SKIP - >>> ds.take(1) # doctest: +SKIP + >>> ds = ray.data.read_csv("example://year=2022/month=09/sales.csv") + >>> ds.take(1) [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}] - By default, ``read_csv`` reads all files from file paths. If you want to filter + By default, :meth:`~ray.data.read_csv` reads all files from file paths. If you want to filter files by file extensions, set the ``partition_filter`` parameter. - >>> # Read only *.csv files from multiple directories. + Read only ``*.csv`` files from a directory. + >>> from ray.data.datasource import FileExtensionFilter - >>> ray.data.read_csv( # doctest: +SKIP - ... ["s3://bucket/path1", "s3://bucket/path2"], + >>> ray.data.read_csv("example://different-extensions/", ... partition_filter=FileExtensionFilter("csv")) + Dataset(num_blocks=..., num_rows=1, schema={a: int64, b: int64}) Args: - paths: A single file/directory path or a list of file/directory paths. + paths: A single file or directory, or a list of file or directory paths. A list of paths can contain both files and directories. - filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the dataset. - ray_remote_args: kwargs passed to ray.remote in the read tasks. + filesystem: The PyArrow filesystem + implementation to read from. These filesystems are specified in the + `pyarrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the `S3FileSystem` is used. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + records in all the CSV files. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_input_stream - meta_provider: File metadata provider. 
Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - partition_filter: Path-based partition filter, if any. Can be used - with a custom callback to read only selected partitions of a dataset. - By default, this does not filter out any files. - If wishing to filter out all file paths except those whose file extension - matches e.g. "*.csv*", a ``FileExtensionFilter("csv")`` can be provided. + `pyarrow.fs.FileSystem.open_input_file `_. + when opening input files to read. + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. + Use with a custom callback to read only selected partitions of a + dataset. By default, no files are filtered. + To filter out all file paths except those whose file extension + matches e.g., "*.csv*", you can provide a + :class:`~ray.data.datasource.FileExtensionFilter`. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object that describes how paths are organized. By default, this function parses - `Hive-style partitions `_. - arrow_csv_args: Other csv read options to pass to pyarrow. + `Hive-style partitions `_. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not found. Defaults to False. + arrow_csv_args: CSV read options to pass to + `pyarrow.csv.open_csv `_ + when opening CSV files. + Returns: - Dataset producing Arrow records read from the specified paths. - """ # noqa: E501 + :class:`~ray.data.Dataset` producing records read from the specified paths. + """ return read_datasource( CSVDatasource(), parallelism=parallelism, @@ -1004,7 +1179,6 @@ def read_text( paths: Union[str, List[str]], *, encoding: str = "utf-8", - errors: str = "ignore", drop_empty_lines: bool = True, filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = -1, @@ -1015,42 +1189,66 @@ def read_text( partitioning: Partitioning = None, ignore_missing_paths: bool = False, ) -> Dataset: - """Create a dataset from lines stored in text files. + """Create a :class:`~ray.data.Dataset` from lines stored in text files. Examples: + Read a file in remote storage. + >>> import ray - >>> # Read a directory of files in remote storage. - >>> ray.data.read_text("s3://bucket/path") # doctest: +SKIP + >>> ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt") + >>> ds.schema() + Column Type + ------ ---- + text string + + Read multiple local files. - >>> # Read multiple local files. - >>> ray.data.read_text(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP + >>> ray.data.read_text( # doctest: +SKIP + ... ["local:///path/to/file1", "local:///path/to/file2"]) Args: - paths: A single file path or a list of file paths (or directories). + paths: A single file or directory, or a list of file or directory paths. + A list of paths can contain both files and directories. encoding: The encoding of the files (e.g., "utf-8" or "ascii"). - errors: What to do with errors on decoding. Specify either "strict", - "ignore", or "replace". Defaults to "ignore". - filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the stream. - ray_remote_args: Kwargs passed to ray.remote in the read tasks and + filesystem: The PyArrow filesystem + implementation to read from. 
These filesystems are specified in the + `PyArrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the `S3FileSystem` is used. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + lines in all the text files. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks and in the subsequent text decoding map task. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_input_stream - meta_provider: File metadata provider. Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - partition_filter: Path-based partition filter, if any. Can be used - with a custom callback to read only selected partitions of a stream. - By default, this does not filter out any files. - If wishing to filter out all file paths except those whose file extension - matches e.g. "*.txt*", a ``FileXtensionFilter("txt")`` can be provided. + `pyarrow.fs.FileSystem.open_input_file `_. + when opening input files to read. + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. + Use with a custom callback to read only selected partitions of a + dataset. By default, no files are filtered. + To filter out all file paths except those whose file extension + matches e.g., "*.txt*", you can provide a + :class:`~ray.data.datasource.FileExtensionFilter`. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object that describes how paths are organized. Defaults to ``None``. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not found. Defaults to False. Returns: - Dataset producing lines of text read from the specified paths. + :class:`~ray.data.Dataset` producing lines of text read from the specified + paths. """ return read_datasource( TextDatasource(), @@ -1086,14 +1284,17 @@ def read_numpy( """Create an Arrow dataset from numpy files. Examples: + Read a directory of files in remote storage. + >>> import ray - >>> # Read a directory of files in remote storage. >>> ray.data.read_numpy("s3://bucket/path") # doctest: +SKIP - >>> # Read multiple local files. + Read multiple local files. + >>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP - >>> # Read multiple directories. + Read multiple directories. + >>> ray.data.read_numpy( # doctest: +SKIP ... ["s3://bucket/path1", "s3://bucket/path2"]) @@ -1146,73 +1347,72 @@ def read_tfrecords( ignore_missing_paths: bool = False, tf_schema: Optional["schema_pb2.Schema"] = None, ) -> Dataset: - """Create a dataset from TFRecord files that contain + """Create a :class:`~ray.data.Dataset` from TFRecord files that contain `tf.train.Example `_ messages. .. warning:: This function exclusively supports ``tf.train.Example`` messages. 
If a file contains a message that isn't of type ``tf.train.Example``, then this function - errors. + fails. Examples: - >>> import os - >>> import tempfile - >>> import tensorflow as tf - >>> features = tf.train.Features( - ... feature={ - ... "length": tf.train.Feature(float_list=tf.train.FloatList(value=[5.1])), - ... "width": tf.train.Feature(float_list=tf.train.FloatList(value=[3.5])), - ... "species": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"setosa"])), - ... } - ... ) - >>> example = tf.train.Example(features=features) - >>> path = os.path.join(tempfile.gettempdir(), "data.tfrecords") - >>> with tf.io.TFRecordWriter(path=path) as writer: - ... writer.write(example.SerializeToString()) - - This function reads ``tf.train.Example`` messages into a tabular - :class:`~ray.data.Dataset`. - >>> import ray - >>> ds = ray.data.read_tfrecords(path) - >>> ds.to_pandas() # doctest: +SKIP - length width species - 0 5.1 3.5 b'setosa' - - We can also read compressed TFRecord files which uses one of the - `compression type supported by Arrow `_: + >>> ray.data.read_tfrecords("example://iris.tfrecords") + Dataset( + num_blocks=..., + num_rows=150, + schema={...} + ) - >>> compressed_path = os.path.join(tempfile.gettempdir(), "data_compressed.tfrecords") - >>> options = tf.io.TFRecordOptions(compression_type="GZIP") # "ZLIB" also supported by TensorFlow - >>> with tf.io.TFRecordWriter(path=compressed_path, options=options) as writer: - ... writer.write(example.SerializeToString()) + We can also read compressed TFRecord files, which use one of the + `compression types supported by Arrow `_: - >>> ds = ray.data.read_tfrecords( - ... [compressed_path], + >>> ray.data.read_tfrecords( + ... "example://iris.tfrecords.gz", ... arrow_open_stream_args={"compression": "gzip"}, ... ) - >>> ds.to_pandas() # doctest: +SKIP - length width species - 0 5.1 3.5 b'setosa' + Dataset( + num_blocks=..., + num_rows=150, + schema={...} + ) Args: - paths: A single file/directory path or a list of file/directory paths. + paths: A single file or directory, or a list of file or directory paths. A list of paths can contain both files and directories. - filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files in the dataset. - arrow_open_stream_args: Key-word arguments passed to - ``pyarrow.fs.FileSystem.open_input_stream``. To read a compressed TFRecord file, - pass the corresponding compression type (e.g. for ``GZIP`` or ``ZLIB``, use - ``arrow_open_stream_args={'compression_type': 'gzip'}``). - meta_provider: File metadata provider. Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - partition_filter: Path-based partition filter, if any. Can be used - with a custom callback to read only selected partitions of a dataset. + filesystem: The PyArrow filesystem + implementation to read from. These filesystems are specified in the + `PyArrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the `S3FileSystem` is used. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. 
+ For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + TFRecord files. + arrow_open_stream_args: kwargs passed to + `pyarrow.fs.FileSystem.open_input_stream `_ + when opening input files to read. To read a compressed TFRecord file, + pass the corresponding compression type (e.g., for ``GZIP`` or ``ZLIB``, + use ``arrow_open_stream_args={'compression': 'gzip'}``). + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. + Use with a custom callback to read only selected partitions of a + dataset. By default, this filters out any file paths whose file extension does not match ``"*.tfrecords*"``. - ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not + ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not found. Defaults to False. tf_schema: Optional TensorFlow Schema which is used to explicitly set the schema of the underlying Dataset. @@ -1222,7 +1422,7 @@ def read_tfrecords( Raises: ValueError: If a file contains a message that isn't a ``tf.train.Example``. - """ # noqa: E501 + """ return read_datasource( TFRecordDatasource(), parallelism=parallelism, @@ -1308,44 +1508,72 @@ def read_binary_files( partition_filter: Optional[PathPartitionFilter] = None, partitioning: Partitioning = None, ignore_missing_paths: bool = False, - output_arrow_format: bool = False, ) -> Dataset: - """Create a dataset from binary files of arbitrary contents. + """Create a :class:`~ray.data.Dataset` from binary files of arbitrary contents. Examples: + Read a file in remote storage. + >>> import ray - >>> # Read a directory of files in remote storage. - >>> ray.data.read_binary_files("s3://bucket/path") # doctest: +SKIP + >>> path = "s3://anonymous@ray-example-data/pdf-sample_0.pdf" + >>> ds = ray.data.read_binary_files(path) + >>> ds.schema() + Column Type + ------ ---- + bytes binary + + Read multiple local files. - >>> # Read multiple local files. >>> ray.data.read_binary_files( # doctest: +SKIP - ... ["/path/to/file1", "/path/to/file2"]) + ... ["local:///path/to/file1", "local:///path/to/file2"]) + + Read a file with the filepaths included as a column in the dataset. + + >>> path = "s3://anonymous@ray-example-data/pdf-sample_0.pdf" + >>> ds = ray.data.read_binary_files(path, include_paths=True) + >>> ds.take(1)[0]["path"] + 'ray-example-data/pdf-sample_0.pdf' + Args: - paths: A single file path or a list of file paths (or directories). - include_paths: Whether to include the full path of the file in the - dataset records. When specified, the stream records will be a - tuple of the file path and the file contents. - filesystem: The filesystem implementation to read from. - ray_remote_args: kwargs passed to ray.remote in the read tasks. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the stream. + paths: A single file or directory, or a list of file or directory paths. + A list of paths can contain both files and directories. + include_paths: If ``True``, include the path to each file. File paths are + stored in the ``'path'`` column. + filesystem: The PyArrow filesystem + implementation to read from.
These filesystems are specified in the + `PyArrow docs `_. Specify this parameter if + you need to provide specific configurations to the filesystem. By default, + the filesystem is automatically selected based on the scheme of the paths. + For example, if the path begins with ``s3://``, the `S3FileSystem` is used. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + parallelism: The amount of parallelism to use for the dataset. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. Parallelism is upper bounded by the total number of + files. arrow_open_stream_args: kwargs passed to - pyarrow.fs.FileSystem.open_input_stream - meta_provider: File metadata provider. Custom metadata providers may - be able to resolve file metadata more quickly and/or accurately. - partition_filter: Path-based partition filter, if any. Can be used - with a custom callback to read only selected partitions of a dataset. + `pyarrow.fs.FileSystem.open_input_file `_. + meta_provider: A :ref:`file metadata provider `. Custom + metadata providers may be able to resolve file metadata more quickly and/or + accurately. In most cases, you do not need to set this. + partition_filter: A + :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. + Use with a custom callback to read only selected partitions of a + dataset. By default, no files are filtered. By default, this does not filter out any files. partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object that describes how paths are organized. Defaults to ``None``. ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not found. Defaults to False. - output_arrow_format: If True, returns data in Arrow format, instead of Python - list format. Defaults to False. Returns: - Dataset producing records read from the specified paths. + :class:`~ray.data.Dataset` producing rows read from the specified paths. """ output_arrow_format = True @@ -1432,8 +1660,13 @@ def create_connection(): connection_factory: A function that takes no arguments and returns a Python DB API2 `Connection object `_. - parallelism: The requested parallelism of the read. - ray_remote_args: Keyword arguments passed to :func:`ray.remote` in read tasks. + parallelism: The requested parallelism of the read. Defaults to -1, + which automatically determines the optimal parallelism for your + configuration. You should not need to manually set this value in most cases. + For details on how the parallelism is automatically determined and guidance + on how to tune it, see :ref:`Tuning read parallelism + `. + ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. Returns: A :class:`Dataset` containing the queried data. @@ -1519,13 +1752,25 @@ def from_modin(df: "modin.DataFrame") -> MaterializedDataset: def from_pandas( dfs: Union["pandas.DataFrame", List["pandas.DataFrame"]] ) -> MaterializedDataset: - """Create a dataset from a list of Pandas dataframes. + """Create a :class:`~ray.data.Dataset` from a list of pandas dataframes. 
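A minimal sketch of the ``connection_factory`` pattern for ``read_sql`` referenced above, using SQLite (the database file and table name are hypothetical):

    >>> import sqlite3  # doctest: +SKIP
    >>> def create_connection():  # doctest: +SKIP
    ...     return sqlite3.connect("example.db")
    >>> ds = ray.data.read_sql("SELECT * FROM movie", create_connection)  # doctest: +SKIP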
+ + Examples: + >>> import pandas as pd + >>> import ray + >>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + >>> ray.data.from_pandas(df) + MaterializedDataset(num_blocks=1, num_rows=3, schema={a: int64, b: int64}) + + Create a Ray Dataset from a list of Pandas DataFrames. + + >>> ray.data.from_pandas([df, df]) + MaterializedDataset(num_blocks=2, num_rows=6, schema={a: int64, b: int64}) Args: - dfs: A Pandas dataframe or a list of Pandas dataframes. + dfs: A pandas dataframe or a list of pandas dataframes. Returns: - MaterializedDataset holding Arrow records read from the dataframes. + :class:`~ray.data.Dataset` holding data read from the dataframes. """ import pandas as pd @@ -1546,15 +1791,27 @@ def from_pandas( def from_pandas_refs( dfs: Union[ObjectRef["pandas.DataFrame"], List[ObjectRef["pandas.DataFrame"]]], ) -> MaterializedDataset: - """Create a dataset from a list of Ray object references to Pandas - dataframes. + """Create a :class:`~ray.data.Dataset` from a list of Ray object references to + pandas dataframes. + + Examples: + >>> import pandas as pd + >>> import ray + >>> df_ref = ray.put(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})) + >>> ray.data.from_pandas_refs(df_ref) + MaterializedDataset(num_blocks=1, num_rows=3, schema={a: int64, b: int64}) + + Create a Ray Dataset from a list of Pandas Dataframes references. + + >>> ray.data.from_pandas_refs([df_ref, df_ref]) + MaterializedDataset(num_blocks=2, num_rows=6, schema={a: int64, b: int64}) Args: - dfs: A Ray object references to pandas dataframe, or a list of + dfs: A Ray object reference to a pandas dataframe, or a list of Ray object references to pandas dataframes. Returns: - MaterializedDataset holding Arrow records read from the dataframes. + :class:`~ray.data.Dataset` holding data read from the dataframes. """ if isinstance(dfs, ray.ObjectRef): dfs = [dfs] @@ -1606,13 +1863,25 @@ def from_pandas_refs( @PublicAPI def from_numpy(ndarrays: Union[np.ndarray, List[np.ndarray]]) -> MaterializedDataset: - """Create a dataset from a list of NumPy ndarrays. + """Creates a :class:`~ray.data.Dataset` from a list of NumPy ndarrays. + + Examples: + >>> import numpy as np + >>> import ray + >>> arr = np.array([1]) + >>> ray.data.from_numpy(arr) + MaterializedDataset(num_blocks=1, num_rows=1, schema={data: int64}) + + Create a Ray Dataset from a list of NumPy arrays. + + >>> ray.data.from_numpy([arr, arr]) + MaterializedDataset(num_blocks=2, num_rows=2, schema={data: int64}) Args: ndarrays: A NumPy ndarray or a list of NumPy ndarrays. Returns: - MaterializedDataset holding the given ndarrays. + :class:`~ray.data.Dataset` holding data from the given ndarrays. """ if isinstance(ndarrays, np.ndarray): ndarrays = [ndarrays] @@ -1624,14 +1893,27 @@ def from_numpy(ndarrays: Union[np.ndarray, List[np.ndarray]]) -> MaterializedDat def from_numpy_refs( ndarrays: Union[ObjectRef[np.ndarray], List[ObjectRef[np.ndarray]]], ) -> MaterializedDataset: - """Create a dataset from a list of NumPy ndarray futures. + """Creates a :class:`~ray.data.Dataset` from a list of Ray object references to + NumPy ndarrays. + + Examples: + >>> import numpy as np + >>> import ray + >>> arr_ref = ray.put(np.array([1])) + >>> ray.data.from_numpy_refs(arr_ref) + MaterializedDataset(num_blocks=1, num_rows=1, schema={data: int64}) + + Create a Ray Dataset from a list of NumPy array references. 
+ + >>> ray.data.from_numpy_refs([arr_ref, arr_ref]) + MaterializedDataset(num_blocks=2, num_rows=2, schema={data: int64}) Args: ndarrays: A Ray object reference to a NumPy ndarray or a list of Ray object references to NumPy ndarrays. Returns: - MaterializedDataset holding the given ndarrays. + :class:`~ray.data.Dataset` holding data from the given ndarrays. """ if isinstance(ndarrays, ray.ObjectRef): ndarrays = [ndarrays] @@ -1672,14 +1954,27 @@ def from_numpy_refs( def from_arrow( tables: Union["pyarrow.Table", bytes, List[Union["pyarrow.Table", bytes]]], ) -> MaterializedDataset: - """Create a dataset from a list of Arrow tables. + """Create a :class:`~ray.data.Dataset` from a list of PyArrow tables. + + Examples: + >>> import pyarrow as pa + >>> import ray + >>> table = pa.table({"x": [1]}) + >>> ray.data.from_arrow(table) + MaterializedDataset(num_blocks=1, num_rows=1, schema={x: int64}) + + Create a Ray Dataset from a list of PyArrow tables. + + >>> ray.data.from_arrow([table, table]) + MaterializedDataset(num_blocks=2, num_rows=2, schema={x: int64}) + Args: - tables: An Arrow table, or a list of Arrow tables, + tables: A PyArrow table, or a list of PyArrow tables, or its streaming format in bytes. Returns: - MaterializedDataset holding Arrow records from the tables. + :class:`~ray.data.Dataset` holding data from the PyArrow tables. """ import pyarrow as pa @@ -1695,14 +1990,28 @@ def from_arrow_refs( List[ObjectRef[Union["pyarrow.Table", bytes]]], ], ) -> MaterializedDataset: - """Create a dataset from a set of Arrow tables. + """Create a :class:`~ray.data.Dataset` from a list of Ray object references to + PyArrow tables. + + Examples: + >>> import pyarrow as pa + >>> import ray + >>> table_ref = ray.put(pa.table({"x": [1]})) + >>> ray.data.from_arrow_refs(table_ref) + MaterializedDataset(num_blocks=1, num_rows=1, schema={x: int64}) + + Create a Ray Dataset from a list of PyArrow table references + + >>> ray.data.from_arrow_refs([table_ref, table_ref]) + MaterializedDataset(num_blocks=2, num_rows=2, schema={x: int64}) + Args: tables: A Ray object reference to Arrow table, or list of Ray object references to Arrow tables, or its streaming format in bytes. Returns: - MaterializedDataset holding Arrow records from the tables. + :class:`~ray.data.Dataset` holding data read from the tables. """ if isinstance(tables, ray.ObjectRef): tables = [tables] @@ -1733,7 +2042,7 @@ def from_spark( spark: A SparkSession, which must be created by RayDP (Spark-on-Ray). df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray). parallelism: The amount of parallelism to use for the dataset. - If not provided, it will be equal to the number of partitions of + If not provided, it is set to the number of partitions of the original Spark dataframe. Returns: @@ -1756,34 +2065,52 @@ def from_huggingface( Example: - .. doctest:: + .. + The following `testoutput` is mocked to avoid illustrating download + logs like "Downloading and preparing dataset 162.17 MiB". - >>> import ray - >>> import datasets - >>> hf_dataset = datasets.load_dataset("tweet_eval", "emotion") - Downloading ... 
- >>> ray_ds = ray.data.from_huggingface(hf_dataset) - >>> ray_ds - {'train': MaterializedDataset( - num_blocks=..., - num_rows=3257, - schema={text: string, label: int64} - ), 'test': MaterializedDataset( - num_blocks=..., - num_rows=1421, - schema={text: string, label: int64} - ), 'validation': MaterializedDataset( - num_blocks=..., - num_rows=374, - schema={text: string, label: int64} - )} - >>> ray_ds = ray.data.from_huggingface(hf_dataset["train"]) - >>> ray_ds - MaterializedDataset( - num_blocks=..., - num_rows=3257, - schema={text: string, label: int64} - ) + .. testcode:: + + import ray + import datasets + + hf_dataset = datasets.load_dataset("tweet_eval", "emotion") + ray_ds = ray.data.from_huggingface(hf_dataset) + + print(ray_ds) + + .. testoutput:: + :options: +MOCK + + {'train': MaterializedDataset( + num_blocks=..., + num_rows=3257, + schema={text: string, label: int64} + ), 'test': MaterializedDataset( + num_blocks=..., + num_rows=1421, + schema={text: string, label: int64} + ), 'validation': MaterializedDataset( + num_blocks=..., + num_rows=374, + schema={text: string, label: int64} + )} + + Load only a single split of the Huggingface Dataset. + + .. testcode:: + + ray_ds = ray.data.from_huggingface(hf_dataset["train"]) + print(ray_ds) + + .. testoutput:: + :options: +MOCK + + MaterializedDataset( + num_blocks=..., + num_rows=3257, + schema={text: string, label: int64} + ) Args: dataset: A Hugging Face Dataset, or DatasetDict. IterableDataset is not diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 1950ba59767d..d374801026ac 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -494,9 +494,6 @@ def test_convert_types(ray_start_regular_shared): def test_from_items(ray_start_regular_shared): ds = ray.data.from_items(["hello", "world"]) assert extract_values("item", ds.take()) == ["hello", "world"] - - ds = ray.data.from_items([{"hello": "world"}], output_arrow_format=True) - assert ds.take() == [{"hello": "world"}] assert isinstance(next(ds.iter_batches(batch_format=None)), pa.Table) diff --git a/rllib/offline/dataset_writer.py b/rllib/offline/dataset_writer.py index 705d3a4a78d6..b517933ce985 100644 --- a/rllib/offline/dataset_writer.py +++ b/rllib/offline/dataset_writer.py @@ -71,9 +71,7 @@ def write(self, sample_batch: SampleBatchType): # Todo: We should flush at the end of sampling even if this # condition was not reached. if len(self.samples) >= self.max_num_samples_per_file: - ds = data.from_items(self.samples, output_arrow_format=True).repartition( - num_blocks=1, shuffle=False - ) + ds = data.from_items(self.samples).repartition(num_blocks=1, shuffle=False) if self.format == "json": ds.write_json(self.path, try_create_dir=True) elif self.format == "parquet":
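A standalone sketch of the write path used by the offline dataset writer above (the sample rows and the output directory are hypothetical):

    >>> import ray  # doctest: +SKIP
    >>> samples = [{"obs": 0.1, "action": 1, "reward": 0.5}]
    >>> ds = ray.data.from_items(samples).repartition(num_blocks=1, shuffle=False)  # doctest: +SKIP
    >>> ds.write_json("/tmp/offline-data", try_create_dir=True)  # doctest: +SKIP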