
[FEAT] [JSON Reader] Add native streaming + parallel JSON reader. #1679

Merged: 6 commits into main on Dec 6, 2023

Conversation

@clarkzinzow clarkzinzow (Contributor) commented Nov 29, 2023

This PR adds a streaming + parallel JSON reader, with full support for all fundamental dtypes except decimal and binary types, arbitrary nesting via JSON lists and objects, and nulls at all levels of the JSON object tree.
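
For context, a minimal usage sketch of the reader from the Python API (hypothetical example: read_json is the entry point added in daft/io/_json.py, and the use_native_downloader flag name is assumed from the wiring reviewed below, not a documented API):

import daft

# Hypothetical usage sketch of the new native streaming + parallel JSON reader.
# The `use_native_downloader` kwarg is assumed from the storage_config wiring
# discussed in this PR.
df = daft.read_json("data/records.jsonl", use_native_downloader=True)
df.show()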

TODOs

  • Add schema inference unit test for dtype coverage (i.e. reading the dtypes.jsonl file).
  • Add temporal type inference + parsing test coverage.
  • Benchmarking + performance audit: this reader follows the same general concurrency + parallelism model as the streaming CSV reader, which performs relatively well for cloud reads, but there's bound to be a lot of low-hanging fruit around unnecessary copies.
  • (Follow-up?) Add thorough parsing and dtype inference unit tests on in-memory defined JSON strings.
  • (Follow-up) Support for decimal and (large) binary types.
  • (Follow-up) Add support for strict parsing, i.e. returning an error instead of falling back to a null value when parsing fails.
  • (Follow-up) Fix misc. bugs in Arrow2 and upstream the fixes.
  • (Follow-up) Deflate compression support.

@clarkzinzow clarkzinzow changed the title from "[JSON Reader] Add native streaming + parallel JSON reader." to "[FEAT] [JSON Reader] Add native streaming + parallel JSON reader." Nov 29, 2023
@github-actions github-actions bot added the enhancement (New feature or request) label Nov 29, 2023

codecov bot commented Nov 29, 2023

Codecov Report

Merging #1679 (ce4ac30) into main (a53cd51) will decrease coverage by 3.22%.
The diff coverage is 96.66%.

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1679      +/-   ##
==========================================
- Coverage   85.11%   81.89%   -3.22%     
==========================================
  Files          55       55              
  Lines        5368     5391      +23     
==========================================
- Hits         4569     4415     -154     
- Misses        799      976     +177     
Files                              Coverage Δ
daft/execution/execution_step.py   85.15% <ø> (-7.82%) ⬇️
daft/io/_json.py                   100.00% <100.00%> (ø)
daft/logical/schema.py             89.47% <100.00%> (-1.44%) ⬇️
daft/table/micropartition.py       89.74% <100.00%> (+0.16%) ⬆️
daft/table/schema_inference.py     98.27% <100.00%> (+0.12%) ⬆️
daft/table/table_io.py             95.67% <100.00%> (+0.16%) ⬆️
daft/table/table.py                59.39% <80.00%> (-24.62%) ⬇️

... and 9 files with indirect coverage changes

@clarkzinzow clarkzinzow force-pushed the clark/json-reader branch 3 times, most recently from 0da989c to 9b1b830 on December 1, 2023 17:03

@samster25 samster25 (Member) left a comment:

Looks really clean! Just a few minor bugs and issues!

file_format_config = FileFormatConfig.from_json_config(json_config)
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
if use_native_downloader:
    storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))

Member:

Suggested change
-    storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
+    multithreaded_io = not context.get_context().is_ray_runner
+    storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))

@clarkzinzow (Contributor, Author):

We're not doing that for the CSV reader at the moment; I'm not sure whether the CSV and JSON readers suffer from the same issues as the Parquet reader here, and I'd want to ensure that benchmarks aren't negatively affected before disabling multithreaded reads on the Ray runner!

daft/table/table_io.py (outdated review thread, resolved)

        read_options=json_read_options,
        io_config=config.io_config,
    )
    return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)

Member:

What's the rationale for passing the schema both in the read options and then to _cast_table_to_schema?

@clarkzinzow (Contributor, Author):

The schema is used by the JSON reader for deserialization, and then _cast_table_to_schema() is a file-format-agnostic utility for ensuring that the table read from each file (1) has its dtypes coerced to the inferred global schema, (2) has a column ordering that matches the inferred global schema, and (3) has column pruning applied as imposed by projections.

def _cast_table_to_schema(table: MicroPartition, read_options: TableReadOptions, schema: Schema) -> pa.Table:
    """Performs a cast of a Daft MicroPartition to the requested Schema/Data. This is required because:
    1. Data read from the datasource may have types that do not match the inferred global schema
    2. Data read from the datasource may have columns that are out-of-order with the inferred schema
    3. We may need only a subset of columns, or differently-ordered columns, in `read_options`
    This helper function takes care of all that, ensuring that the resulting MicroPartition has all column types matching
    their corresponding dtype in `schema`, and column ordering/inclusion matches `read_options.column_names` (if provided).
    """
    pruned_schema = schema
    # If reading only a subset of fields, prune the schema
    if read_options.column_names is not None:
        pruned_schema = Schema._from_fields([schema[name] for name in read_options.column_names])
    table = table.cast_to_schema(pruned_schema)
    return table

cc @jaychia for that utility
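
For intuition, here is a tiny standalone analogue of what that utility does, using plain Python dicts to stand in for MicroPartition and Schema (purely illustrative, not Daft's API):

# Illustrative stand-in: a "table" is a dict of column name -> list of values,
# and a "schema" is a dict of column name -> Python type.
def cast_to_schema(table, schema, column_names=None):
    # (3) Prune to the requested subset/ordering of columns, if provided.
    if column_names is not None:
        schema = {name: schema[name] for name in column_names}
    # (1) Coerce each column's values to the schema dtype, (2) emitting columns
    # in schema order regardless of the input ordering.
    return {
        name: [dtype(v) if v is not None else None for v in table[name]]
        for name, dtype in schema.items()
    }

table = {"b": ["1", "2"], "a": [3.0, None]}
schema = {"a": int, "b": str}
print(cast_to_schema(table, schema, column_names=["a"]))  # {'a': [3, None]}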

src/daft-decoding/src/compression.rs (outdated review thread, resolved)
@@ -6,6 +6,7 @@ daft-core = {path = "../daft-core", default-features = false}
daft-csv = {path = "../daft-csv", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-json = {path = "../daft-json", default-features = false}

Member:

you may also need to add daft-json/python to the python feature in this toml

@clarkzinzow (Contributor, Author):
Hmm, I don't think so; none of the daft_json/python APIs should be used from daft_micropartition, and all pyo3-exposed classes (e.g. the parse and read options) should be usable from Rust without the pyo3 wrapping.

Let me know if I'm misunderstanding this dependency!

src/daft-json/src/inference.rs (review thread resolved)
src/daft-json/src/inference.rs (outdated review thread, resolved)
src/daft-json/src/inference.rs (review thread resolved)
src/daft-json/src/read.rs (outdated review thread, resolved)
fn deserialize_into<'a, A: Borrow<Value<'a>>>(target: &mut Box<dyn MutableArray>, rows: &[A]) {
    match target.data_type() {
        DataType::Null => {
            // TODO(Clark): Return an error if any of rows are not Value::Null.

Member:

is this TODO important?

@clarkzinzow (Contributor, Author):

Nah, we still need to add a "strict" parsing mode for both the CSV reader and JSON reader, so this TODO can be addressed when we add that parsing mode; for now, the readers are very forgiving, falling back to UTF8 when parsing fails and trusting the inferred dtype (e.g. here with the DataType::Null dtype).
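
For illustration, a conceptual sketch (plain Python, not the actual Arrow2/Daft deserialization code) of the difference between today's forgiving behavior and the strict mode described above:

def parse_value(raw, target_dtype, strict=False):
    """Conceptual sketch only: coerce a parsed JSON value to the inferred dtype.

    Forgiving mode (current behavior): fall back to a null value when the
    coercion fails. Strict mode (the follow-up TODO): surface an error instead.
    """
    try:
        return target_dtype(raw)
    except (TypeError, ValueError):
        if strict:
            raise ValueError(f"could not parse {raw!r} as {target_dtype.__name__}")
        return None  # forgiving: fall back to null

print(parse_value("42", int))            # 42
print(parse_value("not-a-number", int))  # None (forgiving fallback)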

@clarkzinzow clarkzinzow merged commit 3693c22 into main Dec 6, 2023
39 of 40 checks passed
@clarkzinzow clarkzinzow deleted the clark/json-reader branch December 6, 2023 02:25