[Data] Auto increasing block size for read_json #42357

Merged (13 commits) on Jan 22, 2024
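
For orientation (this note is not part of the PR itself): the change makes read_json retry a failed PyArrow JSON read with a doubled block size until the read succeeds or the size exceeds DataContext.get_current().target_max_block_size. Below is a hedged sketch of that doubling schedule, assuming PyArrow's default 1 MiB JSON block size and Ray's default 128 MiB target max block size; both defaults may differ across versions.

# Hedged sketch of the retry schedule only; the two defaults are assumptions,
# not values taken from the diff.
initial_block_size = 1 << 20        # assumed PyArrow JSON default: 1 MiB
target_max_block_size = 128 << 20   # assumed Ray DataContext default: 128 MiB

block_size = initial_block_size
attempts = [block_size]
while block_size <= target_max_block_size:
    block_size *= 2
    attempts.append(block_size)

# Mirrors the loop in the diff below: a size just above the target is still
# attempted once before the error is re-raised, so the schedule runs
# 1 MiB, 2 MiB, ..., 256 MiB.
print([f"{size >> 20} MiB" for size in attempts])
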
python/ray/data/datasource/json_datasource.py (38 changes: 36 additions & 2 deletions)
@@ -1,11 +1,15 @@
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from ray.data.context import DataContext
from ray.data.datasource.file_based_datasource import FileBasedDatasource
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
import pyarrow

logger = logging.getLogger(__name__)

@PublicAPI
class JSONDatasource(FileBasedDatasource):
@@ -34,6 +38,36 @@ def __init__(

     # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
     def _read_stream(self, f: "pyarrow.NativeFile", path: str):
-        from pyarrow import json
+        from io import BytesIO
+
+        from pyarrow import ArrowInvalid, json
 
-        yield json.read_json(f, read_options=self.read_options, **self.arrow_json_args)
+        buffer = f.read_buffer()
+        block_size = self.read_options.block_size
+        use_threads = self.read_options.use_threads
+        max_block_size = DataContext.get_current().target_max_block_size
+        while True:
+            try:
+                yield json.read_json(
+                    BytesIO(buffer),
+                    read_options=json.ReadOptions(
+                        use_threads=use_threads, block_size=block_size
+                    ),
Contributor (review comment on the ReadOptions lines above): Since ReadOptions can have other attributes/parameters as well, can we pass those through, or create and modify a copy?

+                    **self.arrow_json_args,
+                )
+                break
+            except ArrowInvalid as e:
+                if (
+                    isinstance(e, ArrowInvalid)
+                    and "straddling" not in str(e)
+                    or block_size > max_block_size
+                ):
+                    raise e
Contributor (review comment on the raise above): Let's modify the error message of the raised exception to include the largest block size that was tried, and let's also include a link to this GitHub issue for more details: apache/arrow#25674

Contributor Author (@omatthew98, Jan 19, 2024): Updated the exception to this: pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries (try to increase block size?) - Auto-increasing block size to 4B failed. More information on this issue can be found here: https://github.com/apache/arrow/issues/25674

+                else:
+                    # Increase the block size in case it was too small.
+                    logger.info(
+                        f"JSONDatasource read failed with "
+                        f"block_size={block_size}. Retrying with "
+                        f"block_size={block_size * 2}."
+                    )
+                    block_size *= 2
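
The two review comments above suggest (1) carrying over the caller's ReadOptions settings rather than rebuilding them, and (2) raising a richer error once auto-increasing gives up. The following is a hedged sketch of how both ideas could look, not the code that was merged: the helper names are invented here, and pyarrow.json.ReadOptions currently exposes only use_threads and block_size, so "copying" it amounts to carrying those two fields over.

# Hedged sketch only; helper names are hypothetical, not from this PR.
from io import BytesIO

from pyarrow import ArrowInvalid, json

ARROW_ISSUE_URL = "https://github.com/apache/arrow/issues/25674"


def _options_with_block_size(opts, block_size):
    # Carry over the caller's existing settings instead of rebuilding
    # ReadOptions from scratch (json.ReadOptions has use_threads and block_size).
    return json.ReadOptions(use_threads=opts.use_threads, block_size=block_size)


def _read_with_retries(buffer, opts, max_block_size, **arrow_json_args):
    block_size = opts.block_size
    while True:
        try:
            return json.read_json(
                BytesIO(buffer),
                read_options=_options_with_block_size(opts, block_size),
                **arrow_json_args,
            )
        except ArrowInvalid as e:
            if "straddling" not in str(e):
                raise  # Unrelated parse error: re-raise unchanged.
            if block_size > max_block_size:
                # Surface the largest block size tried plus the upstream issue,
                # in the spirit of the author's quoted message above.
                raise ArrowInvalid(
                    f"{e} - Auto-increasing block size to {block_size}B failed. "
                    f"More information on this issue can be found here: "
                    f"{ARROW_ISSUE_URL}"
                ) from e
            block_size *= 2

A caller would pass f.read_buffer(), the original ReadOptions, and DataContext.get_current().target_max_block_size, mirroring the merged loop above.
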
python/ray/data/read_api.py (7 changes: 4 additions & 3 deletions)
@@ -1045,9 +1045,10 @@ def read_json(
     When reading large files, the default block size configured in PyArrow can be too small,
     resulting in the following error:
     ``pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries
-    (try to increase block size?)``.
-
-    To resolve this, use the ``read_options`` parameter to set a larger block size:
+    (try to increase block size?)``. The read will be retried with a geometrically
+    increasing block size until the size reaches
+    ``DataContext.get_current().target_max_block_size``. The initial block size is
+    the PyArrow default block size, or it can be set manually through the
+    ``read_options`` parameter as follows:
Contributor (review comment on the docstring above): With this PR, ideally the user shouldn't see this error message any more. Let's instead move this info (related to "try to increase block size?") into the new code you implemented in _read_stream().


     >>> import pyarrow.json as pajson
     >>> block_size = 10 << 20  # Set block size to 10MB
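
The docstring example above is truncated here. Purely as an illustration of the ``read_options`` route (the path and sizes below are placeholders, not the hidden continuation of the docstring example):

# Illustrative only; the path is a placeholder and 10 MiB is just an example
# starting block size.
import pyarrow.json as pajson
import ray

block_size = 10 << 20  # 10 MiB starting block size
ds = ray.data.read_json(
    "/tmp/example.jsonl",  # placeholder path
    read_options=pajson.ReadOptions(use_threads=True, block_size=block_size),
)
print(ds.schema())

With this PR the explicit block size mainly sets the starting point; reads that still hit the straddling error are retried with doubled sizes up to the DataContext cap.
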
python/ray/data/tests/test_json.py (69 changes: 69 additions & 0 deletions)
@@ -615,6 +615,75 @@ def test_json_write_block_path_provider(
    assert df.equals(ds_df)


@pytest.mark.parametrize(
    "fs,data_path,endpoint_url",
    [
        (None, lazy_fixture("local_path"), None),
        (lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
        (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
    ],
)
def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoint_url):
    if endpoint_url is None:
        storage_options = {}
    else:
        storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))

    # Single small file, unit block_size
    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    path1 = os.path.join(data_path, "test1.json")
    df1.to_json(path1, orient="records", lines=True, storage_options=storage_options)
    ds = ray.data.read_json(
        path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1)
    )
    dsdf = ds.to_pandas()
    assert df1.equals(dsdf)
    # Test metadata ops.
    assert ds.count() == 3
    assert ds.input_files() == [_unwrap_protocol(path1)]
    assert "{one: int64, two: string}" in str(ds), ds

    # Single large file, default block_size
    num_chars = 2500000
    num_rows = 3
    df2 = pd.DataFrame(
        {
            "one": ["a" * num_chars for _ in range(num_rows)],
            "two": ["b" * num_chars for _ in range(num_rows)],
        }
    )
    path2 = os.path.join(data_path, "test2.json")
    df2.to_json(path2, orient="records", lines=True, storage_options=storage_options)
    ds = ray.data.read_json(path2, filesystem=fs)
    dsdf = ds.to_pandas()
    assert df2.equals(dsdf)
    # Test metadata ops.
    assert ds.count() == num_rows
    assert ds.input_files() == [_unwrap_protocol(path2)]
    assert "{one: string, two: string}" in str(ds), ds

    # Single file, negative and zero block_size (expect failure)
    df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
    path3 = os.path.join(data_path, "test3.json")
    df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
    try:
        # Negative buffer size
        ds = ray.data.read_json(
            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
        )
        dsdf = ds.to_pandas()
    except pa.ArrowInvalid as e:
        assert "Negative buffer resize" in str(e.cause)
    try:
        # Zero buffer size
        ds = ray.data.read_json(
            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0)
        )
        dsdf = ds.to_pandas()
    except pa.ArrowInvalid as e:
        assert "Empty JSON file" in str(e.cause)


if __name__ == "__main__":
    import sys
