[FEAT] Stream results from native executor into python (#2667)
Enables streaming of results into Python for the native executor.

How it works:
- Spawn the execution tokio runtime in a separate std::thread.
- Instead of collecting results into a buffer, send them over a channel.
- On the main thread, wrap the channel in an iterator that receives the
results via `.blocking_recv` in the iterator's `.next` method (a minimal
sketch of this pattern follows the list).
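
The pattern, sketched below as a minimal self-contained program using tokio's `mpsc` channel. The `Vec<i64>` payload, channel capacity, and type names are illustrative stand-ins, not Daft's actual implementation:
```
use tokio::sync::mpsc;

struct ResultIter {
    rx: mpsc::Receiver<Vec<i64>>, // stands in for a stream of MicroPartitions
}

impl Iterator for ResultIter {
    type Item = Vec<i64>;

    fn next(&mut self) -> Option<Self::Item> {
        // Blocks the calling thread until the executor sends a result or
        // closes the channel (which ends iteration with `None`).
        self.rx.blocking_recv()
    }
}

fn main() {
    let (tx, rx) = mpsc::channel(1);

    // Spawn the execution runtime on a dedicated OS thread.
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            for i in 0..3i64 {
                // Send each morsel as soon as it is produced instead of
                // collecting everything into a buffer first.
                if tx.send(vec![i, i + 1]).await.is_err() {
                    break; // consumer dropped the iterator; stop producing
                }
            }
        });
    });

    // The consumer side: an ordinary blocking iterator over streamed results.
    for part in (ResultIter { rx }) {
        println!("{part:?}");
    }
}
```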

Drive-bys:
- Enable configurable morsel sizes in execution config.
- Buffering for each operator is now done before the morsels are sent off
to the parallel workers.
- Buffering now supports slicing in addition to chunking.
- Fixed a bug in `MicroPartition::slice` where the `remaining_rows`
counter was incremented instead of decremented (see the sketch after this list).
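
A minimal, self-contained sketch of that counter bug, using a simplified slice over plain chunk lengths rather than Daft's actual `MicroPartition` internals:
```
// Take `num_rows` rows from a sequence of chunk lengths. `remaining_rows`
// tracks rows still needed, so it must shrink as chunks are consumed; with
// `+=` (the bug), the loop kept taking rows past the requested length.
fn slice_lens(chunk_lens: &[usize], num_rows: usize) -> Vec<usize> {
    let mut remaining_rows = num_rows;
    let mut out = Vec::new();
    for &len in chunk_lens {
        if remaining_rows == 0 {
            break;
        }
        let take = len.min(remaining_rows);
        out.push(take);
        remaining_rows -= take; // was `remaining_rows += take` before the fix
    }
    out
}

fn main() {
    // Three chunks of 4 rows; slicing 6 rows should take [4, 2], not all 12.
    assert_eq!(slice_lens(&[4, 4, 4], 6), vec![4, 2]);
}
```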

Simulate streaming:
```
import time

import daft
from daft import col

daft.context.set_execution_config(enable_native_executor=True, default_morsel_size=1)


@daft.udf(return_dtype=daft.DataType.int32())
def add_1(a):
    # simulate work
    time.sleep(0.5)
    return [x + 1 for x in a.to_pylist()]


df = daft.from_pydict({"a": [i for i in range(100)]}).with_column("b", add_1(col("a")))
for r in df:
    print(r)
```

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored Aug 23, 2024
1 parent 7e9208e commit ab6d1a5
Showing 17 changed files with 259 additions and 135 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions daft/context.py
@@ -288,6 +288,7 @@ def set_execution_config(
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution. These configuration values
are used when a Dataframe is executed (e.g. calls to `.write_*`, `.collect()` or `.show()`)
@@ -323,6 +324,7 @@ def set_execution_config(
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
@@ -346,6 +348,7 @@ def set_execution_config(
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
default_morsel_size=default_morsel_size,
)

ctx._daft_execution_config = new_daft_execution_config
7 changes: 6 additions & 1 deletion daft/daft.pyi
@@ -1715,7 +1715,9 @@ class NativeExecutor:
def from_logical_plan_builder(
logical_plan_builder: LogicalPlanBuilder,
) -> NativeExecutor: ...
def run(self, psets: dict[str, list[PartitionT]]) -> Iterator[PyMicroPartition]: ...
def run(
self, psets: dict[str, list[PartitionT]], cfg: PyDaftExecutionConfig, results_buffer_size: int | None
) -> Iterator[PyMicroPartition]: ...

class PyDaftExecutionConfig:
@staticmethod
@@ -1739,6 +1741,7 @@ class PyDaftExecutionConfig:
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
) -> PyDaftExecutionConfig: ...
@property
def scan_tasks_min_size_bytes(self) -> int: ...
@@ -1772,6 +1775,8 @@ class PyDaftExecutionConfig:
def enable_aqe(self) -> bool: ...
@property
def enable_native_executor(self) -> bool: ...
@property
def default_morsel_size(self) -> int: ...

class PyDaftPlanningConfig:
@staticmethod
11 changes: 9 additions & 2 deletions daft/execution/native_executor.py
@@ -5,6 +5,7 @@
from daft.daft import (
NativeExecutor as _NativeExecutor,
)
from daft.daft import PyDaftExecutionConfig
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.partitioning import (
MaterializedResult,
@@ -25,10 +26,16 @@ def from_logical_plan_builder(cls, builder: LogicalPlanBuilder) -> NativeExecutor:
executor = _NativeExecutor.from_logical_plan_builder(builder._builder)
return cls(executor)

def run(self, psets: dict[str, list[MaterializedResult[PartitionT]]]) -> Iterator[PyMaterializedResult]:
def run(
self,
psets: dict[str, list[MaterializedResult[PartitionT]]],
daft_execution_config: PyDaftExecutionConfig,
results_buffer_size: int | None,
) -> Iterator[PyMaterializedResult]:
from daft.runners.pyrunner import PyMaterializedResult

psets_mp = {part_id: [part.vpartition()._micropartition for part in parts] for part_id, parts in psets.items()}
return (
PyMaterializedResult(MicroPartition._from_pymicropartition(part)) for part in self._executor.run(psets_mp)
PyMaterializedResult(MicroPartition._from_pymicropartition(part))
for part in self._executor.run(psets_mp, daft_execution_config, results_buffer_size)
)
4 changes: 3 additions & 1 deletion daft/runners/pyrunner.py
@@ -195,7 +195,9 @@ def run_iter(
logger.info("Using native executor")
executor = NativeExecutor.from_logical_plan_builder(builder)
results_gen = executor.run(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
daft_execution_config,
results_buffer_size,
)
yield from results_gen
else:
2 changes: 2 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -58,6 +58,7 @@ pub struct DaftExecutionConfig {
pub read_sql_partition_size_bytes: usize,
pub enable_aqe: bool,
pub enable_native_executor: bool,
pub default_morsel_size: usize,
}

impl Default for DaftExecutionConfig {
@@ -80,6 +81,7 @@ impl Default for DaftExecutionConfig {
read_sql_partition_size_bytes: 512 * 1024 * 1024, // 512MB
enable_aqe: false,
enable_native_executor: false,
default_morsel_size: 128 * 1024,
}
}
}
8 changes: 8 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -114,6 +114,7 @@ impl PyDaftExecutionConfig {
read_sql_partition_size_bytes: Option<usize>,
enable_aqe: Option<bool>,
enable_native_executor: Option<bool>,
default_morsel_size: Option<usize>,
) -> PyResult<PyDaftExecutionConfig> {
let mut config = self.config.as_ref().clone();

@@ -173,6 +174,9 @@ impl PyDaftExecutionConfig {
if let Some(enable_native_executor) = enable_native_executor {
config.enable_native_executor = enable_native_executor;
}
if let Some(default_morsel_size) = default_morsel_size {
config.default_morsel_size = default_morsel_size;
}

Ok(PyDaftExecutionConfig {
config: Arc::new(config),
@@ -256,6 +260,10 @@ impl PyDaftExecutionConfig {
fn enable_native_executor(&self) -> PyResult<bool> {
Ok(self.config.enable_native_executor)
}
#[getter]
fn default_morsel_size(&self) -> PyResult<usize> {
Ok(self.config.default_morsel_size)
}

fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
let bin_data = bincode::serialize(self.config.as_ref())
3 changes: 2 additions & 1 deletion src/daft-local-execution/Cargo.toml
@@ -1,6 +1,7 @@
[dependencies]
async-stream = {workspace = true}
async-trait = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
common-tracing = {path = "../common/tracing", default-features = false}
@@ -26,7 +27,7 @@ tokio = {workspace = true}
tracing = {workspace = true}

[features]
python = ["dep:pyo3", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]
python = ["dep:pyo3", "common-daft-config/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]

[package]
edition = {workspace = true}
78 changes: 78 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/buffer.rs
@@ -0,0 +1,78 @@
use std::{collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;
use std::cmp::Ordering::*;

/// Accumulates incoming micropartitions and re-emits them as morsels of
/// `threshold` rows, slicing parts when they straddle the boundary.
pub struct OperatorBuffer {
    pub buffer: VecDeque<Arc<MicroPartition>>,
    pub curr_len: usize,
    pub threshold: usize,
}

impl OperatorBuffer {
    pub fn new(threshold: usize) -> Self {
        assert!(threshold > 0);
        Self {
            buffer: VecDeque::new(),
            curr_len: 0,
            threshold,
        }
    }

    pub fn push(&mut self, part: Arc<MicroPartition>) {
        self.curr_len += part.len();
        self.buffer.push_back(part);
    }

    /// Returns a morsel of exactly `threshold` rows if enough rows are
    /// buffered, or `None` if the buffer is still below the threshold.
    pub fn try_clear(&mut self) -> Option<DaftResult<Arc<MicroPartition>>> {
        match self.curr_len.cmp(&self.threshold) {
            Less => None,
            Equal => self.clear_all(),
            Greater => Some(self.clear_enough()),
        }
    }

    /// Pops exactly `threshold` rows off the front of the buffer, slicing
    /// the last part if it straddles the boundary.
    fn clear_enough(&mut self) -> DaftResult<Arc<MicroPartition>> {
        assert!(self.curr_len > self.threshold);

        let mut to_concat = Vec::with_capacity(self.buffer.len());
        let mut remaining = self.threshold;

        while remaining > 0 {
            let part = self.buffer.pop_front().expect("Buffer should not be empty");
            let part_len = part.len();
            if part_len <= remaining {
                remaining -= part_len;
                to_concat.push(part);
            } else {
                let (head, tail) = part.split_at(remaining)?;
                remaining = 0;
                to_concat.push(Arc::new(head));
                self.buffer.push_front(Arc::new(tail));
                break;
            }
        }
        assert_eq!(remaining, 0);

        self.curr_len -= self.threshold;
        match to_concat.len() {
            1 => Ok(to_concat.pop().unwrap()),
            _ => MicroPartition::concat(&to_concat.iter().map(|x| x.as_ref()).collect::<Vec<_>>())
                .map(Arc::new),
        }
    }

    /// Flushes everything left in the buffer as a single (possibly
    /// undersized) morsel, or `None` if the buffer is empty.
    pub fn clear_all(&mut self) -> Option<DaftResult<Arc<MicroPartition>>> {
        if self.buffer.is_empty() {
            return None;
        }

        let concated =
            MicroPartition::concat(&self.buffer.iter().map(|x| x.as_ref()).collect::<Vec<_>>())
                .map(Arc::new);
        self.buffer.clear();
        self.curr_len = 0;
        Some(concated)
    }
}
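
Hypothetical usage of the buffer above, illustrating the threshold semantics (`part(n)`, a helper building an n-row `MicroPartition`, is assumed for illustration and is not a real Daft API):
```
// Illustrative only: `part(n)` is a hypothetical helper building an
// n-row MicroPartition.
let mut buffer = OperatorBuffer::new(1000);
buffer.push(part(600));
assert!(buffer.try_clear().is_none()); // 600 < 1000: keep buffering
buffer.push(part(700));
// 1300 > 1000: the first part plus a 400-row slice of the second are
// concatenated into one exactly-1000-row morsel; 300 rows stay buffered.
let morsel = buffer.try_clear().unwrap().unwrap();
assert_eq!(morsel.len(), 1000);
// End of input: flush whatever remains as a final, smaller morsel.
let tail = buffer.clear_all().unwrap().unwrap();
assert_eq!(tail.len(), 300);
```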
46 changes: 31 additions & 15 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -17,7 +17,8 @@ use crate::{
ExecutionRuntimeHandle, NUM_CPUS,
};

use super::state::OperatorTaskState;
use super::buffer::OperatorBuffer;

pub trait IntermediateOperator: Send + Sync {
fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
fn name(&self) -> &'static str;
@@ -61,19 +62,12 @@ impl IntermediateNode {
sender: SingleSender,
rt_context: Arc<RuntimeStatsContext>,
) -> DaftResult<()> {
let mut state = OperatorTaskState::new();
let span = info_span!("IntermediateOp::execute");
let sender = CountingSender::new(sender, rt_context.clone());
while let Some(morsel) = receiver.recv().await {
rt_context.mark_rows_received(morsel.len() as u64);
let result = rt_context.in_span(&span, || op.execute(&morsel))?;
state.add(result);
if let Some(part) = state.try_clear() {
let _ = sender.send(part?).await;
}
}
if let Some(part) = state.clear() {
let _ = sender.send(part?).await;
let _ = sender.send(result).await;
}
Ok(())
}
@@ -105,16 +99,31 @@ impl IntermediateNode {
pub async fn send_to_workers(
mut receiver: MultiReceiver,
worker_senders: Vec<SingleSender>,
morsel_size: usize,
) -> DaftResult<()> {
let mut next_worker_idx = 0;
let mut send_to_next_worker = |morsel: Arc<MicroPartition>| {
let next_worker_sender = worker_senders.get(next_worker_idx).unwrap();
next_worker_idx = (next_worker_idx + 1) % worker_senders.len();
next_worker_sender.send(morsel)
};
let mut buffer = OperatorBuffer::new(morsel_size);

while let Some(morsel) = receiver.recv().await {
if morsel.is_empty() {
continue;
buffer.push(morsel);
if let Some(ready) = buffer.try_clear() {
let _ = send_to_next_worker(ready?).await;
}
}

let next_worker_sender = worker_senders.get(next_worker_idx).unwrap();
let _ = next_worker_sender.send(morsel).await;
next_worker_idx = (next_worker_idx + 1) % worker_senders.len();
// Buffer may still have some morsels left above the threshold
while let Some(ready) = buffer.try_clear() {
let _ = send_to_next_worker(ready?).await;
}

// Clear all remaining morsels
if let Some(last_morsel) = buffer.clear_all() {
let _ = send_to_next_worker(last_morsel?).await;
}
Ok(())
}
@@ -169,7 +178,14 @@ impl PipelineNode for IntermediateNode {
child.start(sender, runtime_handle).await?;

let worker_senders = self.spawn_workers(&mut destination, runtime_handle).await;
runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders), self.name());
runtime_handle.spawn(
Self::send_to_workers(
receiver,
worker_senders,
runtime_handle.default_morsel_size(),
),
self.intermediate_op.name(),
);
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/intermediate_ops/mod.rs
@@ -1,5 +1,5 @@
pub mod aggregate;
pub mod buffer;
pub mod filter;
pub mod intermediate_op;
pub mod project;
pub mod state;
52 changes: 0 additions & 52 deletions src/daft-local-execution/src/intermediate_ops/state.rs

This file was deleted.

