[Data] Skip empty JSON files in read_json() #47378

Merged: scottjlee merged 3 commits into ray-project:master on Sep 4, 2024

venkatram-dev
Contributor

@venkatram-dev venkatram-dev commented Aug 28, 2024

Why are these changes needed?

#47198
Skip empty files in read_json() instead of raising json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0).
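
For context, the error named above can be reproduced with the standard library alone. The sketch below is not Ray code; `try_load` is a hypothetical helper introduced only for illustration. It shows that `json.load` on an empty byte buffer raises exactly this `JSONDecodeError`, while a non-empty buffer parses normally.

```python
import json
from io import BytesIO


def try_load(buffer: bytes):
    """Hypothetical helper: return (parsed, error_message); one of the two is None."""
    try:
        return json.load(BytesIO(buffer)), None
    except json.JSONDecodeError as exc:
        return None, str(exc)


# An empty buffer has no JSON value to decode, so json.load raises.
empty_result = try_load(b"")
# A non-empty buffer with valid JSON parses as usual.
valid_result = try_load(b'{"col1": "value1"}')

print(empty_result)
print(valid_result)
```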

Related issue number

#47198

Checks

  • [x] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • [x] I've run scripts/format.sh to lint the changes in this PR.
  • [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [ ] I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [x] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

@venkatram-dev
Contributor Author

Tested the changes locally.

Before the change

python t.py 
2024-08-27 22:00:47,024	INFO worker.py:1783 -- Started a local Ray instance.
Using files: ['/Users/vr/ray_test_venv/sample_data/file2.json.gz', '/Users/vr/ray_test_venv/sample_data/empty_file.json.gz', '/Users/vr/ray_test_venv/sample_data/file1.json.gz']
2024-08-27 22:00:48,306	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-00-46_401777_16215/logs/ray-data
2024-08-27 22:00:48,306	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Schema of the dataset: Column  Type
------  ----
col1    string
col2    string
col3    string
2024-08-27 22:00:48,538	INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-08-27 22:00:48,542	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-00-46_401777_16215/logs/ray-data
2024-08-27 22:00:48,542	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=5]
(ReadJSON->SplitBlocks(6) pid=16228) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=16228) Empty JSON file
2024-08-27 22:00:48,606	ERROR worker.py:409 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadJSON->SplitBlocks(6)() (pid=16228, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
    for block in blocks:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
    for data in iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 103, in do_read
    yield from call_with_retry(
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
    yield from result
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
    yield from read_files(read_paths)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
    for block in read_stream(f, read_path):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 139, in _read_stream
    yield from self._read_with_python_json(buffer)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 108, in _read_with_python_json
    parsed_json = json.load(BytesIO(buffer))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 293, in load
    return loads(fp.read(),
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
2024-08-27 22:00:48,820	ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "ReadJSON->SplitBlocks(6)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
Traceback (most recent call last):
  File "/Users/vr/ray_test_venv/t.py", line 38, in <module>
    print("First few records:\n", ds.take(5))
                                  ^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/dataset.py", line 2416, in take
    for row in limited_ds.iter_rows():
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 238, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/iterator.py", line 178, in _create_iterator
    for batch in iterator:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 178, in iter_batches
    next_batch = next(async_batch_iter)
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 932, in make_async_gen
    raise next_item
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 909, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 211, in extract_data_from_batch
    for batch in batch_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 313, in restore_original_order
    for batch in batch_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 932, in make_async_gen
    raise next_item
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 909, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 220, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 159, in format_batches
    for batch in block_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 889, in __next__
    return next(self.it)
           ^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 118, in blocks_to_batches
    for block in block_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py", line 55, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 288, in prefetch_batches_locally
    next_ref_bundle = next(ref_bundles)
                      ^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/util.py", line 889, in __next__
    return next(self.it)
           ^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
           ^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
    item = self._outer._output_node.get_output_blocking(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 296, in get_output_blocking
    raise self._exception
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 232, in run
    continue_sched = self._scheduling_loop_step(self._topology)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 470, in process_completed_tasks
    raise e from None
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 437, in process_completed_tasks
    bytes_read = task.on_data_ready(
                 ^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 105, in on_data_ready
    raise ex from None
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 101, in on_data_ready
    ray.get(block_ref)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/worker.py", line 2661, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(JSONDecodeError): ray::ReadJSON->SplitBlocks(6)() (pid=16225, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 451, in __call__
    for block in blocks:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
    for data in iter:
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/planner/plan_read_op.py", line 103, in do_read
    yield from call_with_retry(
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/datasource.py", line 168, in __call__
    yield from result
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 254, in read_task_fn
    yield from read_files(read_paths)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py", line 220, in read_files
    for block in read_stream(f, read_path):
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 139, in _read_stream
    yield from self._read_with_python_json(buffer)
  File "/Users/vr/ray_test_venv/venv/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.py", line 108, in _read_with_python_json
    parsed_json = json.load(BytesIO(buffer))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 293, in load
    return loads(fp.read(),
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
(ReadJSON->SplitBlocks(6) pid=16225) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=16225) Empty JSON file


After the change

python t.py
2024-08-27 22:10:03,861	INFO worker.py:1783 -- Started a local Ray instance.
Using files: ['/Users/vr/ray_test_venv/sample_data/file2.json.gz', '/Users/vr/ray_test_venv/sample_data/empty_file.json.gz', '/Users/vr/ray_test_venv/sample_data/file1.json.gz']
2024-08-27 22:10:05,156	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-10-03_240198_18060/logs/ray-data
2024-08-27 22:10:05,156	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON]
[dataset]: Run `pip install tqdm` to enable progress reporting.
Schema of the dataset: Column  Type
------  ----
col1    string
col2    string
col3    string
2024-08-27 22:10:05,387	INFO dataset.py:2409 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-08-27 22:10:05,391	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-27_22-10-03_240198_18060/logs/ray-data
2024-08-27 22:10:05,391	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=5]
(ReadJSON->SplitBlocks(6) pid=18074) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=18074) Empty JSON file
First few records:
 [{'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}, {'col1': 'value4', 'col2': 'value5', 'col3': 'value6'}, {'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}, {'col1': 'value4', 'col2': 'value5', 'col3': 'value6'}]
(ReadJSON->SplitBlocks(6) pid=18073) Error reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
(ReadJSON->SplitBlocks(6) pid=18073) Empty JSON file


@venkatram-dev
Contributor Author

@scottjlee, please review.

Contributor

@scottjlee scottjlee left a comment

could you also add a unit test in test_json.py to test the change in this PR?
you can follow the logic in the reproducible example from the original issue. thanks!

@@ -101,6 +101,11 @@ def _read_with_python_json(self, buffer: "pyarrow.lib.Buffer"):

        import pyarrow as pa

        # Check if the buffer is empty
        if buffer.size == 0:
            yield pa.Table.from_pylist([])  # Yield an empty PyArrow Table
Contributor

in the case of an empty file, i think we can simply return out, instead of yielding an empty table. but let's confirm with a unit test.
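
The suggestion above relies on how Python generators behave: a bare `return` before any `yield` makes the generator produce nothing, so downstream code never sees an empty block. The following is a minimal stdlib-only sketch of that pattern, not the merged Ray implementation. `read_rows` is a hypothetical stand-in for the reader, and it uses plain `bytes` and lists of dicts in place of a PyArrow buffer and table.

```python
import json
from io import BytesIO
from typing import Any, Dict, Iterator, List


def read_rows(buffer: bytes) -> Iterator[List[Dict[str, Any]]]:
    """Hypothetical stand-in for a reader generator: yields one block (a list of rows)."""
    if len(buffer) == 0:
        # Empty file: return before yielding, so the generator emits no blocks at all.
        return
    parsed = json.load(BytesIO(buffer))
    # Normalize a single JSON object into a one-row block.
    if isinstance(parsed, dict):
        parsed = [parsed]
    yield parsed


blocks_from_empty = list(read_rows(b""))
blocks_from_data = list(read_rows(b'[{"col1": "value1"}, {"col1": "value4"}]'))

print(blocks_from_empty)
print(blocks_from_data)
```

Returning early avoids creating a schema-less empty block; whether that matters for the real reader is what the requested unit test is meant to confirm.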

Signed-off-by: venkatram-dev <[email protected]>
Signed-off-by: venkatram-dev <[email protected]>
Signed-off-by: venkatram-dev <[email protected]>
@venkatram-dev
Contributor Author

> could you also add a unit test in test_json.py to test the change in this PR? you can follow the logic in the reproducible example from the original issue. thanks!

@scottjlee, I added a unit test that reads from a path containing both an empty file and a non-empty file.
Please check.
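
The shape of such a test can be sketched without Ray. The example below is an illustration only and is not the test added to test_json.py: `read_json_dir` and `check_skips_empty_file` are hypothetical names, the reader is a stdlib stand-in for `read_json()`, and the JSON Lines layout of the sample files is an assumption. The file names and record values mirror the local run shown earlier in this thread.

```python
import gzip
import json
import tempfile
from pathlib import Path


def read_json_dir(directory: Path) -> list:
    """Hypothetical stand-in reader: collect rows from *.json.gz files, skipping empty ones."""
    rows = []
    for path in sorted(directory.glob("*.json.gz")):
        with gzip.open(path, "rb") as f:
            data = f.read()
        if not data:
            continue  # empty file: contributes no rows and raises no error
        rows.extend(json.loads(line) for line in data.splitlines() if line.strip())
    return rows


def check_skips_empty_file(directory: Path) -> list:
    """Write two data files and one empty file, then read the whole directory."""
    records = [
        {"col1": "value1", "col2": "value2", "col3": "value3"},
        {"col1": "value4", "col2": "value5", "col3": "value6"},
    ]
    # Assumption: sample files hold one JSON object per line (JSON Lines).
    payload = "\n".join(json.dumps(r) for r in records).encode()
    files = [
        ("file1.json.gz", payload),
        ("empty_file.json.gz", b""),
        ("file2.json.gz", payload),
    ]
    for name, content in files:
        with gzip.open(directory / name, "wb") as f:
            f.write(content)
    return read_json_dir(directory)


with tempfile.TemporaryDirectory() as tmp:
    rows = check_skips_empty_file(Path(tmp))

print(len(rows))
```

A test against Ray itself would presumably replace the stand-in reader with a call to `ray.data.read_json()` on the same directory and assert on the resulting row count.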

Contributor

@scottjlee scottjlee left a comment

thanks for the fix!

@scottjlee scottjlee changed the title from "skip_empty_json_files" to "[Data] Skip empty JSON files in read_json()" Sep 4, 2024
@scottjlee scottjlee added the "go" label (add ONLY when ready to merge, run all tests) Sep 4, 2024
Contributor

@omatthew98 omatthew98 left a comment

Lgtm, thanks!

@scottjlee scottjlee merged commit 2414c0c into ray-project:master Sep 4, 2024
6 checks passed
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
## Why are these changes needed?
Skip empty files and do not raise json.decoder.JSONDecodeError:
Expecting value: line 1 column 1 (char 0)

## Related issue number
Closes ray-project#47198

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: venkatram-dev <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>