[FEAT] Add smart planning of ScanTasks starting with merging by filesizes (#1692)

Refactors/changes required on ScanTask itself:

1. Added a `ScanTask::merge`
2. Added a `ScanTask::partition_spec()`
3. Added some validation in `ScanTask::new` to assert that all the
underlying sources have the same partition spec

I then added a new module, `daft_scan::scan_task_iterators`, which
contains functions that perform transformations on a `Box<dyn
Iterator<Item = DaftResult<ScanTaskRef>>>`.
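
To make the shape of these transformations concrete, here is a rough, self-contained Rust sketch of a merge-by-size pass over such an iterator. Only the iterator item shape (`Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>`) and the idea of `ScanTask::merge` come from this change; the type definitions, the `merge_by_sizes` name, and the eager accumulation are simplified illustrations, not the actual `daft_scan::scan_task_iterators` implementation (which works lazily on the real types). In the actual code the two thresholds come from `DaftConfig` (`merge_scan_tasks_min_size_bytes` / `merge_scan_tasks_max_size_bytes`, defaulting to 64MB / 512MB as shown in the diff below).

```rust
use std::sync::Arc;

// Simplified stand-ins for illustration; the real types live in daft_scan and
// common_error. The error type, struct fields, and function name are hypothetical.
type DaftResult<T> = Result<T, String>;
type ScanTaskRef = Arc<ScanTask>;

#[derive(Clone, Debug)]
struct ScanTask {
    files: Vec<String>,
    size_bytes: usize,
}

impl ScanTask {
    // Same idea as `ScanTask::merge`: combine two tasks into one larger task.
    fn merge(a: &ScanTask, b: &ScanTask) -> ScanTask {
        ScanTask {
            files: [a.files.clone(), b.files.clone()].concat(),
            size_bytes: a.size_bytes + b.size_bytes,
        }
    }
}

/// Greedily accumulate ScanTasks until the running size reaches `min_size_bytes`,
/// never merging past `max_size_bytes`. Eager (collects into a Vec) for brevity;
/// the real transformation stays lazy.
fn merge_by_sizes(
    inputs: Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>,
    min_size_bytes: usize,
    max_size_bytes: usize,
) -> Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>> {
    let mut acc: Option<ScanTask> = None;
    let mut out: Vec<DaftResult<ScanTaskRef>> = Vec::new();
    for item in inputs {
        match item {
            Err(e) => out.push(Err(e)),
            Ok(task) => {
                acc = Some(match acc.take() {
                    None => task.as_ref().clone(),
                    // Still under the cap: keep growing the current merged task.
                    Some(cur) if cur.size_bytes + task.size_bytes <= max_size_bytes => {
                        ScanTask::merge(&cur, &task)
                    }
                    // Merging would exceed the cap: flush and start a new one.
                    Some(cur) => {
                        out.push(Ok(Arc::new(cur)));
                        task.as_ref().clone()
                    }
                });
                // Big enough to stand on its own: emit it.
                if acc.as_ref().map_or(false, |t| t.size_bytes >= min_size_bytes) {
                    out.push(Ok(Arc::new(acc.take().unwrap())));
                }
            }
        }
    }
    if let Some(cur) = acc {
        out.push(Ok(Arc::new(cur)));
    }
    Box::new(out.into_iter())
}

fn main() {
    let mb: usize = 1024 * 1024;
    let tasks: Vec<DaftResult<ScanTaskRef>> = vec![
        Ok(Arc::new(ScanTask { files: vec!["a.parquet".into()], size_bytes: 10 * mb })),
        Ok(Arc::new(ScanTask { files: vec!["b.parquet".into()], size_bytes: 60 * mb })),
        Ok(Arc::new(ScanTask { files: vec!["c.parquet".into()], size_bytes: 600 * mb })),
    ];
    // With 64MB/512MB thresholds: "a" and "b" merge into one task, "c" passes through.
    for t in merge_by_sizes(Box::new(tasks.into_iter()), 64 * mb, 512 * mb) {
        println!("{:?}", t.map(|t| (t.files.clone(), t.size_bytes)));
    }
}
```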

TODO:

- [x] Make the file_size configurable (as an environment
variable/context flag) so that our unit tests still run correctly when
we do multi-file tests for multi-partition dataframes (see #1700)

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Dec 5, 2023
1 parent 08702a3 commit a53cd51
Showing 21 changed files with 543 additions and 37 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -1,4 +1,5 @@
[dependencies]
common-daft-config = {path = "src/common/daft-config", default-features = false}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dsl = {path = "src/daft-dsl", default-features = false}
@@ -26,7 +27,8 @@ python = [
"daft-csv/python",
"daft-micropartition/python",
"daft-scan/python",
"daft-stats/python"
"daft-stats/python",
"common-daft-config/python"
]

[lib]
@@ -69,6 +71,7 @@ members = [
"src/common/error",
"src/common/io-config",
"src/common/treenode",
"src/common/daft-config",
"src/daft-core",
"src/daft-io",
"src/daft-parquet",
34 changes: 34 additions & 0 deletions daft/context.py
@@ -6,6 +6,8 @@
import warnings
from typing import TYPE_CHECKING, ClassVar

from daft.daft import PyDaftConfig

if TYPE_CHECKING:
from daft.runners.runner import Runner

@@ -59,6 +61,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
class DaftContext:
"""Global context for the current Daft execution environment"""

daft_config: PyDaftConfig = PyDaftConfig()
runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
disallow_set_runner: bool = False

@@ -185,3 +188,34 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
)
_set_context(new_ctx)
return new_ctx


def set_config(
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
) -> DaftContext:
"""Globally sets various configuration parameters which control various aspects of Daft execution
Args:
merge_scan_tasks_min_size_bytes: Minimum size in bytes when merging ScanTasks when reading files from storage.
Increasing this value will make Daft perform more merging of files into a single partition before yielding,
which leads to bigger but fewer partitions. (Defaults to 64MB)
merge_scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but
fewer partitions. (Defaults to 512MB)
"""
old_ctx = get_context()

# Replace values in the DaftConfig with user-specified overrides
old_daft_config = old_ctx.daft_config
new_daft_config = old_daft_config.with_config_values(
merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
)

new_ctx = dataclasses.replace(
old_ctx,
daft_config=new_daft_config,
)
_set_context(new_ctx)
return new_ctx
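
For context, a minimal usage sketch of the new `daft.context.set_config` API added above. The byte values and the input path below are illustrative (the defaults are 64MB / 512MB), and any multi-file read goes through the merging pass:

```python
import daft
from daft.context import set_config

# Coalesce small files more aggressively before yielding a partition.
# 128MB/256MB are example values, not the defaults.
set_config(
    merge_scan_tasks_min_size_bytes=128 * 1024 * 1024,
    merge_scan_tasks_max_size_bytes=256 * 1024 * 1024,
)

# Hypothetical input path.
df = daft.read_parquet("s3://my-bucket/small-files/*.parquet")
print(df.num_partitions())  # fewer, larger partitions than without merging
```
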
13 changes: 12 additions & 1 deletion daft/daft.pyi
@@ -989,9 +989,20 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
def schema(self) -> PySchema: ...
def optimize(self) -> LogicalPlanBuilder: ...
def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler: ...
def to_physical_plan_scheduler(self, cfg: PyDaftConfig) -> PhysicalPlanScheduler: ...
def repr_ascii(self, simple: bool) -> str: ...

class PyDaftConfig:
def with_config_values(
self,
merge_scan_tasks_min_size_bytes: int | None = None,
merge_scan_tasks_max_size_bytes: int | None = None,
) -> PyDaftConfig: ...
@property
def merge_scan_tasks_min_size_bytes(self): ...
@property
def merge_scan_tasks_max_size_bytes(self): ...

def build_type() -> str: ...
def version() -> str: ...
def __getattr__(name) -> Any: ...
3 changes: 2 additions & 1 deletion daft/dataframe/dataframe.py
@@ -125,7 +125,8 @@ def explain(self, show_optimized: bool = False, simple=False) -> None:
print(builder.pretty_print(simple))

def num_partitions(self) -> int:
return self.__builder.to_physical_plan_scheduler().num_partitions()
daft_config = get_context().daft_config
return self.__builder.to_physical_plan_scheduler(daft_config).num_partitions()

@DataframePublicAPI
def schema(self) -> Schema:
5 changes: 3 additions & 2 deletions daft/logical/builder.py
@@ -14,6 +14,7 @@
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import (
PartitionScheme,
PyDaftConfig,
ResourceRequest,
ScanOperatorHandle,
StorageConfig,
@@ -34,7 +35,7 @@ class LogicalPlanBuilder:
def __init__(self, builder: _LogicalPlanBuilder) -> None:
self._builder = builder

def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler:
def to_physical_plan_scheduler(self, daft_config: PyDaftConfig) -> PhysicalPlanScheduler:
"""
Convert the underlying logical plan to a physical plan scheduler, which is
used to generate executable tasks for the physical plan.
@@ -43,7 +44,7 @@ def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler:
"""
from daft.plan_scheduler.physical_plan_scheduler import PhysicalPlanScheduler

return PhysicalPlanScheduler(self._builder.to_physical_plan_scheduler())
return PhysicalPlanScheduler(self._builder.to_physical_plan_scheduler(daft_config))

def schema(self) -> Schema:
"""
5 changes: 4 additions & 1 deletion daft/runners/pyrunner.py
@@ -8,6 +8,7 @@

import psutil

from daft.context import get_context
from daft.daft import (
FileFormatConfig,
FileInfos,
@@ -131,11 +132,13 @@ def run_iter(
# NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
results_buffer_size: int | None = None,
) -> Iterator[PyMaterializedResult]:
daft_config = get_context().daft_config

# Optimize the logical plan.
builder = builder.optimize()
# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
plan_scheduler = builder.to_physical_plan_scheduler()
plan_scheduler = builder.to_physical_plan_scheduler(daft_config)
psets = {
key: entry.value.values()
for key, entry in self._part_set_cache._uuid_to_partition_set.items()
5 changes: 4 additions & 1 deletion daft/runners/ray_runner.py
@@ -11,6 +11,7 @@

import pyarrow as pa

from daft.context import get_context
from daft.logical.builder import LogicalPlanBuilder
from daft.plan_scheduler import PhysicalPlanScheduler
from daft.runners.progress_bar import ProgressBar
@@ -675,11 +676,13 @@ def active_plans(self) -> list[str]:
def run_iter(
self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None
) -> Iterator[RayMaterializedResult]:
daft_config = get_context().daft_config

# Optimize the logical plan.
builder = builder.optimize()
# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
plan_scheduler = builder.to_physical_plan_scheduler()
plan_scheduler = builder.to_physical_plan_scheduler(daft_config)

psets = {
key: entry.value.values()
12 changes: 12 additions & 0 deletions src/common/daft-config/Cargo.toml
@@ -0,0 +1,12 @@
[dependencies]
lazy_static = {workspace = true}
pyo3 = {workspace = true, optional = true}

[features]
default = ["python"]
python = ["dep:pyo3"]

[package]
edition = {workspace = true}
name = "common-daft-config"
version = {workspace = true}
30 changes: 30 additions & 0 deletions src/common/daft-config/src/lib.rs
@@ -0,0 +1,30 @@
#[derive(Clone)]
pub struct DaftConfig {
pub merge_scan_tasks_min_size_bytes: usize,
pub merge_scan_tasks_max_size_bytes: usize,
}

impl Default for DaftConfig {
fn default() -> Self {
DaftConfig {
merge_scan_tasks_min_size_bytes: 64 * 1024 * 1024, // 64MB
merge_scan_tasks_max_size_bytes: 512 * 1024 * 1024, // 512MB
}
}
}

#[cfg(feature = "python")]
mod python;

#[cfg(feature = "python")]
pub use python::PyDaftConfig;

#[cfg(feature = "python")]
use pyo3::prelude::*;

#[cfg(feature = "python")]
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_class::<python::PyDaftConfig>()?;

Ok(())
}
48 changes: 48 additions & 0 deletions src/common/daft-config/src/python.rs
@@ -0,0 +1,48 @@
use std::sync::Arc;

use pyo3::prelude::*;

use crate::DaftConfig;

#[derive(Clone, Default)]
#[pyclass]
pub struct PyDaftConfig {
pub config: Arc<DaftConfig>,
}

#[pymethods]
impl PyDaftConfig {
#[new]
pub fn new() -> Self {
PyDaftConfig::default()
}

fn with_config_values(
&mut self,
merge_scan_tasks_min_size_bytes: Option<usize>,
merge_scan_tasks_max_size_bytes: Option<usize>,
) -> PyResult<PyDaftConfig> {
let mut config = self.config.as_ref().clone();

if let Some(merge_scan_tasks_max_size_bytes) = merge_scan_tasks_max_size_bytes {
config.merge_scan_tasks_max_size_bytes = merge_scan_tasks_max_size_bytes;
}
if let Some(merge_scan_tasks_min_size_bytes) = merge_scan_tasks_min_size_bytes {
config.merge_scan_tasks_min_size_bytes = merge_scan_tasks_min_size_bytes;
}

Ok(PyDaftConfig {
config: Arc::new(config),
})
}

#[getter(merge_scan_tasks_min_size_bytes)]
fn get_merge_scan_tasks_min_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.merge_scan_tasks_min_size_bytes)
}

#[getter(merge_scan_tasks_max_size_bytes)]
fn get_merge_scan_tasks_max_size_bytes(&self) -> PyResult<usize> {
Ok(self.config.merge_scan_tasks_max_size_bytes)
}
}
3 changes: 2 additions & 1 deletion src/daft-plan/Cargo.toml
@@ -1,6 +1,7 @@
[dependencies]
arrow2 = {workspace = true, features = ["chrono-tz", "compute_take", "compute_cast", "compute_aggregate", "compute_if_then_else", "compute_sort", "compute_filter", "compute_temporal", "compute_comparison", "compute_arithmetics", "compute_concatenate", "io_ipc"]}
bincode = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-error = {path = "../common/error", default-features = false}
common-io-config = {path = "../common/io-config", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
@@ -20,7 +21,7 @@ rstest = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "common-error/python", "common-io-config/python", "daft-core/python", "daft-dsl/python", "daft-table/python"]
python = ["dep:pyo3", "common-error/python", "common-io-config/python", "common-daft-config/python", "daft-core/python", "daft-dsl/python", "daft-table/python"]

[package]
edition = {workspace = true}
10 changes: 8 additions & 2 deletions src/daft-plan/src/builder.rs
@@ -26,6 +26,7 @@ use daft_scan::{
#[cfg(feature = "python")]
use {
crate::{physical_plan::PhysicalPlan, source_info::InMemoryInfo},
common_daft_config::PyDaftConfig,
daft_core::python::schema::PySchema,
daft_dsl::python::PyExpr,
daft_scan::{file_format::PyFileFormatConfig, python::pylib::ScanOperatorHandle},
@@ -466,10 +467,15 @@ impl PyLogicalPlanBuilder {
/// Finalize the logical plan, translate the logical plan to a physical plan, and return
/// a physical plan scheduler that's capable of launching the work necessary to compute the output
/// of the physical plan.
pub fn to_physical_plan_scheduler(&self, py: Python) -> PyResult<PhysicalPlanScheduler> {
pub fn to_physical_plan_scheduler(
&self,
py: Python,
cfg: PyDaftConfig,
) -> PyResult<PhysicalPlanScheduler> {
py.allow_threads(|| {
let logical_plan = self.builder.build();
let physical_plan: Arc<PhysicalPlan> = plan(logical_plan.as_ref())?.into();
let physical_plan: Arc<PhysicalPlan> =
plan(logical_plan.as_ref(), cfg.config.clone())?.into();
Ok(physical_plan.into())
})
}