Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[FEAT] Streaming physical writes for native executor #2992

Merged
merged 18 commits into from
Oct 31, 2024
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = fa
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-sql = {path = "src/daft-sql", default-features = false}
daft-stats = {path = "src/daft-stats", default-features = false}
daft-table = {path = "src/daft-table", default-features = false}
daft-writers = {path = "src/daft-writers", default-features = false}
lazy_static = {workspace = true}
log = {workspace = true}
lzma-sys = {version = "*", features = ["static"]}
Expand All @@ -53,12 +55,20 @@ python = [
"daft-local-execution/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
"daft-sql/python",
"daft-stats/python",
"daft-table/python",
"daft-functions/python",
"daft-functions-json/python",
"daft-writers/python",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python",
"dep:pyo3",
"dep:pyo3-log"
]
Expand Down Expand Up @@ -141,6 +151,7 @@ members = [
"src/daft-scheduler",
"src/daft-sketch",
"src/daft-sql",
"src/daft-writers",
"src/daft-table",
"src/hyperloglog",
"src/parquet2"
Expand Down
159 changes: 159 additions & 0 deletions daft/io/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import uuid
from abc import ABC, abstractmethod
from typing import Optional

Check warning on line 3 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L1-L3

Added lines #L1 - L3 were not covered by tests

from daft.daft import IOConfig
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (

Check warning on line 7 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L5-L7

Added lines #L5 - L7 were not covered by tests
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.series import Series
from daft.table.micropartition import MicroPartition
from daft.table.partitioning import (

Check warning on line 14 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L12-L14

Added lines #L12 - L14 were not covered by tests
partition_strings_to_path,
partition_values_to_str_mapping,
)
from daft.table.table import Table

Check warning on line 18 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L18

Added line #L18 was not covered by tests


class FileWriterBase(ABC):
    """Abstract base for single-file writers.

    Resolves the output path and filesystem once, up front. When partition
    values are supplied, the file is nested under a hive-style partition
    directory built from their stringified values. Writers are non-rotating:
    one instance produces exactly one file, and ``write`` must not be called
    after ``close``.
    """

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        file_format: str,
        partition_values: Optional[Table] = None,
        compression: Optional[str] = None,
        io_config: Optional[IOConfig] = None,
        default_partition_fallback: str = "__HIVE_DEFAULT_PARTITION__",
    ):
        [self.resolved_path], self.fs = _resolve_paths_and_filesystem(root_dir, io_config=io_config)
        # Only a local filesystem needs its directory created eagerly below.
        is_local_fs = canonicalize_protocol(get_protocol_from_path(root_dir)) == "file"

        self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
        self.partition_values = partition_values
        if self.partition_values is None:
            self.dir_path = f"{self.resolved_path}"
        else:
            # Each partition column holds a single row; take its stringified value.
            str_mapping = partition_values_to_str_mapping(self.partition_values)
            partition_strings = {key: series.to_pylist()[0] for key, series in str_mapping.items()}
            self.dir_path = partition_strings_to_path(
                self.resolved_path, partition_strings, default_partition_fallback
            )

        self.full_path = f"{self.dir_path}/{self.file_name}"
        if is_local_fs:
            self.fs.create_dir(self.dir_path, recursive=True)

        self.compression = compression if compression is not None else "none"

    @abstractmethod
    def write(self, table: MicroPartition) -> None:
        """Write data to the file using the appropriate writer.

        Args:
            table: MicroPartition containing the data to be written.
        """
        pass

    @abstractmethod
    def close(self) -> Table:
        """Close the writer and return metadata about the written file.

        Write should not be called after close.

        Returns:
            Table containing metadata about the written file, including path
            and partition values.
        """
        pass

Check warning on line 70 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L70

Added line #L70 was not covered by tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also have a finalize method rather than overloading close to start a next file and closing the last file

Copy link
Contributor Author

@colin-ho colin-ho Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually intending for these Python writers to be non rotating. i.e. no writing after closing. They should be given a unique file_idx for the file_name generation upon construction, and unique set of partition_values.

I will add assertions and some comments to document this behaviour


class ParquetFileWriter(FileWriterBase):
    """Writes one Parquet file, opening the underlying writer lazily.

    The pyarrow writer is created on the first ``write`` call so that the
    schema can be taken from the incoming data. Non-rotating: once closed,
    the writer must not be written to again.
    """

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[Table] = None,
        compression: str = "none",
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir=root_dir,
            file_idx=file_idx,
            file_format="parquet",
            partition_values=partition_values,
            compression=compression,
            io_config=io_config,
        )
        self.is_closed = False
        self.current_writer: Optional[pq.ParquetWriter] = None

    def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
        """Construct the pyarrow Parquet writer for this file's path/schema."""
        return pq.ParquetWriter(
            self.full_path,
            schema,
            compression=self.compression,
            use_compliant_nested_type=False,
            filesystem=self.fs,
        )

    def write(self, table: MicroPartition) -> None:
        """Append a micropartition's rows to the Parquet file."""
        assert not self.is_closed, "Cannot write to a closed ParquetFileWriter"
        if self.current_writer is None:
            # First batch: derive the file schema from the incoming data.
            self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Table:
        """Flush and close the file; return a one-row metadata table."""
        if self.current_writer is not None:
            self.current_writer.close()
        self.is_closed = True

        # Metadata row: written path plus any partition-value columns.
        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            metadata.update(
                {name: self.partition_values.get_column(name) for name in self.partition_values.column_names()}
            )
        return Table.from_pydict(metadata)

