
[FEAT] Streaming physical writes for native executor #2992

Merged: 18 commits, Oct 31, 2024

Conversation

@colin-ho (Contributor) commented Oct 3, 2024

Streaming writes for swordfish (parquet + csv only). Iceberg and delta writes are here: #2966

Implement streaming writes as a blocking sink. Unpartitioned writes run with 1 worker, and partitioned writes run with NUM_CPUs workers. As a drive-by, made blocking sinks parallelizable.

Behaviour

  • Unpartitioned: Writes go to a TargetFileSizeWriter, which manages file sizes and row group sizes as data is streamed in.

  • Partitioned: Partition data via a Dispatcher and send it to workers based on the hash of the partition values. Each worker runs a PartitionedWriter that manages partitioning by value, file sizes, and row group sizes (see the sketch below).
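A minimal sketch of the hash-based routing described in the Partitioned bullet, assuming a hypothetical route_to_worker helper and the standard library hasher; the real Dispatcher in daft-local-execution differs in its details:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a writer worker for a row's partition key by hashing it.
/// With a stable hash, all rows with the same partition key land on the
/// same worker, so each worker's PartitionedWriter only ever sees a
/// disjoint subset of the partitions.
fn route_to_worker<K: Hash>(partition_key: &K, num_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    (hasher.finish() as usize) % num_workers
}

fn main() {
    let num_workers = 4; // stand-in for NUM_CPUs in the real executor
    for key in ["2024-10-01", "2024-10-02", "2024-10-03"] {
        println!("partition {key} -> worker {}", route_to_worker(&key, num_workers));
    }
}
```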

Benchmarks:
I made a new benchmark suite in tests/benchmarks/test_streaming_writes.py. It tests writes of TPC-H lineitem to Parquet/CSV, with and without partition columns, and with different file/row-group sizes. The streaming executor performs much better when there are partition columns, as seen in the screenshot below. Without partition columns it is about the same; when the target row group size / file size is decreased, it is slightly slower, likely because it does more slicing, but this needs more investigation. Memory usage is the same for both.
![Screenshot 2024-10-03 at 11 22 32 AM](https://github.com/user-attachments/assets/53b4d77d-553a-4181-8a4d-9eddaa3adaf7)

Memory test on read->write parquet tpch lineitem sf1:
Native:
![Screenshot 2024-10-08 at 1 48 34 PM](https://github.com/user-attachments/assets/3eda33c6-9413-415f-b808-ac3c7437e269)

Python:
![Screenshot 2024-10-08 at 1 48 50 PM](https://github.com/user-attachments/assets/f92b9a9f-a3b5-408b-98d5-4ba2d66b7be4)

@github-actions bot added the enhancement (New feature or request) label on Oct 3, 2024

codspeed-hq bot commented Oct 3, 2024

CodSpeed Performance Report

Merging #2992 will not alter performance

Comparing colin/streaming-physical-writes (b67499e) with main (301cd48)

Summary

✅ 17 untouched benchmarks


codecov bot commented Oct 3, 2024

Codecov Report

Attention: Patch coverage is 87.94848% with 131 lines in your changes missing coverage. Please review.

Project coverage is 77.96%. Comparing base (f966e02) to head (b67499e).
Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| daft/io/writer.py | 0.00% | 74 Missing ⚠️ |
| src/daft-table/src/lib.rs | 0.00% | 13 Missing ⚠️ |
| src/daft-writers/src/partition.rs | 87.87% | 12 Missing ⚠️ |
| src/daft-local-execution/src/dispatcher.rs | 82.50% | 7 Missing ⚠️ |
| src/daft-writers/src/physical.rs | 85.71% | 7 Missing ⚠️ |
| src/daft-writers/src/lib.rs | 92.00% | 4 Missing ⚠️ |
| src/daft-core/src/utils/identity_hash_set.rs | 0.00% | 3 Missing ⚠️ |
| src/daft-local-execution/src/buffer.rs | 93.75% | 3 Missing ⚠️ |
| src/daft-local-execution/src/sinks/aggregate.rs | 93.10% | 2 Missing ⚠️ |
| src/daft-local-execution/src/sinks/sort.rs | 93.10% | 2 Missing ⚠️ |

... and 4 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2992      +/-   ##
==========================================
- Coverage   78.87%   77.96%   -0.91%     
==========================================
  Files         624      633       +9     
  Lines       75923    77485    +1562     
==========================================
+ Hits        59881    60415     +534     
- Misses      16042    17070    +1028     
| Files with missing lines | Coverage Δ |
|---|---|
| daft/table/partitioning.py | 95.55% <100.00%> (+0.43%) ⬆️ |
| src/daft-io/src/lib.rs | 70.73% <ø> (-0.49%) ⬇️ |
| ...-execution/src/intermediate_ops/intermediate_op.rs | 82.05% <100.00%> (-0.12%) ⬇️ |
| src/daft-local-execution/src/lib.rs | 19.77% <ø> (-73.09%) ⬇️ |
| src/daft-local-execution/src/run.rs | 88.54% <100.00%> (ø) |
| ...rc/daft-local-execution/src/sinks/blocking_sink.rs | 81.65% <100.00%> (+7.26%) ⬆️ |
| .../daft-local-execution/src/sinks/hash_join_build.rs | 96.66% <100.00%> (+1.66%) ⬆️ |
| src/daft-local-execution/src/sinks/write.rs | 100.00% <100.00%> (ø) |
| src/daft-micropartition/src/ops/concat.rs | 97.91% <100.00%> (+0.41%) ⬆️ |
| src/daft-micropartition/src/python.rs | 78.23% <100.00%> (ø) |

... and 22 more

... and 15 files with indirect coverage changes

        raise NotImplementedError("Subclasses must implement this method.")

    def write(self, table: MicroPartition):
        if self.current_writer is None:
Member

It would be better here to rely on overridable methods or properties, so something like `if self.current_writer() is None:`. Ideally, you can centralize the logic here and then have the child classes implement the specifics for each file type.

Contributor Author

OK, I made the methods in FileWriterBase all abstract methods, so the child classes have their own implementations.

let mut current_writer: Option<Box<dyn FileWriter>> = None;
let mut current_file_idx = None;
while let Some((data, file_idx)) = input_receiver.recv().await {
    if current_file_idx.is_none() || current_file_idx.unwrap() != file_idx {
Member

Reviewed the general pattern of how we are approaching file writes, and I think we can improve it with a better abstraction.

We already have the trait FileWriter and the trait FileWriterFactory, but imagine if we started layering them.

At the base we can have ParquetFileWriter, but instead of implementing the row group batching in the executor, you can implement a RowBatcherWriter that takes in a FileWriterFactory and gives you a new factory.

Then you can pass that factory into a TargetFileSizeWriter that enforces the target file size for each file it writes out. Finally, that factory can be passed into a PartitionedWriter that partitions by value.

This pattern is pretty common for building adaptors for writers, as can be seen in the Iceberg SDK:
https://github.com/apache/iceberg/blob/2b55fef7cc2a249d864ac26d85a4923313d96a59/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java

The cool thing is that these FileWriters can be parameterized at runtime, so the executor can likely be very simple.
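A minimal sketch of the layering idea, with simplified, hypothetical trait signatures (the actual FileWriter/FileWriterFactory traits in daft-writers operate on Arc<MicroPartition> and return Tables):

```rust
trait FileWriter {
    fn write(&mut self, batch: Vec<u8>) -> std::io::Result<()>;
    fn close(&mut self) -> std::io::Result<()>;
}

trait FileWriterFactory {
    fn create_writer(&self, file_idx: usize) -> Box<dyn FileWriter>;
}

/// Adaptor: itself a FileWriter, but rotates to a fresh writer from the
/// inner factory whenever the current file exceeds the target size.
struct TargetFileSizeWriter {
    factory: Box<dyn FileWriterFactory>,
    current: Box<dyn FileWriter>,
    written_bytes: usize,
    target_size: usize,
    file_idx: usize,
}

impl TargetFileSizeWriter {
    fn new(factory: Box<dyn FileWriterFactory>, target_size: usize) -> Self {
        let current = factory.create_writer(0);
        Self { factory, current, written_bytes: 0, target_size, file_idx: 0 }
    }
}

impl FileWriter for TargetFileSizeWriter {
    fn write(&mut self, batch: Vec<u8>) -> std::io::Result<()> {
        if self.written_bytes >= self.target_size {
            // Close the full file and start the next one from the inner factory.
            self.current.close()?;
            self.file_idx += 1;
            self.current = self.factory.create_writer(self.file_idx);
            self.written_bytes = 0;
        }
        self.written_bytes += batch.len();
        self.current.write(batch)
    }

    fn close(&mut self) -> std::io::Result<()> {
        self.current.close()
    }
}
```

Stacking a PartitionedWriter on top of a factory that produces writers like this gives the per-partition and per-file-size behaviour without the executor knowing about either layer.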

Contributor Author (@colin-ho), Oct 29, 2024

I think I get it. So PartitionedWriter is itself a FileWriter, which holds a WriterFactory that can generate a TargetFileSizeWriter, which is also a FileWriter and also holds a WriterFactory that can generate a RowBatchWriter, which is also a FileWriter, and so on.

Implemented this in the latest commit, as a blocking sink.

Table containing metadata about the written file, including path and partition values.
"""
pass

Member

We should also have a finalize method, rather than overloading close to both start the next file and close the last one.

Contributor Author (@colin-ho), Oct 31, 2024

I was actually intending for these Python writers to be non-rotating, i.e. no writing after closing. They should be given a unique file_idx for file name generation upon construction, and a unique set of partition_values.

I will add assertions and some comments to document this behaviour.

        pass

    @abstractmethod
    def close(self) -> Table:
Member

What if we name this something like start_next_file?

Contributor Author

Mirroring the above comment: the Python file writers should not write after close.

@@ -1276,5 +1277,27 @@ impl Display for MicroPartition {
}
}

impl Bufferable for MicroPartition {
Member

Since no one besides the execution crate is going to use this, what I would recommend is implementing this trait on MicroPartition inside daft-local-execution. This is called the newtype pattern:

https://doc.rust-lang.org/book/ch19-03-advanced-traits.html#using-the-newtype-pattern-to-implement-external-traits-on-external-types
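For reference, a minimal sketch of the newtype pattern the linked chapter describes; Wrapper, Vec<String>, and Display here are illustrative stand-ins, not the actual Daft types:

```rust
use std::fmt;

// Both `fmt::Display` and `Vec<T>` are external to this crate, so we cannot
// implement Display for Vec<String> directly (the orphan rule forbids it).
// Wrapping the external type in a local newtype makes the impl legal.
struct Wrapper(Vec<String>);

impl fmt::Display for Wrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{}]", self.0.join(", "))
    }
}

fn main() {
    let w = Wrapper(vec![String::from("hello"), String::from("world")]);
    println!("w = {w}");
}
```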

Contributor Author

At first I was intending for the TargetBatchWriter to use the Bufferable stuff as well, but I think it's simpler to have it implement its own buffering logic, and to let local execution have its own buffering logic as well.

}
},
);
match entry {
Member

Not a fan of using the raw pattern outside of the probe table.

Not blocking it for this PR, but we should have some kind of way to create Scalars so we can just put those into the hashmap.

Contributor Author

Would storing the string representation of the partition values be a better alternative for now?

Contributor Author

Hmmm but then we'd have to do string conversion every time, so probably not.

Member

Let's make an issue for this and tackle this after we create ScalarValue

/// to a separate file. It uses a map to keep track of the writers for each partition.
struct PartitionedWriter {
    per_partition_writers:
        HashMap<IndexHash, Box<dyn FileWriter<Input = Arc<MicroPartition>, Result = Vec<Table>>>>,
Member

TBH, we should do something like a ScalarValue that wraps the possible partition value types for our non-performance-critical stuff like this.
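A hypothetical sketch of what such a ScalarValue key might look like; the enum and its variants are illustrative assumptions, not an existing Daft type:

```rust
use std::collections::HashMap;

// Hypothetical enum wrapping a few possible partition value types, so it can
// be used directly as a hashmap key instead of a raw hash.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ScalarValue {
    Null,
    Boolean(bool),
    Int64(i64),
    Utf8(String),
}

fn main() {
    // One output file per distinct partition value, keyed by the scalar itself.
    let mut per_partition_files: HashMap<ScalarValue, String> = HashMap::new();
    per_partition_files.insert(ScalarValue::Utf8("2024-10-01".to_string()), "part-0.parquet".to_string());
    per_partition_files.insert(ScalarValue::Int64(42), "part-1.parquet".to_string());
    println!("{per_partition_files:?}");
}
```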

@samster25 self-requested a review, October 31, 2024 19:32
@samster25 (Member) left a comment:

Great work!

@colin-ho merged commit 5fc9531 into main on Oct 31, 2024. 42 checks passed.
@colin-ho deleted the colin/streaming-physical-writes branch on October 31, 2024 at 20:21.
sagiahrac pushed a commit to sagiahrac/Daft that referenced this pull request Nov 4, 2024
colin-ho added a commit that referenced this pull request Nov 6, 2024
Implements streaming Iceberg and Delta writes for swordfish. Most of the write scaffolding was already implemented in #2992; this PR implements the Iceberg/Delta-specific functionality.

A quick TL;DR on swordfish writes:
- All of the row group sizing, file sizing, and partitioning is now handled in the `daft-writer` crate.
- Only the actual writing + flushing is currently handled via PyArrow Parquet + CSV writers. We intend to build our own native writers in the future.

Notes:
- Modified the Iceberg writes such that:
1. The plan now stores just the spec id + partition cols (we used to keep the whole PartitionSpec object in the plan but only use the id; maybe we planned on keeping it around for future work, not sure, please let me know).
2. I made the `add_missing_columns` step an explicit projection. It was a little cleaner this way instead of having swordfish implement `add_missing_columns` internally.

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>