From 4857e9556905f55c089aff1c8bec60b2f963f58f Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Thu, 11 Jan 2024 16:37:31 -0800
Subject: [PATCH 01/13] basic structure to auto increase block size for read_json

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 33 +++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 07f53a6e7cc4..231ade2057c1 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -1,3 +1,4 @@
+import logging
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 from ray.data.datasource.file_based_datasource import FileBasedDatasource
 from ray.util.annotations import PublicAPI
@@ -6,6 +7,8 @@
 if TYPE_CHECKING:
     import pyarrow
 
+logger = logging.getLogger(__name__)
+
 
 @PublicAPI
 class JSONDatasource(FileBasedDatasource):
@@ -34,6 +37,32 @@ def __init__(
     # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
     def _read_stream(self, f: "pyarrow.NativeFile", path: str):
-        from pyarrow import json
+        from pyarrow import ArrowInvalid, json
 
-        yield json.read_json(f, read_options=self.read_options, **self.arrow_json_args)
+        # Create local copy of read_options so block_size increases are not persisted
+        # between _read_stream calls.
+        local_read_options = json.ReadOptions(
+            use_threads=self.read_options.use_threads,
+            block_size=self.read_options.block_size,
+        )
+        while True:
+            try:
+                yield json.read_json(
+                    f, read_options=local_read_options, **self.arrow_json_args
+                )
+            except ArrowInvalid as e:
+                # TODO: Figure out what MAX_BLOCK_SIZE would be / if necessary.
+                if (
+                    isinstance(e, ArrowInvalid)
+                    and "straddling" not in str(e)
+                    or local_read_options.block_size > MAX_BLOCK_SIZE
+                ):
+                    raise e
+                else:
+                    # Increase the block size in case it was too small.
+                    logger.debug(
+                        f"JSONDatasource read failed with "
+                        f"block_size={local_read_options.block_size}. Retrying with "
+                        f"block_size={local_read_options.block_size * 2}."
+                    )
+                    local_read_options.block_size *= 2
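As a standalone illustration of the failure mode this first patch works around: a JSON-lines row larger than PyArrow's `block_size` makes `read_json` raise `ArrowInvalid` ("straddling object straddles two block boundaries"), and doubling the block size until the parse succeeds is the same idea the datasource now applies internally. This is a minimal sketch, not the datasource code; the path and row sizes are made up for illustration, and only `pyarrow` is assumed.

```python
import json as stdlib_json

import pyarrow.json as pajson
from pyarrow import ArrowInvalid

# Write a small JSON-lines file whose individual rows are ~4 MiB each.
path = "/tmp/long_lines.json"
with open(path, "w") as f:
    for i in range(3):
        f.write(stdlib_json.dumps({"id": i, "payload": "x" * (4 << 20)}) + "\n")

block_size = 1 << 20  # deliberately smaller than a single row
while True:
    try:
        table = pajson.read_json(
            path, read_options=pajson.ReadOptions(block_size=block_size)
        )
        break
    except ArrowInvalid as e:
        if "straddling" not in str(e):
            raise
        block_size *= 2  # the row did not fit in one block; retry with a bigger one

print(table.num_rows, "rows read with block_size =", block_size)
```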
From f6902fed918f058eef8a761643ba2871da6726dc Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Fri, 12 Jan 2024 11:54:35 -0800
Subject: [PATCH 02/13] use target max block size for the limit we try before failing

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 231ade2057c1..8916ef2aca15 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
+from ray.data.context import DataContext
 from ray.data.datasource.file_based_datasource import FileBasedDatasource
 from ray.util.annotations import PublicAPI
 
@@ -45,6 +46,8 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
             use_threads=self.read_options.use_threads,
             block_size=self.read_options.block_size,
         )
+        ctx = DataContext.get_current()
+        max_block_size = ctx.target_max_block_size
         while True:
             try:
                 yield json.read_json(
@@ -55,7 +58,7 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                 if (
                     isinstance(e, ArrowInvalid)
                     and "straddling" not in str(e)
-                    or local_read_options.block_size > MAX_BLOCK_SIZE
+                    or local_read_options.block_size > max_block_size
                 ):
                     raise e
                 else:

From ebe1cfc2a43cd3ab0e99b52e4205bc9e2a491098 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Fri, 12 Jan 2024 16:45:00 -0800
Subject: [PATCH 03/13] adding in break to fix behavior

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 8916ef2aca15..90d3705544f1 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -53,6 +53,7 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                 yield json.read_json(
                     f, read_options=local_read_options, **self.arrow_json_args
                 )
+                break
             except ArrowInvalid as e:
                 # TODO: Figure out what MAX_BLOCK_SIZE would be / if necessary.
                 if (

From afeac34de5d846d3e038e17ae6d4dd5f29b80789 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Tue, 16 Jan 2024 14:57:07 -0800
Subject: [PATCH 04/13] add in _open_input_source to allow for resetting file pos

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 90d3705544f1..41b96557e60f 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -36,6 +36,16 @@ def __init__(
         )
         self.arrow_json_args = arrow_json_args
 
+    def _open_input_source(
+        self,
+        filesystem: "pyarrow.fs.FileSystem",
+        path: str,
+        **open_args,
+    ) -> "pyarrow.NativeFile":
+        # JSON requires `open_input_file` for seekable file access
+        return filesystem.open_input_file(path, **open_args)
+
     # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
     def _read_stream(self, f: "pyarrow.NativeFile", path: str):
         from pyarrow import ArrowInvalid, json
@@ -46,8 +56,8 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
             use_threads=self.read_options.use_threads,
             block_size=self.read_options.block_size,
         )
-        ctx = DataContext.get_current()
-        max_block_size = ctx.target_max_block_size
+        init_file_pos = f.tell()
+        max_block_size = DataContext.get_current().target_max_block_size
         while True:
             try:
                 yield json.read_json(
@@ -55,7 +65,6 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                 )
                 break
             except ArrowInvalid as e:
-                # TODO: Figure out what MAX_BLOCK_SIZE would be / if necessary.
                 if (
                     isinstance(e, ArrowInvalid)
                     and "straddling" not in str(e)
@@ -64,9 +73,11 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                     raise e
                 else:
                     # Increase the block size in case it was too small.
-                    logger.debug(
+                    logger.info(
                         f"JSONDatasource read failed with "
                         f"block_size={local_read_options.block_size}. Retrying with "
                         f"block_size={local_read_options.block_size * 2}."
                     )
                     local_read_options.block_size *= 2
+                    # Reset file position to re-attempt read.
+                    f.seek(init_file_pos)
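The seekable handle introduced by `_open_input_source` matters because a failed parse may already have consumed part of the stream, so each retry has to rewind to the position captured before the first attempt. A minimal sketch of that pattern outside the datasource, assuming a local file written as in the earlier example (the path is illustrative):

```python
import pyarrow.fs as pafs
import pyarrow.json as pajson
from pyarrow import ArrowInvalid

fs = pafs.LocalFileSystem()
f = fs.open_input_file("/tmp/long_lines.json")  # seekable, unlike open_input_stream

init_pos = f.tell()
block_size = 1 << 20
while True:
    try:
        table = pajson.read_json(
            f, read_options=pajson.ReadOptions(block_size=block_size)
        )
        break
    except ArrowInvalid as e:
        if "straddling" not in str(e):
            raise
        block_size *= 2
        f.seek(init_pos)  # rewind so the next attempt sees the whole file again
f.close()
```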
From 360dc40f6ac1ef0c676b2cfc35a2784d71e7a75f Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Tue, 16 Jan 2024 14:58:09 -0800
Subject: [PATCH 05/13] add in test_json_read_across_blocks

Signed-off-by: Matthew Owen
---
 python/ray/data/tests/test_json.py | 63 ++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py
index 1701301fda6f..efef5a1c2930 100644
--- a/python/ray/data/tests/test_json.py
+++ b/python/ray/data/tests/test_json.py
@@ -615,6 +615,69 @@ def test_json_write_block_path_provider(
     assert df.equals(ds_df)
 
 
+@pytest.mark.parametrize(
+    "fs,data_path,endpoint_url",
+    [
+        (None, lazy_fixture("local_path"), None),
+        (lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
+        (lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
+    ],
+)
+def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoint_url):
+    if endpoint_url is None:
+        storage_options = {}
+    else:
+        storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))
+
+    # Single small file, unit block_size
+    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
+    path1 = os.path.join(data_path, "test1.json")
+    df1.to_json(path1, orient="records", lines=True, storage_options=storage_options)
+    ds = ray.data.read_json(path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1))
+    dsdf = ds.to_pandas()
+    assert df1.equals(dsdf)
+    # Test metadata ops.
+    assert ds.count() == 3
+    assert ds.input_files() == [_unwrap_protocol(path1)]
+    assert "{one: int64, two: string}" in str(ds), ds
+
+    # Single large file, default block_size
+    num_chars = 2500000
+    num_rows = 3
+    df2 = pd.DataFrame({"one": ["a" * num_chars for _ in range(num_rows)], "two": ["b" * num_chars for _ in range(num_rows)]})
+    tmp2 = os.path.join(data_path, "tmp2.json")
+    path2 = os.path.join(data_path, "test2.json")
+    df2.to_json(tmp2, orient="records", lines=True, storage_options=storage_options)
+    # force long line to break line size
+    with open(path2, "w") as w_file:
+        with open(tmp2, "r") as r_file:
+            w_file.write(r_file.read().replace("\n", ""))
+    ds = ray.data.read_json(path2, filesystem=fs)
+    dsdf = ds.to_pandas()
+    assert df2.equals(dsdf)
+    # Test metadata ops.
+    assert ds.count() == num_rows
+    assert ds.input_files() == [_unwrap_protocol(path2)]
+    assert "{one: string, two: string}" in str(ds), ds
+
+    # Single file, negative and zero block_size (expect failure)
+    df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
+    path3 = os.path.join(data_path, "test3.json")
+    df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
+    try:
+        # Negative Buffer Size
+        ds = ray.data.read_json(path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1))
+        dsdf = ds.to_pandas()
+    except pa.ArrowInvalid as e:
+        assert "Negative buffer resize" in str(e.cause)
+    try:
+        # Zero Buffer Size
+        ds = ray.data.read_json(path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0))
+        dsdf = ds.to_pandas()
+    except pa.ArrowInvalid as e:
+        assert "Empty JSON file" in str(e.cause)
+
+
 if __name__ == "__main__":
     import sys
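The refactor in the next patch drops the seek-and-rewind dance: the file is materialized into a PyArrow buffer once, and every retry re-parses that buffer through `BytesIO`, so the underlying stream never needs to be rewound. A rough standalone equivalent of where it is headed, reusing the illustrative file from the earlier sketch (path and block size are assumptions, not the datasource's defaults):

```python
from io import BytesIO

import pyarrow.fs as pafs
import pyarrow.json as pajson
from pyarrow import ArrowInvalid

f = pafs.LocalFileSystem().open_input_stream("/tmp/long_lines.json")
buffer = f.read_buffer()  # read the whole file into memory once

block_size = 1 << 20
while True:
    try:
        table = pajson.read_json(
            BytesIO(buffer), read_options=pajson.ReadOptions(block_size=block_size)
        )
        break
    except ArrowInvalid as e:
        if "straddling" not in str(e):
            raise
        block_size *= 2  # re-parse the same in-memory buffer with a larger block
```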
From 4c347944f491a081ea7d55edf34921567649e1c6 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Wed, 17 Jan 2024 14:40:52 -0800
Subject: [PATCH 06/13] refactoring to use pyarrow buffer

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 38 +++++++------------
 python/ray/data/tests/test_json.py            | 19 ++++++++--
 2 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 41b96557e60f..94a4ab1c2a86 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -36,48 +36,38 @@ def __init__(
         )
         self.arrow_json_args = arrow_json_args
 
-    def _open_input_source(
-        self,
-        filesystem: "pyarrow.fs.FileSystem",
-        path: str,
-        **open_args,
-    ) -> "pyarrow.NativeFile":
-        # JSON requires `open_input_file` for seekable file access
-        return filesystem.open_input_file(path, **open_args)
-
-    # TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
     def _read_stream(self, f: "pyarrow.NativeFile", path: str):
+        from io import BytesIO
+
         from pyarrow import ArrowInvalid, json
 
-        # Create local copy of read_options so block_size increases are not persisted
-        # between _read_stream calls.
-        local_read_options = json.ReadOptions(
-            use_threads=self.read_options.use_threads,
-            block_size=self.read_options.block_size,
-        )
-        init_file_pos = f.tell()
+        buffer = f.read_buffer()
+        block_size = self.read_options.block_size
+        use_threads = self.read_options.use_threads
         max_block_size = DataContext.get_current().target_max_block_size
         while True:
            try:
                 yield json.read_json(
-                    f, read_options=local_read_options, **self.arrow_json_args
+                    BytesIO(buffer),
+                    read_options=json.ReadOptions(
+                        use_threads=use_threads, block_size=block_size
+                    ),
+                    **self.arrow_json_args,
                 )
                 break
             except ArrowInvalid as e:
                 if (
                     isinstance(e, ArrowInvalid)
                     and "straddling" not in str(e)
-                    or local_read_options.block_size > max_block_size
+                    or block_size > max_block_size
                 ):
                     raise e
                 else:
                     # Increase the block size in case it was too small.
                     logger.info(
                         f"JSONDatasource read failed with "
-                        f"block_size={local_read_options.block_size}. Retrying with "
-                        f"block_size={local_read_options.block_size * 2}."
+                        f"block_size={block_size}. Retrying with "
+                        f"block_size={block_size * 2}."
                     )
-                    local_read_options.block_size *= 2
-                    # Reset file position to re-attempt read.
-                    f.seek(init_file_pos)
+                    block_size *= 2
diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py
index efef5a1c2930..17afceb7c059 100644
--- a/python/ray/data/tests/test_json.py
+++ b/python/ray/data/tests/test_json.py
@@ -633,7 +633,9 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
     path1 = os.path.join(data_path, "test1.json")
     df1.to_json(path1, orient="records", lines=True, storage_options=storage_options)
-    ds = ray.data.read_json(path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1))
+    ds = ray.data.read_json(
+        path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1)
+    )
     dsdf = ds.to_pandas()
     assert df1.equals(dsdf)
     # Test metadata ops.
@@ -644,7 +646,12 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
     # Single large file, default block_size
     num_chars = 2500000
     num_rows = 3
-    df2 = pd.DataFrame({"one": ["a" * num_chars for _ in range(num_rows)], "two": ["b" * num_chars for _ in range(num_rows)]})
+    df2 = pd.DataFrame(
+        {
+            "one": ["a" * num_chars for _ in range(num_rows)],
+            "two": ["b" * num_chars for _ in range(num_rows)],
+        }
+    )
     tmp2 = os.path.join(data_path, "tmp2.json")
     path2 = os.path.join(data_path, "test2.json")
     df2.to_json(tmp2, orient="records", lines=True, storage_options=storage_options)
@@ -666,13 +673,17 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
     df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
     try:
         # Negative Buffer Size
-        ds = ray.data.read_json(path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1))
+        ds = ray.data.read_json(
+            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
+        )
         dsdf = ds.to_pandas()
     except pa.ArrowInvalid as e:
         assert "Negative buffer resize" in str(e.cause)
     try:
         # Zero Buffer Size
-        ds = ray.data.read_json(path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0))
+        ds = ray.data.read_json(
+            path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0)
+        )
         dsdf = ds.to_pandas()
     except pa.ArrowInvalid as e:
         assert "Empty JSON file" in str(e.cause)

From da74990064d7912d550158c71970f4bed8cb931a Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Thu, 18 Jan 2024 09:18:49 -0800
Subject: [PATCH 07/13] use smart_open

Signed-off-by: Matthew Owen
---
 python/ray/data/tests/test_json.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py
index 17afceb7c059..802d4a3e6ef8 100644
--- a/python/ray/data/tests/test_json.py
+++ b/python/ray/data/tests/test_json.py
@@ -8,6 +8,7 @@
 import pyarrow.json as pajson
 import pytest
 from pytest_lazyfixture import lazy_fixture
+import smart_open
 
 import ray
 from ray.data.block import BlockAccessor
@@ -656,8 +657,8 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
     path2 = os.path.join(data_path, "test2.json")
     df2.to_json(tmp2, orient="records", lines=True, storage_options=storage_options)
     # force long line to break line size
-    with open(path2, "w") as w_file:
-        with open(tmp2, "r") as r_file:
+    with smart_open(path2, "w") as w_file:
+        with smart_open(tmp2, "r") as r_file:
             w_file.write(r_file.read().replace("\n", ""))
     ds = ray.data.read_json(path2, filesystem=fs)
     dsdf = ds.to_pandas()
From 59a3e3662cefd652d3a1187d09095fc96b055649 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Thu, 18 Jan 2024 09:59:21 -0800
Subject: [PATCH 08/13] fix smart_open call, format files

Signed-off-by: Matthew Owen
---
 python/ray/data/tests/test_json.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py
index 802d4a3e6ef8..9758b13f8f17 100644
--- a/python/ray/data/tests/test_json.py
+++ b/python/ray/data/tests/test_json.py
@@ -7,8 +7,8 @@
 import pyarrow as pa
 import pyarrow.json as pajson
 import pytest
-from pytest_lazyfixture import lazy_fixture
 import smart_open
+from pytest_lazyfixture import lazy_fixture
 
 import ray
 from ray.data.block import BlockAccessor
@@ -657,8 +657,8 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
     path2 = os.path.join(data_path, "test2.json")
     df2.to_json(tmp2, orient="records", lines=True, storage_options=storage_options)
     # force long line to break line size
-    with smart_open(path2, "w") as w_file:
-        with smart_open(tmp2, "r") as r_file:
+    with smart_open.open(path2, "w") as w_file:
+        with smart_open.open(tmp2, "r") as r_file:
             w_file.write(r_file.read().replace("\n", ""))
     ds = ray.data.read_json(path2, filesystem=fs)
     dsdf = ds.to_pandas()

From f5b387a27abf7c4e258bf9f6f5723962a905f9c2 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Thu, 18 Jan 2024 10:46:30 -0800
Subject: [PATCH 09/13] simplify test to not rewrite file as it isn't necessary to make long lines

Signed-off-by: Matthew Owen
---
 python/ray/data/tests/test_json.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py
index 9758b13f8f17..1972b8056f5a 100644
--- a/python/ray/data/tests/test_json.py
+++ b/python/ray/data/tests/test_json.py
@@ -7,7 +7,6 @@
 import pyarrow as pa
 import pyarrow.json as pajson
 import pytest
-import smart_open
 from pytest_lazyfixture import lazy_fixture
 
 import ray
 from ray.data.block import BlockAccessor
@@ -653,13 +652,8 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
             "two": ["b" * num_chars for _ in range(num_rows)],
         }
     )
-    tmp2 = os.path.join(data_path, "tmp2.json")
     path2 = os.path.join(data_path, "test2.json")
-    df2.to_json(tmp2, orient="records", lines=True, storage_options=storage_options)
-    # force long line to break line size
-    with smart_open.open(path2, "w") as w_file:
-        with smart_open.open(tmp2, "r") as r_file:
-            w_file.write(r_file.read().replace("\n", ""))
+    df2.to_json(path2, orient="records", lines=True, storage_options=storage_options)
     ds = ray.data.read_json(path2, filesystem=fs)
     dsdf = ds.to_pandas()
     assert df2.equals(dsdf)
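The docstring updated in the next patch describes the manual escape hatch that predates the auto-retry: pass a larger block size explicitly through `read_options`. A usage sketch, using the example bucket from the Ray docs (any JSON/JSONL source works the same way):

```python
import pyarrow.json as pajson

import ray

block_size = 10 << 20  # 10 MiB, well above the PyArrow default
ds = ray.data.read_json(
    "s3://anonymous@ray-example-data/log.json",
    read_options=pajson.ReadOptions(block_size=block_size),
)
```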
From b5a6ffc2df89197d0aea889b8708314ca50a7dd2 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Thu, 18 Jan 2024 12:51:42 -0800
Subject: [PATCH 10/13] updating docs to reflect changes

Signed-off-by: Matthew Owen
---
 python/ray/data/read_api.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 36cb20222b9f..665e2a195134 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -1045,9 +1045,10 @@ def read_json(
     When reading large files, the default block size configured in PyArrow can be too small,
     resulting in the following error:
     ``pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries
-    (try to increase block size?)``.
-
-    To resolve this, use the ``read_options`` parameter to set a larger block size:
+    (try to increase block size?)``. The read will be retried with geometrically
+    increasing block size until the size reaches `DataContext.get_current().target_max_block_size`.
+    The initial block size will start at the PyArrow default block size or it can be
+    manually set through the ``read_options`` parameter as follows:
 
     >>> import pyarrow.json as pajson
     >>> block_size = 10 << 20  # Set block size to 10MB

From 838933bd88a9b829a02bcdcb58a1b9a9ed9d9c01 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Fri, 19 Jan 2024 13:56:56 -0800
Subject: [PATCH 11/13] pr feedback

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 58 ++++++++++++-------
 python/ray/data/read_api.py                   | 15 -----
 2 files changed, 38 insertions(+), 35 deletions(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 94a4ab1c2a86..008865fc1372 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -1,6 +1,6 @@
-import logging
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
+from ray.data._internal.dataset_logger import DatasetLogger
 from ray.data.context import DataContext
 from ray.data.datasource.file_based_datasource import FileBasedDatasource
 from ray.util.annotations import PublicAPI
@@ -8,7 +8,7 @@
 if TYPE_CHECKING:
     import pyarrow
 
-logger = logging.getLogger(__name__)
+logger = DatasetLogger(__name__)
 
 
 @PublicAPI
@@ -42,32 +42,50 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
 
         from pyarrow import ArrowInvalid, json
 
+        # When reading large files, the default block size configured in PyArrow can be
+        # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
+        # straddling object straddles two block boundaries (try to increase block
+        # size?)`. The read will be retried with geometrically increasing block size
+        # until the size reaches `DataContext.get_current().target_max_block_size`.
+        # The initial block size will start at the PyArrow default block size or it can
+        # be manually set through the `read_options` parameter as follows.
+        #
+        # >>> import pyarrow.json as pajson
+        # >>> block_size = 10 << 20 # Set block size to 10MB
+        # >>> ray.data.read_json(  # doctest: +SKIP
+        # ...     "s3://anonymous@ray-example-data/log.json",
+        # ...     read_options=pajson.ReadOptions(block_size=block_size)
+        # ... )
+
         buffer = f.read_buffer()
-        block_size = self.read_options.block_size
-        use_threads = self.read_options.use_threads
+        init_block_size = self.read_options.block_size
         max_block_size = DataContext.get_current().target_max_block_size
         while True:
             try:
                 yield json.read_json(
                     BytesIO(buffer),
-                    read_options=json.ReadOptions(
-                        use_threads=use_threads, block_size=block_size
-                    ),
+                    read_options=self.read_options,
                     **self.arrow_json_args,
                 )
+                self.read_options.block_size = init_block_size
                 break
             except ArrowInvalid as e:
-                if (
-                    isinstance(e, ArrowInvalid)
-                    and "straddling" not in str(e)
-                    or block_size > max_block_size
-                ):
-                    raise e
+                if isinstance(e, ArrowInvalid) and "straddling" in str(e):
+                    if self.read_options.block_size < max_block_size:
+                        # Increase the block size in case it was too small.
+                        logger.get_logger(log_to_stdout=False).info(
+                            f"JSONDatasource read failed with "
+                            f"block_size={self.read_options.block_size}. Retrying with "
+                            f"block_size={self.read_options.block_size * 2}."
+                        )
+                        self.read_options.block_size *= 2
+                    else:
+                        raise ArrowInvalid(
+                            f"{e} - Auto-increasing block size to "
+                            f"{self.read_options.block_size}B failed. "
+                            f"More information on this issue can be found here: "
+                            f"https://github.com/apache/arrow/issues/25674"
+                        )
                 else:
-                    # Increase the block size in case it was too small.
-                    logger.info(
-                        f"JSONDatasource read failed with "
-                        f"block_size={block_size}. Retrying with "
-                        f"block_size={block_size * 2}."
-                    )
-                    block_size *= 2
+                    # unrelated error, simply reraise
+                    raise e
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 665e2a195134..dba9e934954a 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -1042,21 +1042,6 @@ def read_json(
     >>> ds.take(1)
     [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]
 
-    When reading large files, the default block size configured in PyArrow can be too small,
-    resulting in the following error:
-    ``pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries
-    (try to increase block size?)``. The read will be retried with geometrically
-    increasing block size until the size reaches `DataContext.get_current().target_max_block_size`.
-    The initial block size will start at the PyArrow default block size or it can be
-    manually set through the ``read_options`` parameter as follows:
-
-    >>> import pyarrow.json as pajson
-    >>> block_size = 10 << 20  # Set block size to 10MB
-    >>> ray.data.read_json(  # doctest: +SKIP
-    ...     "s3://anonymous@ray-example-data/log.json",
-    ...     read_options=pajson.ReadOptions(block_size=block_size)
-    ... )
-
     Args:
         paths: A single file or directory, or a list of file or directory paths.
             A list of paths can contain both files and directories.
From 0177858683c71d08cc392c138f35b1327180298c Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Fri, 19 Jan 2024 17:37:10 -0800
Subject: [PATCH 12/13] respond to a few more nits

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index 008865fc1372..bd552b766216 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -45,10 +45,12 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
         # When reading large files, the default block size configured in PyArrow can be
         # too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
         # straddling object straddles two block boundaries (try to increase block
-        # size?)`. The read will be retried with geometrically increasing block size
-        # until the size reaches `DataContext.get_current().target_max_block_size`.
-        # The initial block size will start at the PyArrow default block size or it can
-        # be manually set through the `read_options` parameter as follows.
+        # size?)`. More information on this issue can be found here:
+        # https://github.com/apache/arrow/issues/25674 The read will be retried with
+        # geometrically increasing block size until the size reaches
+        # `DataContext.get_current().target_max_block_size`. The initial block size
+        # will start at the PyArrow default block size or it can be manually set
+        # through the `read_options` parameter as follows.
         #
         # >>> import pyarrow.json as pajson
         # >>> block_size = 10 << 20 # Set block size to 10MB
@@ -70,10 +72,10 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                 self.read_options.block_size = init_block_size
                 break
             except ArrowInvalid as e:
-                if isinstance(e, ArrowInvalid) and "straddling" in str(e):
+                if "straddling object straddles two block boundaries" in str(e):
                     if self.read_options.block_size < max_block_size:
                         # Increase the block size in case it was too small.
-                        logger.get_logger(log_to_stdout=False).info(
+                        logger.get_logger(log_to_stdout=True).info(
                             f"JSONDatasource read failed with "
                             f"block_size={self.read_options.block_size}. Retrying with "
                             f"block_size={self.read_options.block_size * 2}."
@@ -82,7 +84,7 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                     else:
                         raise ArrowInvalid(
                             f"{e} - Auto-increasing block size to "
-                            f"{self.read_options.block_size}B failed. "
+                            f"{self.read_options.block_size} bytes failed. "
                             f"More information on this issue can be found here: "
                             f"https://github.com/apache/arrow/issues/25674"
                         )

From 76dead96b3c324d12d701ab9330df5dde7a14305 Mon Sep 17 00:00:00 2001
From: Matthew Owen
Date: Mon, 22 Jan 2024 10:02:53 -0800
Subject: [PATCH 13/13] turn off log to stdout

Signed-off-by: Matthew Owen
---
 python/ray/data/datasource/json_datasource.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/ray/data/datasource/json_datasource.py b/python/ray/data/datasource/json_datasource.py
index bd552b766216..673bdcb24ab2 100644
--- a/python/ray/data/datasource/json_datasource.py
+++ b/python/ray/data/datasource/json_datasource.py
@@ -75,7 +75,7 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
                 if "straddling object straddles two block boundaries" in str(e):
                     if self.read_options.block_size < max_block_size:
                         # Increase the block size in case it was too small.
-                        logger.get_logger(log_to_stdout=True).info(
+                        logger.get_logger(log_to_stdout=False).info(
                             f"JSONDatasource read failed with "
                             f"block_size={self.read_options.block_size}. Retrying with "
                             f"block_size={self.read_options.block_size * 2}."
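With the series applied end to end, a JSON-lines file whose rows exceed PyArrow's default block size reads without any manual tuning: the datasource doubles `block_size` internally and logs each retry through the `DatasetLogger` (to the log file only, per the final patch). A quick check mirroring the new test's large-file case; the path and sizes below are illustrative.

```python
import pandas as pd

import ray

num_chars = 2_500_000
df = pd.DataFrame({"one": ["a" * num_chars] * 3, "two": ["b" * num_chars] * 3})
df.to_json("/tmp/test_large_rows.json", orient="records", lines=True)

ds = ray.data.read_json("/tmp/test_large_rows.json")  # no read_options needed
assert ds.count() == 3
```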