Skip to content

Commit

Permalink
Buffer/chunk size tweaks.
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkzinzow committed Dec 5, 2023
1 parent 874e35b commit 1f69b26
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
1 change: 1 addition & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ def read_json(
read_options: JsonReadOptions | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
max_chunks_in_flight: int | None = None,
): ...
def read_json_schema(
uri: str,
Expand Down
2 changes: 2 additions & 0 deletions daft/table/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def read_json(
read_options: JsonReadOptions | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
max_chunks_in_flight: int | None = None,
) -> Table:
return Table._from_pytable(
_read_json(
Expand All @@ -492,6 +493,7 @@ def read_json(
read_options=read_options,
io_config=io_config,
multithreaded_io=multithreaded_io,
max_chunks_in_flight=max_chunks_in_flight,
)
)

Expand Down
4 changes: 3 additions & 1 deletion src/daft-json/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod pylib {

use crate::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};

#[allow(clippy::too_many_arguments)]
#[pyfunction]
pub fn read_json(
py: Python,
Expand All @@ -17,6 +18,7 @@ pub mod pylib {
read_options: Option<JsonReadOptions>,
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
max_chunks_in_flight: Option<usize>,
) -> PyResult<PyTable> {
py.allow_threads(|| {
let io_stats = IOStatsContext::new(format!("read_json: for uri {uri}"));
Expand All @@ -33,7 +35,7 @@ pub mod pylib {
io_client,
Some(io_stats),
multithreaded_io.unwrap_or(true),
None,
max_chunks_in_flight,
)?
.into())
})
Expand Down
28 changes: 20 additions & 8 deletions src/daft-json/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,11 @@ async fn read_json_single_into_stream(
// Use user-provided buffer size, falling back to 8 * the user-provided chunk size if that exists, otherwise falling back to 512 KiB as the default.
let buffer_size = read_options
.as_ref()
.and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs)))
.unwrap_or(512 * 1024);
.and_then(|opt| {
opt.buffer_size
.or_else(|| opt.chunk_size.map(|cs| (64 * cs).min(256 * 1024 * 1024)))
})
.unwrap_or(256 * 1024);
(
Box::new(BufReader::with_capacity(
buffer_size,
Expand All @@ -231,21 +234,30 @@ async fn read_json_single_into_stream(
buffer_size,
read_options
.as_ref()
.and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8)))
.unwrap_or(64 * 1024),
.and_then(|opt| {
opt.chunk_size
.or_else(|| opt.buffer_size.map(|bs| (bs / 64).max(16)))
})
.unwrap_or(64),
)
}
GetResult::Stream(stream, _, _) => (
Box::new(StreamReader::new(stream)),
// Use user-provided buffer size, falling back to 8 * the user-provided chunk size if that exists, otherwise falling back to 512 KiB as the default.
read_options
.as_ref()
.and_then(|opt| opt.buffer_size.or_else(|| opt.chunk_size.map(|cs| 8 * cs)))
.unwrap_or(512 * 1024),
.and_then(|opt| {
opt.buffer_size
.or_else(|| opt.chunk_size.map(|cs| (256 * cs).min(256 * 1024 * 1024)))
})
.unwrap_or(8 * 1024 * 1024),
read_options
.as_ref()
.and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8)))
.unwrap_or(64 * 1024),
.and_then(|opt| {
opt.chunk_size
.or_else(|| opt.buffer_size.map(|bs| (bs / 256).max(16)))
})
.unwrap_or(64),
),
};
// If file is compressed, wrap stream in decoding stream.
Expand Down

0 comments on commit 1f69b26

Please sign in to comment.