[Data] Auto increasing block size for read_json (#42357)
This PR adds logic to dynamically increase the `block_size` used in Arrow's JSON loader when the initial `block_size` is too small to parse the file, resulting in more graceful handling of these cases.
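For context, a minimal usage sketch (not part of this commit) of the workflow this PR targets; the S3 path is the example file from the Ray docs, and the manual `block_size` override shown is the workaround that the new retry logic automates:

import ray
import pyarrow.json as pajson

# Previously, reading a file whose JSON objects exceed PyArrow's default
# block size required manually bumping block_size via read_options:
ds = ray.data.read_json(
    "s3://anonymous@ray-example-data/log.json",
    read_options=pajson.ReadOptions(block_size=10 << 20),  # 10 MB
)

# With this change, the plain call retries internally with a geometrically
# growing block_size, so the manual override is usually unnecessary:
ds = ray.data.read_json("s3://anonymous@ray-example-data/log.json")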

Signed-off-by: Matthew Owen <[email protected]>
omatthew98 authored Jan 22, 2024
1 parent cbd07f9 commit 3d91fb7
Showing 3 changed files with 125 additions and 16 deletions.
58 changes: 56 additions & 2 deletions python/ray/data/datasource/json_datasource.py
@@ -1,11 +1,15 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from ray.data._internal.dataset_logger import DatasetLogger
from ray.data.context import DataContext
from ray.data.datasource.file_based_datasource import FileBasedDatasource
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
    import pyarrow

logger = DatasetLogger(__name__)


@PublicAPI
class JSONDatasource(FileBasedDatasource):
@@ -34,6 +38,56 @@ def __init__(

    # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
    def _read_stream(self, f: "pyarrow.NativeFile", path: str):
        from io import BytesIO

        from pyarrow import ArrowInvalid, json

        # When reading large files, the default block size configured in PyArrow can be
        # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
        # straddling object straddles two block boundaries (try to increase block
        # size?)`. More information on this issue can be found here:
        # https://github.com/apache/arrow/issues/25674. The read will be retried with
        # geometrically increasing block size until the size reaches
        # `DataContext.get_current().target_max_block_size`. The initial block size
        # will start at the PyArrow default block size or it can be manually set
        # through the `read_options` parameter as follows.
        #
        # >>> import pyarrow.json as pajson
        # >>> block_size = 10 << 20  # Set block size to 10MB
        # >>> ray.data.read_json(  # doctest: +SKIP
        # ...     "s3://anonymous@ray-example-data/log.json",
        # ...     read_options=pajson.ReadOptions(block_size=block_size)
        # ... )

        buffer = f.read_buffer()
        init_block_size = self.read_options.block_size
        max_block_size = DataContext.get_current().target_max_block_size
        while True:
            try:
                yield json.read_json(
                    BytesIO(buffer),
                    read_options=self.read_options,
                    **self.arrow_json_args,
                )
                self.read_options.block_size = init_block_size
                break
            except ArrowInvalid as e:
                if "straddling object straddles two block boundaries" in str(e):
                    if self.read_options.block_size < max_block_size:
                        # Increase the block size in case it was too small.
                        logger.get_logger(log_to_stdout=False).info(
                            f"JSONDatasource read failed with "
                            f"block_size={self.read_options.block_size}. Retrying with "
                            f"block_size={self.read_options.block_size * 2}."
                        )
                        self.read_options.block_size *= 2
                    else:
                        raise ArrowInvalid(
                            f"{e} - Auto-increasing block size to "
                            f"{self.read_options.block_size} bytes failed. "
                            f"More information on this issue can be found here: "
                            f"https://github.com/apache/arrow/issues/25674"
                        )
                else:
                    # unrelated error, simply reraise
                    raise e
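
To illustrate the retry pattern in isolation, here is a self-contained sketch using plain PyArrow; the helper name, sizes, and sample record below are illustrative and not part of this commit. It doubles the block size on the straddling-object error until a cap is reached, mirroring the loop in `_read_stream`:

from io import BytesIO

import pyarrow as pa
import pyarrow.json as pajson


def read_json_with_retries(data: bytes, block_size: int, max_block_size: int) -> pa.Table:
    # Retry pyarrow.json.read_json with a geometrically increasing block
    # size, mirroring JSONDatasource._read_stream above.
    while True:
        try:
            return pajson.read_json(
                BytesIO(data), read_options=pajson.ReadOptions(block_size=block_size)
            )
        except pa.ArrowInvalid as e:
            if "straddling object straddles two block boundaries" not in str(e):
                raise
            if block_size >= max_block_size:
                raise
            block_size *= 2  # geometric growth, as in the datasource


# A single ~4 MB newline-delimited record; a deliberately tiny initial block
# size forces several doublings before the read succeeds. Exact error text
# may vary across PyArrow versions.
record = b'{"payload": "' + b"x" * (4 << 20) + b'"}\n'
table = read_json_with_retries(record, block_size=1 << 10, max_block_size=128 << 20)
assert table.num_rows == 1
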
14 changes: 0 additions & 14 deletions python/ray/data/read_api.py
@@ -1042,20 +1042,6 @@ def read_json(
        >>> ds.take(1)
        [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]

        When reading large files, the default block size configured in PyArrow can be too small,
        resulting in the following error:
        ``pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries
        (try to increase block size?)``.

        To resolve this, use the ``read_options`` parameter to set a larger block size:

        >>> import pyarrow.json as pajson
        >>> block_size = 10 << 20  # Set block size to 10MB
        >>> ray.data.read_json(  # doctest: +SKIP
        ...     "s3://anonymous@ray-example-data/log.json",
        ...     read_options=pajson.ReadOptions(block_size=block_size)
        ... )

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
69 changes: 69 additions & 0 deletions python/ray/data/tests/test_json.py
@@ -615,6 +615,75 @@ def test_json_write_block_path_provider(
assert df.equals(ds_df)


@pytest.mark.parametrize(
    "fs,data_path,endpoint_url",
    [
        (None, lazy_fixture("local_path"), None),
        (lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
        (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
    ],
)
def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoint_url):
    if endpoint_url is None:
        storage_options = {}
    else:
        storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))

    # Single small file, unit block_size
    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    path1 = os.path.join(data_path, "test1.json")
    df1.to_json(path1, orient="records", lines=True, storage_options=storage_options)
    ds = ray.data.read_json(
        path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1)
    )
    dsdf = ds.to_pandas()
    assert df1.equals(dsdf)
    # Test metadata ops.
    assert ds.count() == 3
    assert ds.input_files() == [_unwrap_protocol(path1)]
    assert "{one: int64, two: string}" in str(ds), ds

    # Single large file, default block_size
    num_chars = 2500000
    num_rows = 3
    df2 = pd.DataFrame(
        {
            "one": ["a" * num_chars for _ in range(num_rows)],
            "two": ["b" * num_chars for _ in range(num_rows)],
        }
    )
    path2 = os.path.join(data_path, "test2.json")
    df2.to_json(path2, orient="records", lines=True, storage_options=storage_options)
    ds = ray.data.read_json(path2, filesystem=fs)
    dsdf = ds.to_pandas()
    assert df2.equals(dsdf)
    # Test metadata ops.
    assert ds.count() == num_rows
    assert ds.input_files() == [_unwrap_protocol(path2)]
    assert "{one: string, two: string}" in str(ds), ds

    # Single file, negative and zero block_size (expect failure)
    df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    path3 = os.path.join(data_path, "test3.json")
    df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
    try:
        # Negative Buffer Size
        ds = ray.data.read_json(
            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
        )
        dsdf = ds.to_pandas()
    except pa.ArrowInvalid as e:
        assert "Negative buffer resize" in str(e.cause)
    try:
        # Zero Buffer Size
        ds = ray.data.read_json(
            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0)
        )
        dsdf = ds.to_pandas()
    except pa.ArrowInvalid as e:
        assert "Empty JSON file" in str(e.cause)


if __name__ == "__main__":
    import sys