Check warning on line 117 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L112-L117

Added lines #L112 - L117 were not covered by tests


class CSVFileWriter(FileWriterBase):
    """Writes one CSV file, opening the underlying writer lazily.

    The pyarrow CSV writer is created on the first ``write`` call so that the
    schema can be taken from the incoming data. Non-rotating: once closed,
    the writer must not be written to again.
    """

    def __init__(
        self,
        root_dir: str,
        file_idx: int,
        partition_values: Optional[Table] = None,
        io_config: Optional[IOConfig] = None,
    ):
        super().__init__(
            root_dir=root_dir,
            file_idx=file_idx,
            file_format="csv",
            partition_values=partition_values,
            io_config=io_config,
        )
        self.current_writer: Optional[pacsv.CSVWriter] = None
        self.is_closed = False

    def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
        """Construct the pyarrow CSV writer for this file's path/schema."""
        return pacsv.CSVWriter(
            self.full_path,
            schema,
        )

    def write(self, table: MicroPartition) -> None:
        """Append a micropartition's rows to the CSV file."""
        assert not self.is_closed, "Cannot write to a closed CSVFileWriter"
        if self.current_writer is None:
            # First batch: derive the file schema from the incoming data.
            self.current_writer = self._create_writer(table.schema().to_pyarrow_schema())
        self.current_writer.write_table(table.to_arrow())

    def close(self) -> Table:
        """Flush and close the file; return a one-row metadata table."""
        if self.current_writer is not None:
            self.current_writer.close()
        self.is_closed = True

        # Metadata row: written path plus any partition-value columns.
        metadata = {"path": Series.from_pylist([self.full_path])}
        if self.partition_values is not None:
            metadata.update(
                {name: self.partition_values.get_column(name) for name in self.partition_values.column_names()}
            )
        return Table.from_pydict(metadata)

Check warning on line 159 in daft/io/writer.py

View check run for this annotation

Codecov / codecov/patch

daft/io/writer.py#L154-L159

Added lines #L154 - L159 were not covered by tests
38 changes: 25 additions & 13 deletions daft/table/partitioning.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,42 @@
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from daft import Series
from daft.expressions import ExpressionsProjection
from daft.table.table import Table

from .micropartition import MicroPartition


