
perf: Combine small chunks in sinks for streaming pipelines #14346

Merged
31 changes: 2 additions & 29 deletions crates/polars-pipe/src/executors/sinks/ordered.rs
@@ -3,11 +3,9 @@ use std::any::Any;
 use polars_core::error::PolarsResult;
 use polars_core::frame::DataFrame;
 use polars_core::schema::SchemaRef;
-use polars_core::utils::accumulate_dataframes_vertical_unchecked;
 
 use crate::operators::{
-    estimated_chunks, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
-    StreamingVstacker,
+    chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
 };
 
 // Ensure the data is return in the order it was streamed
@@ -57,32 +55,7 @@ impl Sink for OrderedSink {
         self.sort();
 
         let chunks = std::mem::take(&mut self.chunks);
-        let mut combiner = StreamingVstacker::default();
Member: If we were going to rechunk, we could simply rechunk here. But I don't want to do that, as that should be left to the consumer of the streaming engine.

Contributor Author: In that case it should probably be done in write_parquet(); otherwise the collect(streaming=True).write_parquet() case will continue to be slow.

Contributor Author: Which would require the new struct to be moved into, e.g., the polars-core crate and made public.

Here's the runtime with the latest commit on my computer; the last case is slow again:

SINK 1.0721302032470703
COLLECT+WRITE 0.9235994815826416
STREAMING COLLECT+WRITE 4.254129648208618

Contributor Author: But maybe also write_feather(), or the cloud parquet writer, etc. (Having it in OrderedSink seemed like a low-cost smoothing of performance bumps, limited to a single place.)

Member: Then that logic should indeed be in write_parquet. That writer should check the chunk sizes.

I will first merge this one, and then we can follow up with the write_parquet optimization.
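As a rough illustration of that suggested follow-up, a pre-write check could look something like the sketch below. The helper name and the row threshold are assumptions for illustration, not the actual change:

```rust
use polars_core::prelude::*;

/// Hypothetical pre-write check (name and threshold are illustrative only):
/// if the frame arrives as many small chunks, as streaming output often does,
/// rechunk once so the writer sees a few large batches instead of many tiny ones.
fn rechunk_if_fragmented(df: &mut DataFrame) {
    let n_chunks = df.n_chunks();
    if n_chunks <= 1 {
        return;
    }
    // Assumed heuristic: rechunk when the average chunk is "small".
    let avg_rows_per_chunk = df.height() / n_chunks;
    if avg_rows_per_chunk < 64_000 {
        df.as_single_chunk_par();
    }
}
```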

Member: > But maybe also write_feather(), or the cloud parquet writer, etc. (Having it in OrderedSink seemed like a low-cost smoothing of performance bumps, limited to a single place.)

Yes, but it is more expensive for other operations. Operations themselves should know their best chunking strategies.

-        let mut frames_iterator = chunks
-            .into_iter()
-            .flat_map(|c| combiner.add(c.data))
-            .map(|mut df| {
-                // The dataframe may only be a single, large chunk, in
-                // which case we don't want to bother with copying it...
-                if estimated_chunks(&df) > 1 {
-                    df.as_single_chunk_par();
-                }
-                df
-            })
-            .peekable();
-        let result = if frames_iterator.peek().is_some() {
-            let mut result = accumulate_dataframes_vertical_unchecked(frames_iterator);
-            if let Some(mut df) = combiner.finish() {
-                if estimated_chunks(&df) > 1 {
-                    df.as_single_chunk_par();
-                }
-                let _ = result.vstack_mut(&df);
-            }
-            result
-        } else {
-            combiner.finish().unwrap()
-        };
-        Ok(FinalizedSink::Finished(result))
+        Ok(FinalizedSink::Finished(chunks_to_df_unchecked(chunks)))
     }
     fn as_any(&mut self) -> &mut dyn Any {
         self
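For context, the chunks_to_df_unchecked helper that replaces the combiner is defined in the crate's operators module; conceptually it just concatenates the chunks' DataFrames vertically without re-validating schemas. A rough sketch of that idea, using a stand-in DataChunk type rather than the real internal one, and not necessarily the exact implementation:

```rust
use polars_core::frame::DataFrame;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;

// Stand-in for polars-pipe's internal DataChunk; the real struct carries more fields.
struct DataChunk {
    data: DataFrame,
}

// Sketch: vstack all chunk DataFrames in order, skipping schema validation,
// since every chunk comes from the same pipeline and shares one schema.
fn chunks_to_df_unchecked(chunks: Vec<DataChunk>) -> DataFrame {
    accumulate_dataframes_vertical_unchecked(chunks.into_iter().map(|chunk| chunk.data))
}
```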
19 changes: 9 additions & 10 deletions crates/polars-pipe/src/executors/sinks/output/file_sink.rs
@@ -5,8 +5,7 @@ use crossbeam_channel::{Receiver, Sender};
 use polars_core::prelude::*;
 
 use crate::operators::{
-    estimated_chunks, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
-    StreamingVstacker,
+    DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, StreamingVstacker,
 };
 
 pub(super) trait SinkWriter {
@@ -46,24 +45,24 @@ pub(super) fn init_writer_thread(
                 }
 
                 for chunk in chunks.drain(0..) {
-                    for mut dataframe in vstacker.add(chunk.data) {
+                    for mut df in vstacker.add(chunk.data) {
                         // The dataframe may only be a single, large chunk, in
                         // which case we don't want to bother with copying it...
-                        if estimated_chunks(&dataframe) > 1 {
-                            dataframe.as_single_chunk();
+                        if df.n_chunks() > 1 {
+                            df.as_single_chunk();
                         }
-                        writer._write_batch(&dataframe).unwrap();
+                        writer._write_batch(&df).unwrap();
                     }
                 }
                 // all chunks are written remove them
                 chunks.clear();
 
                 if last_write {
-                    if let Some(mut dataframe) = vstacker.finish() {
-                        if estimated_chunks(&dataframe) > 1 {
-                            dataframe.as_single_chunk();
+                    if let Some(mut df) = vstacker.finish() {
+                        if df.n_chunks() > 1 {
+                            df.as_single_chunk();
                         }
-                        writer._write_batch(&dataframe).unwrap();
+                        writer._write_batch(&df).unwrap();
                     }
                     writer._finish().unwrap();
                     return;
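The StreamingVstacker used above is what merges the pipeline's many small morsels into larger batches before they reach the writer. A simplified sketch of that idea, with an assumed row threshold; the real implementation in operators/chunks.rs differs in its details:

```rust
use polars_core::prelude::*;

// Simplified stand-in for StreamingVstacker: accumulate incoming DataFrames
// and emit a combined frame only once a target number of rows is reached,
// so downstream writers see fewer, larger batches.
struct SimpleVstacker {
    current: Option<DataFrame>,
    target_rows: usize, // assumed threshold, for illustration only
}

impl SimpleVstacker {
    fn new(target_rows: usize) -> Self {
        Self { current: None, target_rows }
    }

    /// Add a frame; returns zero or one combined frames that reached the target size.
    fn add(&mut self, df: DataFrame) -> Vec<DataFrame> {
        match self.current.as_mut() {
            // vstack_mut appends the other frame's chunks without copying data.
            Some(acc) => { acc.vstack_mut(&df).unwrap(); },
            None => self.current = Some(df),
        }
        if self.current.as_ref().map_or(0, |d| d.height()) >= self.target_rows {
            vec![self.current.take().unwrap()]
        } else {
            vec![]
        }
    }

    /// Flush whatever is left at the end of the stream.
    fn finish(&mut self) -> Option<DataFrame> {
        self.current.take()
    }
}
```

As in the writer thread above, a caller iterates over whatever add returns, optionally rechunks each emitted frame, and drains the remainder with finish() at the end of the stream.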
9 changes: 0 additions & 9 deletions crates/polars-pipe/src/operators/chunks.rs
@@ -110,15 +110,6 @@ impl Default for StreamingVstacker {
     }
 }
 
-/// Make a guess at the number of chunks in the `DataFrame` series.
-pub(crate) fn estimated_chunks(df: &DataFrame) -> usize {
-    df.get_columns()
-        .iter()
-        .flat_map(|s| s.chunk_lengths())
-        .next()
-        .unwrap_or(0)
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
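The deleted helper guessed the chunk count from the first column's chunk lengths; its call sites now ask the DataFrame directly via n_chunks(). A minimal, self-contained illustration of the behaviour the sinks rely on (the data is made up for the example):

```rust
use polars_core::df;
use polars_core::prelude::*;

fn main() -> PolarsResult<()> {
    let a = df!("x" => [1i64, 2, 3])?;
    let b = df!("x" => [4i64, 5])?;

    // vstack appends the other frame's chunks without copying the data,
    // so the stacked frame reports two chunks.
    let mut stacked = a.vstack(&b)?;
    assert_eq!(stacked.n_chunks(), 2);

    // What the sinks do when a frame is fragmented: merge into a single chunk.
    stacked.as_single_chunk();
    assert_eq!(stacked.n_chunks(), 1);
    Ok(())
}
```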