[Data] Add read_json() fallback to json.load() when pyarrow read fails (ray-project#42558)

When `pyarrow.json` fails to read a JSON file for any reason, fall back to reading the file's raw bytes and parsing them with Python's native `json.load()`, so that `read_json()` supports a wider range of JSON files.

Signed-off-by: Scott Lee <[email protected]>
Signed-off-by: khluu <[email protected]>
scottjlee authored and khluu committed Jan 24, 2024
1 parent 11d880f commit 821c540
Showing 2 changed files with 87 additions and 28 deletions.
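In outline, the change wraps the existing PyArrow read in a try/except and re-parses the file's bytes with the standard library when PyArrow gives up. Below is a minimal standalone sketch of that pattern before the diff itself; the function name and sample data are illustrative, not part of the commit, and `pa.Table.from_pylist()` assumes PyArrow >= 7.0.0 (the committed code carries a shim for older versions):

import json
from io import BytesIO

import pyarrow as pa
import pyarrow.json as pajson


def read_json_bytes(data: bytes) -> pa.Table:
    """Sketch of the fallback: PyArrow first, stdlib json second."""
    try:
        # PyArrow's native JSON reader: fast, but strict about layout.
        return pajson.read_json(BytesIO(data))
    except pa.ArrowInvalid:
        # Fallback: parse with json.load(), then build a table from the
        # resulting list of row dicts. Slower, but more permissive.
        rows = json.load(BytesIO(data))
        return pa.Table.from_pylist(rows)


# A top-level JSON array with ragged nested lists: pyarrow.json rejects
# this (see the new test below), but the fallback path reads it fine.
table = read_json_bytes(b'[{"one": [1]}, {"one": [1, 2]}]')
print(table.to_pydict())  # {'one': [[1], [1, 2]]}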
73 changes: 58 additions & 15 deletions python/ray/data/datasource/json_datasource.py
@@ -1,3 +1,4 @@
+from io import BytesIO
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
 from ray.data._internal.dataset_logger import DatasetLogger
@@ -36,42 +37,40 @@ def __init__(
         )
         self.arrow_json_args = arrow_json_args
 
-    # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
-    def _read_stream(self, f: "pyarrow.NativeFile", path: str):
-        from io import BytesIO
-
-        from pyarrow import ArrowInvalid, json
+    def _read_with_pyarrow_read_json(self, buffer: "pyarrow.lib.Buffer"):
+        """Read with PyArrow JSON reader, trying to auto-increase the
+        read block size in the case of the read object
+        straddling block boundaries."""
+        import pyarrow as pa
 
         # When reading large files, the default block size configured in PyArrow can be
         # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
         # straddling object straddles two block boundaries (try to increase block
         # size?)`. More information on this issue can be found here:
-        # https://github.com/apache/arrow/issues/25674 The read will be retried with
-        # geometrically increasing block size until the size reaches
-        # `DataContext.get_current().target_max_block_size`. The initial block size
-        # will start at the PyArrow default block size or it can be manually set
-        # through the `read_options` parameter as follows.
-        #
+        # https://github.com/apache/arrow/issues/25674
+        # The read will be retried with geometrically increasing block size
+        # until the size reaches `DataContext.get_current().target_max_block_size`.
+        # The initial block size will start at the PyArrow default block size
+        # or it can be manually set through the `read_options` parameter as follows.
         # >>> import pyarrow.json as pajson
         # >>> block_size = 10 << 20 # Set block size to 10MB
         # >>> ray.data.read_json(  # doctest: +SKIP
         # ...     "s3://anonymous@ray-example-data/log.json",
         # ...     read_options=pajson.ReadOptions(block_size=block_size)
         # ... )
 
-        buffer = f.read_buffer()
         init_block_size = self.read_options.block_size
         max_block_size = DataContext.get_current().target_max_block_size
         while True:
             try:
-                yield json.read_json(
+                yield pa.json.read_json(
                     BytesIO(buffer),
                     read_options=self.read_options,
                     **self.arrow_json_args,
                 )
                 self.read_options.block_size = init_block_size
                 break
-            except ArrowInvalid as e:
+            except pa.ArrowInvalid as e:
                 if "straddling object straddles two block boundaries" in str(e):
                     if self.read_options.block_size < max_block_size:
                         # Increase the block size in case it was too small.
@@ -82,12 +81,56 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                         )
                         self.read_options.block_size *= 2
                     else:
-                        raise ArrowInvalid(
+                        raise pa.ArrowInvalid(
                             f"{e} - Auto-increasing block size to "
                             f"{self.read_options.block_size} bytes failed. "
                             f"Please try manually increasing the block size through "
                             f"the `read_options` parameter to a larger size. "
+                            f"For example: `read_json(..., read_options="
+                            f"pyarrow.json.ReadOptions(block_size=10 << 25))`"
+                            f"More information on this issue can be found here: "
+                            f"https://github.com/apache/arrow/issues/25674"
                         )
                 else:
                     # unrelated error, simply reraise
                     raise e
 
+    def _read_with_python_json(self, buffer: "pyarrow.lib.Buffer"):
+        """Fallback method to read JSON files with Python's native json.load(),
+        in case the default pyarrow json reader fails."""
+        import json
+
+        import pyarrow as pa
+
+        parsed_json = json.load(BytesIO(buffer))
+        try:
+            yield pa.Table.from_pylist(parsed_json)
+        except AttributeError as e:
+            # For PyArrow < 7.0.0, `pa.Table.from_pylist()` is not available.
+            # Construct a dict from the list and call
+            # `pa.Table.from_pydict()` instead.
+            assert "no attribute 'from_pylist'" in str(e), str(e)
+            from collections import defaultdict
+
+            dct = defaultdict(list)
+            for row in parsed_json:
+                for k, v in row.items():
+                    dct[k].append(v)
+            yield pa.Table.from_pydict(dct)
+
+    # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
+    def _read_stream(self, f: "pyarrow.NativeFile", path: str):
+        import pyarrow as pa
+
+        buffer: pa.lib.Buffer = f.read_buffer()
+
+        try:
+            yield from self._read_with_pyarrow_read_json(buffer)
+        except pa.ArrowInvalid as e:
+            # If read with PyArrow fails, try falling back to native json.load().
+            logger.get_logger(log_to_stdout=False).warning(
+                f"Error reading with pyarrow.json.read_json(). "
+                f"Falling back to native json.load(), which may be slower. "
+                f"PyArrow error was:\n{e}"
+            )
+            yield from self._read_with_python_json(buffer)
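Side note on the PyArrow < 7.0.0 shim in `_read_with_python_json` above: it pivots a list of row dicts into a dict of columns so that `pa.Table.from_pydict()` can stand in for the missing `pa.Table.from_pylist()`. A self-contained sketch of the same pivot, with made-up sample rows, assuming every row has the same keys (as the datasource code does):

from collections import defaultdict

import pyarrow as pa

rows = [{"one": 1, "two": "a"}, {"one": 2, "two": "b"}]

# pa.Table.from_pylist() only exists in PyArrow >= 7.0.0. On older
# versions, gather the values column by column and use from_pydict().
columns = defaultdict(list)
for row in rows:
    for key, value in row.items():
        columns[key].append(value)

table = pa.Table.from_pydict(columns)
print(table.to_pydict())  # {'one': [1, 2], 'two': ['a', 'b']}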
42 changes: 29 additions & 13 deletions python/ray/data/tests/test_json.py
@@ -19,7 +19,6 @@
 )
 from ray.data.datasource.path_util import _unwrap_protocol
 from ray.data.tests.conftest import *  # noqa
-from ray.data.tests.mock_http_server import *  # noqa
 from ray.data.tests.test_partitioning import PathPartitionEncoder
 from ray.data.tests.util import Counter
 from ray.tests.conftest import *  # noqa
@@ -257,6 +256,26 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path):
     shutil.rmtree(dir_path)
 
 
+def test_read_json_fallback_from_pyarrow_failure(ray_start_regular_shared, local_path):
+    # Try to read this with read_json() to trigger fallback logic
+    # to read bytes with json.load().
+    data = [{"one": [1]}, {"one": [1, 2]}]
+    path1 = os.path.join(local_path, "test1.json")
+    with open(path1, "w") as f:
+        json.dump(data, f)
+
+    # pyarrow.json cannot read JSONs containing arrays of different lengths.
+    from pyarrow import ArrowInvalid
+
+    with pytest.raises(ArrowInvalid):
+        pajson.read_json(path1)
+
+    # Ray Data successfully reads this in by
+    # falling back to json.load() when pyarrow fails.
+    ds = ray.data.read_json(path1)
+    assert ds.take_all() == data
+
+
 @pytest.mark.parametrize(
     "fs,data_path,endpoint_url",
     [
@@ -666,22 +685,19 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoint_url):
     df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     path3 = os.path.join(data_path, "test3.json")
     df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
-    try:
-        # Negative Buffer Size
-        ds = ray.data.read_json(
-            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
-        )
-        dsdf = ds.to_pandas()
-    except pa.ArrowInvalid as e:
-        assert "Negative buffer resize" in str(e.cause)
-    try:
-        # Zero Buffer Size
+
+    # Negative Buffer Size, fails with arrow but succeeds in fallback to json.load()
+    ds = ray.data.read_json(
+        path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
+    )
+    dsdf = ds.to_pandas()
+
+    # Zero Buffer Size, fails with arrow and fails in fallback to json.load()
+    with pytest.raises(json.decoder.JSONDecodeError, match="Extra data"):
         ds = ray.data.read_json(
             path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0)
         )
         dsdf = ds.to_pandas()
-    except pa.ArrowInvalid as e:
-        assert "Empty JSON file" in str(e.cause)
 
 
 if __name__ == "__main__":

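A closing note on the block-size knob that the datasource's retry loop and error message both point at: the manual override they suggest looks like this in user code. This sketch mirrors the doctest in the code comment above; the S3 path is the example dataset referenced there, and 10 MiB is an arbitrary choice, not a recommendation from the commit:

import pyarrow.json as pajson

import ray

block_size = 10 << 20  # 10 MiB instead of PyArrow's default block size

# Start the read with a larger initial block size, so that records
# straddling block boundaries fit without triggering the retry loop.
ds = ray.data.read_json(
    "s3://anonymous@ray-example-data/log.json",
    read_options=pajson.ReadOptions(block_size=block_size),
)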