def partition_strings_to_path(
    root_path: str,
    parts: Dict[str, str],
    partition_null_fallback: str = "__HIVE_DEFAULT_PARTITION__",
) -> str:
    """Build a hive-style partition path under ``root_path``.

    Args:
        root_path: Base directory for the partitioned output.
        parts: Mapping of partition key to string value; ``None`` values are
            replaced with ``partition_null_fallback``.
        partition_null_fallback: Directory-name stand-in for null partition values.

    Returns:
        A path of the form ``{root_path}/{key1}={value1}/{key2}={value2}/...``.
    """
    keys = parts.keys()
    values = [partition_null_fallback if value is None else value for value in parts.values()]
    postfix = "/".join(f"{k}={v}" for k, v in zip(keys, values))
    return f"{root_path}/{postfix}"


def partition_values_to_str_mapping(
    partition_values: Union[MicroPartition, Table],
) -> Dict[str, Series]:
    """Map each partition column name to its stringified values.

    Nulls in a column stay null in the output series (via ``if_else`` against
    a single-null series), so a fallback such as the default partition name
    can be substituted later (e.g. when building partition paths).
    """
    # This is to ensure that the null values are replaced with the
    # default_partition_fallback value downstream.
    null_series = Series.from_pylist([None])

    def stringify(name: str) -> Series:
        column = partition_values.get_column(name)
        return column.is_null().if_else(null_series, column._to_str_values())

    return {name: stringify(name) for name in partition_values.column_names()}


class PartitionedTable:
def __init__(self, table: MicroPartition, partition_keys: Optional[ExpressionsProjection]):
self.table = table
Expand Down Expand Up @@ -56,20 +78,10 @@ def partition_values_str(self) -> Optional[MicroPartition]:

If the table is not partitioned, returns None.
"""
null_part = Series.from_pylist([None])
partition_values = self.partition_values()

if partition_values is None:
return None
else:
pkey_names = partition_values.column_names()

partition_strings = {}

for c in pkey_names:
column = partition_values.get_column(c)
string_names = column._to_str_values()
null_filled = column.is_null().if_else(null_part, string_names)
partition_strings[c] = null_filled

partition_strings = partition_values_to_str_mapping(partition_values)
return MicroPartition.from_pydict(partition_strings)
13 changes: 12 additions & 1 deletion src/daft-core/src/utils/identity_hash_set.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::hash::{BuildHasherDefault, Hasher};
use std::hash::{BuildHasherDefault, Hash, Hasher};

pub type IdentityBuildHasher = BuildHasherDefault<IdentityHasher>;

Expand Down Expand Up @@ -27,3 +27,14 @@
self.hash = i;
}
}

/// Pairing of a row index with its precomputed hash value.
pub struct IndexHash {
    pub idx: u64,
    pub hash: u64,
}

impl Hash for IndexHash {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Re-emit the precomputed hash verbatim; `idx` deliberately does not
        // participate, so hashing is a pass-through of the stored value
        // (pairs with an identity hasher such as `IdentityBuildHasher`).
        state.write_u64(self.hash);
    }
}
1 change: 1 addition & 0 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod object_store_glob;
mod s3_like;
mod stats;
mod stream_utils;

use azure_blob::AzureBlobSource;
use common_file_formats::FileFormat;
use google_cloud::GCSSource;
Expand Down
4 changes: 3 additions & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[dependencies]
async-trait = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
Expand All @@ -17,6 +18,7 @@ daft-physical-plan = {path = "../daft-physical-plan", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
daft-writers = {path = "../daft-writers", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
lazy_static = {workspace = true}
Expand All @@ -29,7 +31,7 @@ tokio-stream = {workspace = true}
tracing = {workspace = true}

[features]
python = ["dep:pyo3", "common-daft-config/python", "common-file-formats/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]
python = ["dep:pyo3", "common-daft-config/python", "common-file-formats/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "daft-writers/python", "common-display/python"]

[lints]
workspace = true
Expand Down
Loading
Loading