diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py index 07f53a6e7cc4..673bdcb24ab2 100644 --- a/python/ray/data/datasource/json_datasource.py +++ b/python/ray/data/datasource/json_datasource.py @@ -1,11 +1,15 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from ray.data._internal.dataset_logger import DatasetLogger +from ray.data.context import DataContext from ray.data.datasource.file_based_datasource import FileBasedDatasource from ray.util.annotations import PublicAPI if TYPE_CHECKING: import pyarrow +logger = DatasetLogger(__name__) + @PublicAPI class JSONDatasource(FileBasedDatasource): @@ -34,6 +38,56 @@ def __init__( # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads. def _read_stream(self, f: "pyarrow.NativeFile", path: str): - from pyarrow import json + from io import BytesIO + + from pyarrow import ArrowInvalid, json + + # When reading large files, the default block size configured in PyArrow can be + # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid: + # straddling object straddles two block boundaries (try to increase block + # size?)`. More information on this issue can be found here: + # https://github.com/apache/arrow/issues/25674 The read will be retried with + # geometrically increasing block size until the size reaches + # `DataContext.get_current().target_max_block_size`. The initial block size + # will start at the PyArrow default block size or it can be manually set + # through the `read_options` parameter as follows. + # + # >>> import pyarrow.json as pajson + # >>> block_size = 10 << 20 # Set block size to 10MB + # >>> ray.data.read_json( # doctest: +SKIP + # ... "s3://anonymous@ray-example-data/log.json", + # ... read_options=pajson.ReadOptions(block_size=block_size) + # ... ) - yield json.read_json(f, read_options=self.read_options, **self.arrow_json_args) + buffer = f.read_buffer() + init_block_size = self.read_options.block_size + max_block_size = DataContext.get_current().target_max_block_size + while True: + try: + yield json.read_json( + BytesIO(buffer), + read_options=self.read_options, + **self.arrow_json_args, + ) + self.read_options.block_size = init_block_size + break + except ArrowInvalid as e: + if "straddling object straddles two block boundaries" in str(e): + if self.read_options.block_size < max_block_size: + # Increase the block size in case it was too small. + logger.get_logger(log_to_stdout=False).info( + f"JSONDatasource read failed with " + f"block_size={self.read_options.block_size}. Retrying with " + f"block_size={self.read_options.block_size * 2}." + ) + self.read_options.block_size *= 2 + else: + raise ArrowInvalid( + f"{e} - Auto-increasing block size to " + f"{self.read_options.block_size} bytes failed. " + f"More information on this issue can be found here: " + f"https://github.com/apache/arrow/issues/25674" + ) + else: + # unrelated error, simply reraise + raise e diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 36cb20222b9f..dba9e934954a 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1042,20 +1042,6 @@ def read_json( >>> ds.take(1) [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}] - When reading large files, the default block size configured in PyArrow can be too small, - resulting in the following error: - ``pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries - (try to increase block size?)``. - - To resolve this, use the ``read_options`` parameter to set a larger block size: - - >>> import pyarrow.json as pajson - >>> block_size = 10 << 20 # Set block size to 10MB - >>> ray.data.read_json( # doctest: +SKIP - ... "s3://anonymous@ray-example-data/log.json", - ... read_options=pajson.ReadOptions(block_size=block_size) - ... ) - Args: paths: A single file or directory, or a list of file or directory paths. A list of paths can contain both files and directories. diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py index 1701301fda6f..1972b8056f5a 100644 --- a/python/ray/data/tests/test_json.py +++ b/python/ray/data/tests/test_json.py @@ -615,6 +615,75 @@ def test_json_write_block_path_provider( assert df.equals(ds_df) +@pytest.mark.parametrize( + "fs,data_path,endpoint_url", + [ + (None, lazy_fixture("local_path"), None), + (lazy_fixture("local_fs"), lazy_fixture("local_path"), None), + (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")), + ], +) +def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoint_url): + if endpoint_url is None: + storage_options = {} + else: + storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url)) + + # Single small file, unit block_size + df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + path1 = os.path.join(data_path, "test1.json") + df1.to_json(path1, orient="records", lines=True, storage_options=storage_options) + ds = ray.data.read_json( + path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1) + ) + dsdf = ds.to_pandas() + assert df1.equals(dsdf) + # Test metadata ops. + assert ds.count() == 3 + assert ds.input_files() == [_unwrap_protocol(path1)] + assert "{one: int64, two: string}" in str(ds), ds + + # Single large file, default block_size + num_chars = 2500000 + num_rows = 3 + df2 = pd.DataFrame( + { + "one": ["a" * num_chars for _ in range(num_rows)], + "two": ["b" * num_chars for _ in range(num_rows)], + } + ) + path2 = os.path.join(data_path, "test2.json") + df2.to_json(path2, orient="records", lines=True, storage_options=storage_options) + ds = ray.data.read_json(path2, filesystem=fs) + dsdf = ds.to_pandas() + assert df2.equals(dsdf) + # Test metadata ops. + assert ds.count() == num_rows + assert ds.input_files() == [_unwrap_protocol(path2)] + assert "{one: string, two: string}" in str(ds), ds + + # Single file, negative and zero block_size (expect failure) + df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + path3 = os.path.join(data_path, "test3.json") + df3.to_json(path3, orient="records", lines=True, storage_options=storage_options) + try: + # Negative Buffer Size + ds = ray.data.read_json( + path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1) + ) + dsdf = ds.to_pandas() + except pa.ArrowInvalid as e: + assert "Negative buffer resize" in str(e.cause) + try: + # Zero Buffer Size + ds = ray.data.read_json( + path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0) + ) + dsdf = ds.to_pandas() + except pa.ArrowInvalid as e: + assert "Empty JSON file" in str(e.cause) + + if __name__ == "__main__": import sys