[FEAT] Add streaming + parallel CSV reader, with decompression support. (#1501)

This PR adds streaming + parallel CSV reading and parsing, along with
support for streaming decompression. In particular, this PR:
- Adds support for streaming decompression for brotli, bz, deflate,
gzip, lzma, xz, zlib, and zstd.
- Performs chunk-based streaming CSV reads, filling up a small buffer of
unparsed records.
- Pipelines chunk-based CSV parsing with reading by spawning Tokio +
rayon parsing tasks.
- Performs chunk parsing, as well as column parsing within a chunk,
in parallel on the rayon threadpool.
- Changes schema inference to involve an (at most) 1 MiB file peek
rather than a full file read.
- Gathers a mean row size in bytes estimate during schema inference and
propagates this estimate back to the reader.
- Unifies local and cloud reads + schema inference.
- Adds thorough Rust-side local + cloud test coverage.

The streaming + parallel reading and parsing yields a 4-8x speedup over
the pyarrow reader and the previous non-parallel reader when benchmarking
large-file (~1 GB) reads, while also lowering memory utilization since the
file is read and parsed in a streaming fashion.
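
For illustration, here is a minimal, hypothetical sketch of the read/parse pipelining pattern described above (not the code added in this PR). An async task streams fixed-size chunks off the file while each chunk is handed to the rayon threadpool; the file name, chunk size, and `parse_chunk` helper are placeholders (the helper just counts newline-delimited rows), and the real reader additionally aligns chunks on record boundaries and parses the columns within each chunk in parallel.

```rust
// Hypothetical sketch of the pipelining pattern, not Daft's actual reader.
use tokio::io::{AsyncReadExt, BufReader};
use tokio::sync::{mpsc, oneshot};

// Stand-in for per-chunk CSV parsing; here we just count newline-terminated rows.
fn parse_chunk(chunk: &[u8]) -> usize {
    chunk.iter().filter(|&&b| b == b'\n').count()
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // In the real reader the chunk size is informed by the mean-row-size
    // estimate gathered during schema inference; 64 KiB is a placeholder.
    const CHUNK_SIZE: usize = 64 * 1024;

    let file = tokio::fs::File::open("data.csv").await?;
    let mut reader = BufReader::new(file);

    // A small bounded buffer of in-flight, unparsed chunks keeps memory low.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(8);

    // Reader task: stream raw chunks without waiting for parsing to finish.
    let read_task = tokio::spawn(async move {
        loop {
            let mut buf = vec![0u8; CHUNK_SIZE];
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            buf.truncate(n);
            if tx.send(buf).await.is_err() {
                break;
            }
        }
        Ok::<_, std::io::Error>(())
    });

    // Consumer: offload each chunk to the rayon threadpool and collect results.
    let mut total_rows = 0usize;
    while let Some(chunk) = rx.recv().await {
        let (done_tx, done_rx) = oneshot::channel();
        rayon::spawn(move || {
            let _ = done_tx.send(parse_chunk(&chunk));
        });
        total_rows += done_rx.await.expect("parse task dropped");
    }

    read_task.await.expect("reader task panicked")?;
    println!("parsed ~{total_rows} rows");
    Ok(())
}
```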

## TODOs (follow-up PRs)

- [ ] Add snappy decompression support (need to essentially do something
like
[this](https://github.com/belltoy/tokio-snappy/blob/master/src/lib.rs))
clarkzinzow authored Oct 20, 2023
1 parent 76e256a commit ad829c9
Showing 42 changed files with 2,191 additions and 158 deletions.
142 changes: 139 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
@@ -76,6 +76,7 @@ members = [

[workspace.dependencies]
async-compat = "0.2.1"
async-compression = {version = "0.4.4", features = ["tokio", "all-algorithms"]}
async-stream = "0.3.5"
bytes = "1.4.0"
futures = "0.3.28"
@@ -88,15 +89,16 @@ prettytable-rs = "0.10"
rand = "^0.8"
rayon = "1.7.0"
serde_json = "1.0.104"
snafu = "0.7.4"
snafu = {version = "0.7.4", features = ["futures"]}
tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
tokio-stream = {version = "0.1.14", features = ["fs"]}
tokio-util = "0.7.8"
url = "2.4.0"

[workspace.dependencies.arrow2]
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
rev = "065a31da8fd8a75cbece5f99295a4068713a71ed"
rev = "0a6f79e0da7e32cc30381f4cc8cf5a8483909f78"

[workspace.dependencies.bincode]
version = "1.3.3"
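
As a rough illustration of what the new `async-compression` dependency enables (not the PR's actual reader code), the sketch below wraps a Tokio file reader in a streaming gzip decoder and consumes decompressed bytes chunk by chunk. The file name and 64 KiB chunk size are placeholders; analogous decoder types cover the other codecs listed in the commit message.

```rust
// Hypothetical sketch: stream-decompress a gzip file with async-compression
// instead of materializing the whole decompressed file in memory.
use async_compression::tokio::bufread::GzipDecoder;
use tokio::io::{AsyncReadExt, BufReader};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let file = tokio::fs::File::open("data.csv.gz").await?;
    // GzipDecoder wraps any AsyncBufRead; the "all-algorithms" feature also
    // provides decoders for brotli, bzip2, deflate, lzma, xz, zlib, and zstd.
    let mut decoder = GzipDecoder::new(BufReader::new(file));

    let mut chunk = vec![0u8; 64 * 1024];
    let mut total = 0usize;
    loop {
        let n = decoder.read(&mut chunk).await?;
        if n == 0 {
            break;
        }
        // In the real reader, these decompressed bytes feed the CSV chunk buffer.
        total += n;
    }
    println!("decompressed {total} bytes");
    Ok(())
}
```
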
13 changes: 12 additions & 1 deletion daft/daft.pyi
@@ -190,8 +190,16 @@ class CsvSourceConfig:

delimiter: str
has_headers: bool
buffer_size: int | None
chunk_size: int | None

def __init__(self, delimiter: str, has_headers: bool): ...
def __init__(
self,
delimiter: str,
has_headers: bool,
buffer_size: int | None = None,
chunk_size: int | None = None,
): ...

class JsonSourceConfig:
"""
@@ -425,6 +433,9 @@ def read_csv(
delimiter: str | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
schema: PySchema | None = None,
buffer_size: int | None = None,
chunk_size: int | None = None,
): ...
def read_csv_schema(
uri: str,
2 changes: 2 additions & 0 deletions daft/execution/execution_step.py
@@ -373,6 +373,8 @@ def _handle_tabular_files_scan(
csv_options=TableParseCSVOptions(
delimiter=format_config.delimiter,
header_index=0 if format_config.has_headers else None,
buffer_size=format_config.buffer_size,
chunk_size=format_config.chunk_size,
),
read_options=read_options,
)
9 changes: 8 additions & 1 deletion daft/io/_csv.py
@@ -28,6 +28,8 @@ def read_csv(
delimiter: str = ",",
io_config: Optional["IOConfig"] = None,
use_native_downloader: bool = False,
_buffer_size: Optional[int] = None,
_chunk_size: Optional[int] = None,
) -> DataFrame:
"""Creates a DataFrame from CSV file(s)
@@ -62,7 +64,12 @@ def read_csv(
if isinstance(path, list) and len(path) == 0:
raise ValueError(f"Cannot read DataFrame from from empty list of CSV filepaths")

csv_config = CsvSourceConfig(delimiter=delimiter, has_headers=has_headers)
csv_config = CsvSourceConfig(
delimiter=delimiter,
has_headers=has_headers,
buffer_size=_buffer_size,
chunk_size=_chunk_size,
)
file_format_config = FileFormatConfig.from_csv_config(csv_config)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(io_config))
4 changes: 4 additions & 0 deletions daft/runners/partitioning.py
@@ -44,10 +44,14 @@ class TableParseCSVOptions:
Args:
delimiter: The delimiter to use when parsing CSVs, defaults to ","
header_index: Index of the header row, or None if no header
buffer_size: Size of the buffer (in bytes) used by the streaming reader.
chunk_size: Size of the chunks (in bytes) deserialized in parallel by the streaming reader.
"""

delimiter: str = ","
header_index: int | None = 0
buffer_size: int | None = None
chunk_size: int | None = None


@dataclass(frozen=True)
6 changes: 6 additions & 0 deletions daft/table/table.py
@@ -446,6 +446,9 @@ def read_csv(
delimiter: str | None = None,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
schema: Schema | None = None,
buffer_size: int | None = None,
chunk_size: int | None = None,
) -> Table:
return Table._from_pytable(
_read_csv(
@@ -457,6 +460,9 @@
delimiter=delimiter,
io_config=io_config,
multithreaded_io=multithreaded_io,
schema=schema._schema if schema is not None else None,
buffer_size=buffer_size,
chunk_size=chunk_size,
)
)

3 changes: 3 additions & 0 deletions daft/table/table_io.py
@@ -223,6 +223,9 @@ def read_csv(
has_header=has_header,
delimiter=csv_options.delimiter,
io_config=config.io_config,
schema=schema,
buffer_size=csv_options.buffer_size,
chunk_size=csv_options.chunk_size,
)
return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)

2 changes: 1 addition & 1 deletion src/daft-core/src/array/fixed_size_list_array.rs
@@ -7,7 +7,7 @@ use crate::datatypes::{DaftArrayType, Field};
use crate::series::Series;
use crate::DataType;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FixedSizeListArray {
pub field: Arc<Field>,
pub flat_child: Series,