[FEAT] Streaming physical writes for native executor (Eventual-Inc#2992)
Streaming writes for swordfish (Parquet + CSV only). Iceberg and Delta writes are tracked in Eventual-Inc#2966.

Streaming writes are implemented as a blocking sink. Unpartitioned writes run with a single worker, while partitioned writes run with `NUM_CPUS` workers; a sketch of the dispatch rule follows below. As a drive-by, blocking sinks were made parallelizable.
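
The worker-count and routing logic itself lives in the Rust executor; purely as an illustration (the names `num_workers` and `worker_for` below are hypothetical, not the actual implementation), the dispatch behaviour looks roughly like this:

```python
import multiprocessing

# Hypothetical sketch of the dispatch rule described above (not the actual
# Rust implementation): unpartitioned writes get a single worker, partitioned
# writes get one worker per CPU, and each incoming partition is routed to a
# worker by hashing its partition values.
NUM_CPUS = multiprocessing.cpu_count()


def num_workers(is_partitioned: bool) -> int:
    return NUM_CPUS if is_partitioned else 1


def worker_for(partition_hash: int, workers: int) -> int:
    return partition_hash % workers


print(num_workers(is_partitioned=False))  # 1
print(num_workers(is_partitioned=True))   # e.g. 8 on an 8-core machine
print(worker_for(hash("L_SHIPMODE=AIR"), num_workers(is_partitioned=True)))
```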

**Behaviour**
- Unpartitioned: Writes stream into a `TargetFileSizeWriter`, which manages file sizes and row group sizes as data arrives (see the rotation sketch after this list).

- Partitioned: Data is partitioned via a `Dispatcher` and routed to workers based on the partition hash. Each worker runs a `PartitionedWriter` that manages partitioning by value, file sizes, and row group sizes.
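
The size management itself (the `TargetFileSizeWriter` mentioned above) lives in the new writer code added by this PR; the following is only a minimal Python sketch of the rotation idea, with hypothetical names (`TargetSizeWriter`, `InMemoryFileWriter`, `target_rows`) and a row count standing in for byte size:

```python
from typing import List


class InMemoryFileWriter:
    """Stand-in for a real file writer (e.g. a Parquet or CSV writer)."""

    def __init__(self, file_idx: int) -> None:
        self.file_idx = file_idx
        self.rows_written = 0

    def write(self, batch: List[dict]) -> None:
        self.rows_written += len(batch)

    def close(self) -> str:
        return f"file-{self.file_idx} ({self.rows_written} rows)"


class TargetSizeWriter:
    """Rotate to a new file whenever the current one reaches `target_rows`."""

    def __init__(self, target_rows: int) -> None:
        self.target_rows = target_rows
        self.current = InMemoryFileWriter(file_idx=0)
        self.closed_files: List[str] = []

    def write(self, batch: List[dict]) -> None:
        # Slice the incoming batch so no single file exceeds the target size.
        while batch:
            remaining = self.target_rows - self.current.rows_written
            head, batch = batch[:remaining], batch[remaining:]
            self.current.write(head)
            if self.current.rows_written >= self.target_rows:
                self._rotate()

    def _rotate(self) -> None:
        self.closed_files.append(self.current.close())
        self.current = InMemoryFileWriter(file_idx=len(self.closed_files))

    def close(self) -> List[str]:
        self.closed_files.append(self.current.close())
        return self.closed_files


writer = TargetSizeWriter(target_rows=3)
writer.write([{"x": i} for i in range(7)])
print(writer.close())  # three files: 3 + 3 + 1 rows
```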


**Benchmarks:**
A new benchmark suite in `tests/benchmarks/test_streaming_writes.py` tests writes of TPC-H lineitem to Parquet/CSV, with and without partition columns and across different file and row group sizes. The streaming executor performs much better when there are partition columns, as seen in the screenshot below. Without partition columns it is about the same; when the target row group size or file size is decreased, it is slightly slower, likely because it does more slicing, though this needs further investigation. Memory usage is the same for both.
<img width="1400" alt="Screenshot 2024-10-03 at 11 22 32 AM"
src="https://github.com/user-attachments/assets/53b4d77d-553a-4181-8a4d-9eddaa3adaf7">

Memory test on a read -> write of Parquet TPC-H lineitem (SF1):
Native:
<img width="1078" alt="Screenshot 2024-10-08 at 1 48 34 PM"
src="https://github.com/user-attachments/assets/3eda33c6-9413-415f-b808-ac3c7437e269">

Python:
<img width="1090" alt="Screenshot 2024-10-08 at 1 48 50 PM"
src="https://github.com/user-attachments/assets/f92b9a9f-a3b5-408b-98d5-4ba2d66b7be4">

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
4 people authored and sagiahrac committed Nov 4, 2024
1 parent d753b44 commit f5bcd4d
Showing 49 changed files with 2,135 additions and 421 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions Cargo.toml
@@ -22,12 +22,14 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = fa
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-sql = {path = "src/daft-sql", default-features = false}
daft-stats = {path = "src/daft-stats", default-features = false}
daft-table = {path = "src/daft-table", default-features = false}
daft-writers = {path = "src/daft-writers", default-features = false}
lazy_static = {workspace = true}
log = {workspace = true}
lzma-sys = {version = "*", features = ["static"]}
@@ -53,12 +55,20 @@ python = [
"daft-local-execution/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
"daft-sql/python",
"daft-stats/python",
"daft-table/python",
"daft-functions/python",
"daft-functions-json/python",
"daft-writers/python",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python",
"dep:pyo3",
"dep:pyo3-log"
]
@@ -141,6 +151,7 @@ members = [
"src/daft-scheduler",
"src/daft-sketch",
"src/daft-sql",
"src/daft-writers",
"src/daft-table",
"src/hyperloglog",
"src/parquet2"
159 changes: 159 additions & 0 deletions daft/io/writer.py
@@ -0,0 +1,159 @@
import uuid
from abc import ABC, abstractmethod
from typing import Optional

from daft.daft import IOConfig
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (
    _resolve_paths_and_filesystem,
    canonicalize_protocol,
    get_protocol_from_path,
)
from daft.series import Series
from daft.table.micropartition import MicroPartition
from daft.table.partitioning import (
    partition_strings_to_path,
    partition_values_to_str_mapping,
)
from daft.table.table import Table


class FileWriterBase(ABC):
    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        file_format: str,
        partition_values: Optional[Table] = None,
        compression: Optional[str] = None,
        io_config: Optional[IOConfig] = None,
        default_partition_fallback: str = "__HIVE_DEFAULT_PARTITION__",
    ):
        [self.resolved_path], self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
        protocol = get_protocol_from_path(root_dir)
        canonicalized_protocol = canonicalize_protocol(protocol)
        is_local_fs = canonicalized_protocol == "file"

        self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
        self.partition_values = partition_values
        if self.partition_values is not None:
            partition_strings = {
                key: values.to_pylist()[0]
                for key, values in partition_values_to_str_mapping(self.partition_values).items()
            }
            self.dir_path = partition_strings_to_path(self.resolved_path, partition_strings, default_partition_fallback)
        else:
            self.dir_path = f"{self.resolved_path}"

        self.full_path = f"{self.dir_path}/{self.file_name}"
        if is_local_fs:
            self.fs.create_dir(self.dir_path, recursive=True)

        self.compression = compression if compression is not None else "none"

    @abstractmethod
    def write(self, table: MicroPartition) -> None:
        """Write data to the file using the appropriate writer.

        Args:
            table: MicroPartition containing the data to be written.
        """
        pass

    @abstractmethod
    def close(self) -> Table:
        """Close the writer and return metadata about the written file. Write should not be called after close.

        Returns:
            Table containing metadata about the written file, including path and partition values.
        """
        pass


class ParquetFileWriter(FileWriterBase):
    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[Table] = None,
        compression: str = "none",
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir=root_dir,
            file_idx=file_idx,
            file_format="parquet",
            partition_values=partition_values,
            compression=compression,
            io_config=io_config,
        )
        self.is_closed = False
        self.current_writer: Optional[pq.ParquetWriter] = None

    def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
        return pq.ParquetWriter(
            self.full_path,
            schema,
            compression=self.compression,
            use_compliant_nested_type=False,
            filesystem=self.fs,
        )

    def write(self, table: MicroPartition) -> None:
        assert not self.is_closed, "Cannot write to a closed ParquetFileWriter"
        if self.current_writer is None:
            self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Table:
        if self.current_writer is not None:
            self.current_writer.close()

        self.is_closed = True
        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            for col_name in self.partition_values.column_names():
                metadata[col_name] = self.partition_values.get_column(col_name)
        return Table.from_pydict(metadata)


class CSVFileWriter(FileWriterBase):
    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[Table] = None,
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir=root_dir,
            file_idx=file_idx,
            file_format="csv",
            partition_values=partition_values,
            io_config=io_config,
        )
        self.current_writer: Optional[pacsv.CSVWriter] = None
        self.is_closed = False

    def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
        return pacsv.CSVWriter(
            self.full_path,
            schema,
        )

    def write(self, table: MicroPartition) -> None:
        assert not self.is_closed, "Cannot write to a closed CSVFileWriter"
        if self.current_writer is None:
            self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Table:
        if self.current_writer is not None:
            self.current_writer.close()

        self.is_closed = True
        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            for col_name in self.partition_values.column_names():
                metadata[col_name] = self.partition_values.get_column(col_name)
        return Table.from_pydict(metadata)
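
A minimal usage sketch of the `ParquetFileWriter` defined above, on a local filesystem (the output directory is an arbitrary choice for illustration):

```python
from daft.io.writer import ParquetFileWriter
from daft.table.micropartition import MicroPartition

# Stream several micropartitions into a single Parquet file, then close the
# writer to get back a metadata Table containing the written file's path.
writer = ParquetFileWriter(root_dir="/tmp/streaming_write_demo", file_idx=0)
for i in range(3):
    batch = MicroPartition.from_pydict({"id": [i, i + 100], "value": [i * 1.0, i * 2.0]})
    writer.write(batch)  # the underlying pyarrow ParquetWriter is created lazily on first write
metadata = writer.close()
print(metadata.to_pydict()["path"])
```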
38 changes: 25 additions & 13 deletions daft/table/partitioning.py
@@ -1,20 +1,42 @@
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from daft import Series
from daft.expressions import ExpressionsProjection
from daft.table.table import Table

from .micropartition import MicroPartition


def partition_strings_to_path(
root_path: str, parts: Dict[str, str], partition_null_fallback: str = "__HIVE_DEFAULT_PARTITION__"
root_path: str,
parts: Dict[str, str],
partition_null_fallback: str = "__HIVE_DEFAULT_PARTITION__",
) -> str:
keys = parts.keys()
values = [partition_null_fallback if value is None else value for value in parts.values()]
postfix = "/".join(f"{k}={v}" for k, v in zip(keys, values))
return f"{root_path}/{postfix}"


def partition_values_to_str_mapping(
partition_values: Union[MicroPartition, Table],
) -> Dict[str, Series]:
null_part = Series.from_pylist(
[None]
) # This is to ensure that the null values are replaced with the default_partition_fallback value
pkey_names = partition_values.column_names()

partition_strings = {}

for c in pkey_names:
column = partition_values.get_column(c)
string_names = column._to_str_values()
null_filled = column.is_null().if_else(null_part, string_names)
partition_strings[c] = null_filled

return partition_strings


class PartitionedTable:
def __init__(self, table: MicroPartition, partition_keys: Optional[ExpressionsProjection]):
self.table = table
@@ -56,20 +78,10 @@ def partition_values_str(self) -> Optional[MicroPartition]:
If the table is not partitioned, returns None.
"""
null_part = Series.from_pylist([None])
partition_values = self.partition_values()

if partition_values is None:
return None
else:
pkey_names = partition_values.column_names()

partition_strings = {}

for c in pkey_names:
column = partition_values.get_column(c)
string_names = column._to_str_values()
null_filled = column.is_null().if_else(null_part, string_names)
partition_strings[c] = null_filled

partition_strings = partition_values_to_str_mapping(partition_values)
return MicroPartition.from_pydict(partition_strings)
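
A small illustrative example of the two helpers above (the table values are made up; a `None` partition value would be replaced with `__HIVE_DEFAULT_PARTITION__` when the path is built):

```python
from daft.table.micropartition import MicroPartition
from daft.table.partitioning import (
    partition_strings_to_path,
    partition_values_to_str_mapping,
)

# One row of partition values, mapped to their string forms and then joined
# into a Hive-style partition directory path.
partition_values = MicroPartition.from_pydict({"year": [2024], "country": ["CA"]})
strings = {
    key: series.to_pylist()[0]
    for key, series in partition_values_to_str_mapping(partition_values).items()
}
path = partition_strings_to_path("s3://bucket/table", strings)
print(path)  # s3://bucket/table/year=2024/country=CA
```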
13 changes: 12 additions & 1 deletion src/daft-core/src/utils/identity_hash_set.rs
@@ -1,4 +1,4 @@
use std::hash::{BuildHasherDefault, Hasher};
use std::hash::{BuildHasherDefault, Hash, Hasher};

pub type IdentityBuildHasher = BuildHasherDefault<IdentityHasher>;

@@ -27,3 +27,14 @@ impl Hasher for IdentityHasher {
        self.hash = i;
    }
}

pub struct IndexHash {
    pub idx: u64,
    pub hash: u64,
}

impl Hash for IndexHash {
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u64(self.hash);
    }
}
1 change: 1 addition & 0 deletions src/daft-io/src/lib.rs
@@ -12,6 +12,7 @@ mod object_store_glob;
mod s3_like;
mod stats;
mod stream_utils;

use azure_blob::AzureBlobSource;
use common_file_formats::FileFormat;
use google_cloud::GCSSource;
4 changes: 3 additions & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[dependencies]
async-trait = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
@@ -17,6 +18,7 @@ daft-physical-plan = {path = "../daft-physical-plan", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
daft-writers = {path = "../daft-writers", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
lazy_static = {workspace = true}
@@ -29,7 +31,7 @@ tokio-stream = {workspace = true}
tracing = {workspace = true}

[features]
python = ["dep:pyo3", "common-daft-config/python", "common-file-formats/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]
python = ["dep:pyo3", "common-daft-config/python", "common-file-formats/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "daft-writers/python", "common-display/python"]

[lints]
workspace = true