From 821c54082ca2673365411425a865df74d6e697d8 Mon Sep 17 00:00:00 2001
From: Scott Lee
Date: Tue, 23 Jan 2024 12:46:12 -0800
Subject: [PATCH] [Data] Add `read_json()` fallback to `json.load()` when
 pyarrow read fails (#42558)

When `pyarrow.json` fails to read a JSON file, fall back to reading the
file's raw bytes and parsing them with Python's `json.load()`, so that
`read_json()` can handle a wider range of JSON files.

Signed-off-by: Scott Lee
Signed-off-by: khluu
---
 python/ray/data/datasource/json_datasource.py | 74 +++++++++++++++----
 python/ray/data/tests/test_json.py            | 42 +++++++----
 2 files changed, 88 insertions(+), 28 deletions(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 673bdcb24ab2..ca075a7bad2b 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -1,3 +1,4 @@
+from io import BytesIO
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
 from ray.data._internal.dataset_logger import DatasetLogger
@@ -36,22 +37,22 @@ def __init__(
         )
         self.arrow_json_args = arrow_json_args
 
-    # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
-    def _read_stream(self, f: "pyarrow.NativeFile", path: str):
-        from io import BytesIO
-
-        from pyarrow import ArrowInvalid, json
+    def _read_with_pyarrow_read_json(self, buffer: "pyarrow.lib.Buffer"):
+        """Read with the PyArrow JSON reader, geometrically increasing the
+        read block size when the object being read straddles block
+        boundaries."""
+        import pyarrow as pa
+        import pyarrow.json  # noqa: F401. Makes `pa.json` resolvable below.
 
         # When reading large files, the default block size configured in PyArrow can be
         # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
         # straddling object straddles two block boundaries (try to increase block
         # size?)`. More information on this issue can be found here:
-        # https://github.com/apache/arrow/issues/25674 The read will be retried with
-        # geometrically increasing block size until the size reaches
-        # `DataContext.get_current().target_max_block_size`. The initial block size
-        # will start at the PyArrow default block size or it can be manually set
-        # through the `read_options` parameter as follows.
-        #
+        # https://github.com/apache/arrow/issues/25674
+        # The read will be retried with geometrically increasing block size
+        # until the size reaches `DataContext.get_current().target_max_block_size`.
+        # The initial block size starts at the PyArrow default block size,
+        # or it can be set manually through the `read_options` parameter as follows.
         # >>> import pyarrow.json as pajson
         # >>> block_size = 10 << 20 # Set block size to 10MB
         # >>> ray.data.read_json( # doctest: +SKIP
@@ -59,19 +60,18 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
         # ...     read_options=pajson.ReadOptions(block_size=block_size)
         # ... )
 
-        buffer = f.read_buffer()
         init_block_size = self.read_options.block_size
         max_block_size = DataContext.get_current().target_max_block_size
         while True:
             try:
-                yield json.read_json(
+                yield pa.json.read_json(
                     BytesIO(buffer),
                     read_options=self.read_options,
                     **self.arrow_json_args,
                 )
                 self.read_options.block_size = init_block_size
                 break
-            except ArrowInvalid as e:
+            except pa.ArrowInvalid as e:
                 if "straddling object straddles two block boundaries" in str(e):
                     if self.read_options.block_size < max_block_size:
                         # Increase the block size in case it was too small.
@@ -82,12 +82,56 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                         )
                         self.read_options.block_size *= 2
                     else:
-                        raise ArrowInvalid(
+                        raise pa.ArrowInvalid(
                             f"{e} - Auto-increasing block size to "
                             f"{self.read_options.block_size} bytes failed. "
+                            f"Please try manually increasing the block size through "
+                            f"the `read_options` parameter to a larger size. "
+                            f"For example: `read_json(..., read_options="
+                            f"pyarrow.json.ReadOptions(block_size=10 << 25))`. "
                             f"More information on this issue can be found here: "
                             f"https://github.com/apache/arrow/issues/25674"
                         )
                 else:
                     # unrelated error, simply reraise
                     raise e
+
+    def _read_with_python_json(self, buffer: "pyarrow.lib.Buffer"):
+        """Fallback method that reads JSON files with Python's native
+        json.load(), in case the default PyArrow JSON reader fails."""
+        import json
+
+        import pyarrow as pa
+
+        parsed_json = json.load(BytesIO(buffer))
+        try:
+            yield pa.Table.from_pylist(parsed_json)
+        except AttributeError as e:
+            # For PyArrow < 7.0.0, `pa.Table.from_pylist()` is not available.
+            # Construct a dict of columns from the list of rows and call
+            # `pa.Table.from_pydict()` instead.
+            assert "no attribute 'from_pylist'" in str(e), str(e)
+            from collections import defaultdict
+
+            dct = defaultdict(list)
+            for row in parsed_json:
+                for k, v in row.items():
+                    dct[k].append(v)
+            yield pa.Table.from_pydict(dct)
+
+    # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
+    def _read_stream(self, f: "pyarrow.NativeFile", path: str):
+        import pyarrow as pa
+
+        buffer: pa.lib.Buffer = f.read_buffer()
+
+        try:
+            yield from self._read_with_pyarrow_read_json(buffer)
+        except pa.ArrowInvalid as e:
+            # If the PyArrow read fails, fall back to Python's native json.load().
+            logger.get_logger(log_to_stdout=False).warning(
+                f"Error reading with pyarrow.json.read_json(). "
+                f"Falling back to native json.load(), which may be slower. "
+                f"PyArrow error was:\n{e}"
+            )
+            yield from self._read_with_python_json(buffer)
diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py
index 1972b8056f5a..4e9f04a4905b 100644
--- a/python/ray/data/tests/test_json.py
+++ b/python/ray/data/tests/test_json.py
@@ -19,7 +19,6 @@
 )
 from ray.data.datasource.path_util import _unwrap_protocol
 from ray.data.tests.conftest import *  # noqa
-from ray.data.tests.mock_http_server import *  # noqa
 from ray.data.tests.test_partitioning import PathPartitionEncoder
 from ray.data.tests.util import Counter
 from ray.tests.conftest import *  # noqa
@@ -257,6 +256,26 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path):
     shutil.rmtree(dir_path)
 
 
+def test_read_json_fallback_from_pyarrow_failure(ray_start_regular_shared, local_path):
+    # Write a JSON file that pyarrow cannot parse, so that read_json()
+    # triggers the fallback to reading the bytes with json.load().
+    data = [{"one": [1]}, {"one": [1, 2]}]
+    path1 = os.path.join(local_path, "test1.json")
+    with open(path1, "w") as f:
+        json.dump(data, f)
+
+    # pyarrow.json only supports newline-delimited JSON, so it rejects this file's top-level array.
+    from pyarrow import ArrowInvalid
+
+    with pytest.raises(ArrowInvalid):
+        pajson.read_json(path1)
+
+    # Ray Data reads the file successfully by falling back
+    # to json.load() when the pyarrow read fails.
+    ds = ray.data.read_json(path1)
+    assert ds.take_all() == data
+
+
 @pytest.mark.parametrize(
     "fs,data_path,endpoint_url",
     [
@@ -666,22 +685,19 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
     df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     path3 = os.path.join(data_path, "test3.json")
     df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
-    try:
-        # Negative Buffer Size
-        ds = ray.data.read_json(
-            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
-        )
-        dsdf = ds.to_pandas()
-    except pa.ArrowInvalid as e:
-        assert "Negative buffer resize" in str(e.cause)
-    try:
-        # Zero Buffer Size
+
+    # Negative buffer size: fails in Arrow but succeeds in the fallback to json.load().
+    ds = ray.data.read_json(
+        path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
+    )
+    dsdf = ds.to_pandas()
+
+    # Zero buffer size: fails in Arrow and also fails in the fallback to json.load().
+    with pytest.raises(json.decoder.JSONDecodeError, match="Extra data"):
         ds = ray.data.read_json(
             path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0)
         )
         dsdf = ds.to_pandas()
-    except pa.ArrowInvalid as e:
-        assert "Empty JSON file" in str(e.cause)
 
 
 if __name__ == "__main__":
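
A minimal end-to-end sketch of the fallback behavior this patch adds, mirroring
the new test; the temporary path and data below are illustrative:

    import json
    import os
    import tempfile

    import ray

    # A top-level JSON array. pyarrow.json only supports newline-delimited
    # JSON, so pyarrow.json.read_json() raises ArrowInvalid on this file.
    data = [{"one": [1]}, {"one": [1, 2]}]
    path = os.path.join(tempfile.mkdtemp(), "ragged.json")
    with open(path, "w") as f:
        json.dump(data, f)

    # read_json() first tries pyarrow.json.read_json(); when that raises
    # ArrowInvalid, it logs a warning and re-parses the bytes with json.load().
    ds = ray.data.read_json(path)
    assert ds.take_all() == data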
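
As the comment in `_read_with_pyarrow_read_json()` notes, the geometric retry
only kicks in after a "straddling object" failure; the initial block size can
also be raised up front through `read_options`. A minimal sketch, assuming an
illustrative 10 MiB block size and file path:

    import pyarrow.json as pajson

    import ray

    # Start the PyArrow JSON reader at a 10 MiB block size so that large rows
    # do not straddle block boundaries in the first place.
    ds = ray.data.read_json(
        "s3://bucket/large_rows.json",  # illustrative path
        read_options=pajson.ReadOptions(block_size=10 << 20),
    )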