diff --git a/Cargo.lock b/Cargo.lock index ba1e970fed..f4d16aaeb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1228,6 +1228,7 @@ dependencies = [ "daft-parquet", "daft-plan", "daft-scan", + "daft-scheduler", "daft-stats", "daft-table", "lazy_static", @@ -1552,6 +1553,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "daft-scheduler" +version = "0.2.0-dev0" +dependencies = [ + "bincode", + "common-daft-config", + "common-error", + "common-io-config", + "daft-core", + "daft-dsl", + "daft-plan", + "daft-scan", + "log", + "pyo3", + "pyo3-log", + "rstest", + "serde", +] + [[package]] name = "daft-sketch" version = "0.2.0-dev0" diff --git a/Cargo.toml b/Cargo.toml index 9b5d7f9c5d..2783acd642 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ daft-micropartition = {path = "src/daft-micropartition", default-features = fals daft-parquet = {path = "src/daft-parquet", default-features = false} daft-plan = {path = "src/daft-plan", default-features = false} daft-scan = {path = "src/daft-scan", default-features = false} +daft-scheduler = {path = "src/daft-scheduler", default-features = false} daft-stats = {path = "src/daft-stats", default-features = false} daft-table = {path = "src/daft-table", default-features = false} lazy_static = {workspace = true} @@ -24,16 +25,17 @@ python = [ "dep:pyo3", "dep:pyo3-log", "daft-core/python", - "daft-table/python", + "daft-csv/python", "daft-dsl/python", "daft-io/python", - "daft-plan/python", - "daft-parquet/python", - "daft-csv/python", "daft-json/python", "daft-micropartition/python", + "daft-parquet/python", + "daft-plan/python", "daft-scan/python", + "daft-scheduler/python", "daft-stats/python", + "daft-table/python", "common-daft-config/python", "common-system-info/python" ] @@ -91,6 +93,7 @@ members = [ "src/daft-plan", "src/daft-micropartition", "src/daft-scan", + "src/daft-scheduler", "src/daft-sketch" ] diff --git a/daft/daft.pyi b/daft/daft.pyi index 26ca9289b2..f293e48c40 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -156,12 +156,16 @@ class ResourceRequest: memory_bytes: int | None def __init__( - self, num_cpus: float | None = None, num_gpus: float | None = None, memory_bytes: int | None = None + self, + num_cpus: float | None = None, + num_gpus: float | None = None, + memory_bytes: int | None = None, ): ... @staticmethod def max_resources(resource_requests: list[ResourceRequest]): """Take a field-wise max of the list of resource requests.""" ... + def __add__(self, other: ResourceRequest) -> ResourceRequest: ... def __repr__(self) -> str: ... def __eq__(self, other: ResourceRequest) -> bool: ... # type: ignore[override] @@ -253,29 +257,34 @@ class FileFormatConfig: Create a Parquet file format config. """ ... + @staticmethod def from_csv_config(config: CsvSourceConfig) -> FileFormatConfig: """ Create a CSV file format config. """ ... + @staticmethod def from_json_config(config: JsonSourceConfig) -> FileFormatConfig: """ Create a JSON file format config. """ ... + @staticmethod def from_database_config(config: DatabaseSourceConfig) -> FileFormatConfig: """ Create a database file format config. """ ... + def file_format(self) -> FileFormat: """ Get the file format for this config. """ ... + def __eq__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] def __ne__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override] @@ -396,16 +405,19 @@ class FileInfos: Create from a Daft table with "path", "size", and "num_rows" columns. """ ... 
+ def extend(self, new_infos: FileInfos) -> FileInfos: """ Concatenate two FileInfos together. """ ... + def __getitem__(self, idx: int) -> FileInfo: ... def to_table(self) -> PyTable: """ Convert to a Daft table with "path", "size", and "num_rows" columns. """ + def __len__(self) -> int: ... class S3Config: @@ -560,7 +572,12 @@ class IOConfig: azure: AzureConfig gcs: GCSConfig - def __init__(self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None): ... + def __init__( + self, + s3: S3Config | None = None, + azure: AzureConfig | None = None, + gcs: GCSConfig | None = None, + ): ... @staticmethod def from_json(input: str) -> IOConfig: """ @@ -569,7 +586,10 @@ class IOConfig: ... def replace( - self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None + self, + s3: S3Config | None = None, + azure: AzureConfig | None = None, + gcs: GCSConfig | None = None, ) -> IOConfig: """Replaces values if provided, returning a new IOConfig""" ... @@ -606,12 +626,14 @@ class StorageConfig: Create from a native storage config. """ ... + @staticmethod def python(config: PythonStorageConfig) -> StorageConfig: """ Create from a Python storage config. """ ... + @property def config(self) -> NativeStorageConfig | PythonStorageConfig: ... @@ -625,16 +647,19 @@ class ScanTask: Get number of rows that will be scanned by this ScanTask. """ ... + def size_bytes(self) -> int: """ Get number of bytes that will be scanned by this ScanTask. """ ... + def estimate_in_memory_size_bytes(self, cfg: PyDaftExecutionConfig) -> int: """ Estimate the In Memory Size of this ScanTask. """ ... + @staticmethod def catalog_scan_task( file: str, @@ -651,6 +676,7 @@ class ScanTask: Create a Catalog Scan Task """ ... + @staticmethod def sql_scan_task( url: str, @@ -666,6 +692,7 @@ class ScanTask: Create a SQL Scan Task """ ... + @staticmethod def python_factory_func_scan_task( module: str, @@ -712,7 +739,10 @@ class PartitionField: field: PyField def __init__( - self, field: PyField, source_field: PyField | None = None, transform: PartitionTransform | None = None + self, + field: PyField, + source_field: PyField | None = None, + transform: PartitionTransform | None = None, ) -> None: ... class PartitionTransform: @@ -897,7 +927,11 @@ class PyDataType: @staticmethod def embedding(data_type: PyDataType, size: int) -> PyDataType: ... @staticmethod - def image(mode: ImageMode | None = None, height: int | None = None, width: int | None = None) -> PyDataType: ... + def image( + mode: ImageMode | None = None, + height: int | None = None, + width: int | None = None, + ) -> PyDataType: ... @staticmethod def tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ... @staticmethod @@ -1051,7 +1085,11 @@ class PyExpr: def struct_get(self, name: str) -> PyExpr: ... def map_get(self, key: PyExpr) -> PyExpr: ... def url_download( - self, max_connections: int, raise_error_on_failure: bool, multi_thread: bool, config: IOConfig + self, + max_connections: int, + raise_error_on_failure: bool, + multi_thread: bool, + config: IOConfig, ) -> PyExpr: ... def partitioning_days(self) -> PyExpr: ... def partitioning_hours(self) -> PyExpr: ... @@ -1206,11 +1244,25 @@ class PyTable: def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ... def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyTable: ... 
def pivot( - self, group_by: list[PyExpr], pivot_column: PyExpr, values_column: PyExpr, names: list[str] + self, + group_by: list[PyExpr], + pivot_column: PyExpr, + values_column: PyExpr, + names: list[str], + ) -> PyTable: ... + def hash_join( + self, + right: PyTable, + left_on: list[PyExpr], + right_on: list[PyExpr], + how: JoinType, ) -> PyTable: ... - def hash_join(self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr], how: JoinType) -> PyTable: ... def sort_merge_join( - self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr], is_sorted: bool + self, + right: PyTable, + left_on: list[PyExpr], + right_on: list[PyExpr], + is_sorted: bool, ) -> PyTable: ... def explode(self, to_explode: list[PyExpr]) -> PyTable: ... def head(self, num: int) -> PyTable: ... @@ -1268,17 +1320,33 @@ class PyMicroPartition: def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ... def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyMicroPartition: ... def hash_join( - self, right: PyMicroPartition, left_on: list[PyExpr], right_on: list[PyExpr], how: JoinType + self, + right: PyMicroPartition, + left_on: list[PyExpr], + right_on: list[PyExpr], + how: JoinType, ) -> PyMicroPartition: ... def pivot( - self, group_by: list[PyExpr], pivot_column: PyExpr, values_column: PyExpr, names: list[str] + self, + group_by: list[PyExpr], + pivot_column: PyExpr, + values_column: PyExpr, + names: list[str], ) -> PyMicroPartition: ... def sort_merge_join( - self, right: PyMicroPartition, left_on: list[PyExpr], right_on: list[PyExpr], is_sorted: bool + self, + right: PyMicroPartition, + left_on: list[PyExpr], + right_on: list[PyExpr], + is_sorted: bool, ) -> PyMicroPartition: ... def explode(self, to_explode: list[PyExpr]) -> PyMicroPartition: ... def unpivot( - self, ids: list[PyExpr], values: list[PyExpr], variable_name: str, value_name: str + self, + ids: list[PyExpr], + values: list[PyExpr], + variable_name: str, + value_name: str, ) -> PyMicroPartition: ... def head(self, num: int) -> PyMicroPartition: ... def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyMicroPartition: ... @@ -1346,6 +1414,11 @@ class PhysicalPlanScheduler: A work scheduler for physical query plans. """ + @staticmethod + def from_logical_plan_builder( + logical_plan_builder: LogicalPlanBuilder, + cfg: PyDaftExecutionConfig, + ) -> PhysicalPlanScheduler: ... def num_partitions(self) -> int: ... def repr_ascii(self, simple: bool) -> str: ... def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ... @@ -1354,6 +1427,12 @@ class AdaptivePhysicalPlanScheduler: """ An adaptive Physical Plan Scheduler. """ + + @staticmethod + def from_logical_plan_builder( + logical_plan_builder: LogicalPlanBuilder, + cfg: PyDaftExecutionConfig, + ) -> AdaptivePhysicalPlanScheduler: ... def next(self) -> tuple[int | None, PhysicalPlanScheduler]: ... def is_done(self) -> bool: ... # Todo use in memory info here instead @@ -1394,7 +1473,11 @@ class LogicalPlanBuilder: def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ... def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ... def unpivot( - self, ids: list[PyExpr], values: list[PyExpr], variable_name: str, value_name: str + self, + ids: list[PyExpr], + values: list[PyExpr], + variable_name: str, + value_name: str, ) -> LogicalPlanBuilder: ... def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ... 
def hash_repartition( diff --git a/daft/logical/builder.py b/daft/logical/builder.py index e3ae33765c..33fbaca744 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -21,7 +21,10 @@ if TYPE_CHECKING: from pyiceberg.table import Table as IcebergTable - from daft.plan_scheduler.physical_plan_scheduler import AdaptivePhysicalPlanScheduler, PhysicalPlanScheduler + from daft.plan_scheduler.physical_plan_scheduler import ( + AdaptivePhysicalPlanScheduler, + PhysicalPlanScheduler, + ) class LogicalPlanBuilder: @@ -41,14 +44,22 @@ def to_physical_plan_scheduler(self, daft_execution_config: PyDaftExecutionConfi """ from daft.plan_scheduler.physical_plan_scheduler import PhysicalPlanScheduler - return PhysicalPlanScheduler(self._builder.to_physical_plan_scheduler(daft_execution_config)) + return PhysicalPlanScheduler.from_logical_plan_builder( + self, + daft_execution_config, + ) def to_adaptive_physical_plan_scheduler( self, daft_execution_config: PyDaftExecutionConfig ) -> AdaptivePhysicalPlanScheduler: - from daft.plan_scheduler.physical_plan_scheduler import AdaptivePhysicalPlanScheduler + from daft.plan_scheduler.physical_plan_scheduler import ( + AdaptivePhysicalPlanScheduler, + ) - return AdaptivePhysicalPlanScheduler(self._builder.to_adaptive_physical_plan_scheduler(daft_execution_config)) + return AdaptivePhysicalPlanScheduler.from_logical_plan_builder( + self, + daft_execution_config, + ) def schema(self) -> Schema: """ @@ -78,10 +89,20 @@ def optimize(self) -> LogicalPlanBuilder: @classmethod def from_in_memory_scan( - cls, partition: PartitionCacheEntry, schema: Schema, num_partitions: int, size_bytes: int, num_rows: int + cls, + partition: PartitionCacheEntry, + schema: Schema, + num_partitions: int, + size_bytes: int, + num_rows: int, ) -> LogicalPlanBuilder: builder = _LogicalPlanBuilder.in_memory_scan( - partition.key, partition, schema._schema, num_partitions, size_bytes, num_rows + partition.key, + partition, + schema._schema, + num_partitions, + size_bytes, + num_rows, ) return cls(builder) @@ -125,7 +146,11 @@ def explode(self, explode_expressions: list[Expression]) -> LogicalPlanBuilder: return LogicalPlanBuilder(builder) def unpivot( - self, ids: list[Expression], values: list[Expression], variable_name: str, value_name: str + self, + ids: list[Expression], + values: list[Expression], + variable_name: str, + value_name: str, ) -> LogicalPlanBuilder: ids_pyexprs = [expr._expr for expr in ids] values_pyexprs = [expr._expr for expr in values] diff --git a/daft/plan_scheduler/physical_plan_scheduler.py b/daft/plan_scheduler/physical_plan_scheduler.py index 0484e10f3c..8c5d0c7dcf 100644 --- a/daft/plan_scheduler/physical_plan_scheduler.py +++ b/daft/plan_scheduler/physical_plan_scheduler.py @@ -1,9 +1,18 @@ from __future__ import annotations -from daft.daft import AdaptivePhysicalPlanScheduler as _AdaptivePhysicalPlanScheduler +from daft.daft import ( + AdaptivePhysicalPlanScheduler as _AdaptivePhysicalPlanScheduler, +) from daft.daft import PhysicalPlanScheduler as _PhysicalPlanScheduler +from daft.daft import ( + PyDaftExecutionConfig, +) from daft.execution import physical_plan -from daft.runners.partitioning import PartitionCacheEntry, PartitionT +from daft.logical.builder import LogicalPlanBuilder +from daft.runners.partitioning import ( + PartitionCacheEntry, + PartitionT, +) class PhysicalPlanScheduler: @@ -14,6 +23,13 @@ class PhysicalPlanScheduler: def __init__(self, scheduler: _PhysicalPlanScheduler): self._scheduler = scheduler + @classmethod + def 
from_logical_plan_builder( + cls, builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig + ) -> PhysicalPlanScheduler: + scheduler = _PhysicalPlanScheduler.from_logical_plan_builder(builder._builder, daft_execution_config) + return cls(scheduler) + def num_partitions(self) -> int: return self._scheduler.num_partitions() @@ -37,6 +53,13 @@ class AdaptivePhysicalPlanScheduler: def __init__(self, scheduler: _AdaptivePhysicalPlanScheduler) -> None: self._scheduler = scheduler + @classmethod + def from_logical_plan_builder( + cls, builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig + ) -> AdaptivePhysicalPlanScheduler: + scheduler = _AdaptivePhysicalPlanScheduler.from_logical_plan_builder(builder._builder, daft_execution_config) + return cls(scheduler) + def next(self) -> tuple[int | None, PhysicalPlanScheduler]: sid, pps = self._scheduler.next() return sid, PhysicalPlanScheduler(pps) diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 90c303db3f..7541559ad5 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -7,10 +7,9 @@ use crate::{ partitioning::{ HashRepartitionConfig, IntoPartitionsConfig, RandomShuffleConfig, RepartitionSpec, }, - physical_planner::plan, sink_info::{OutputFileInfo, SinkInfo}, source_info::SourceInfo, - PhysicalPlanScheduler, ResourceRequest, + ResourceRequest, }; use common_error::{DaftError, DaftResult}; use common_io_config::IOConfig; @@ -23,10 +22,8 @@ use daft_scan::{file_format::FileFormat, Pushdowns, ScanExternalInfo, ScanOperat #[cfg(feature = "python")] use { - crate::physical_planner::python::AdaptivePhysicalPlanScheduler, crate::sink_info::{CatalogInfo, IcebergCatalogInfo}, - crate::{physical_plan::PhysicalPlanRef, source_info::InMemoryInfo}, - common_daft_config::PyDaftExecutionConfig, + crate::source_info::InMemoryInfo, daft_core::python::schema::PySchema, daft_dsl::python::PyExpr, daft_scan::python::pylib::ScanOperatorHandle, @@ -560,7 +557,7 @@ impl From for LogicalPlanBuilder { #[derive(Debug)] pub struct PyLogicalPlanBuilder { // Internal logical plan builder. - builder: LogicalPlanBuilder, + pub builder: LogicalPlanBuilder, } impl PyLogicalPlanBuilder { @@ -857,38 +854,9 @@ impl PyLogicalPlanBuilder { }) } - /// Finalize the logical plan, translate the logical plan to a physical plan, and return - /// a physical plan scheduler that's capable of launching the work necessary to compute the output - /// of the physical plan. 
- pub fn to_physical_plan_scheduler( - &self, - py: Python, - cfg: PyDaftExecutionConfig, - ) -> PyResult { - py.allow_threads(|| { - let logical_plan = self.builder.build(); - let physical_plan: PhysicalPlanRef = plan(logical_plan, cfg.config.clone())?; - Ok(physical_plan.into()) - }) - } - pub fn repr_ascii(&self, simple: bool) -> PyResult { Ok(self.builder.repr_ascii(simple)) } - - pub fn to_adaptive_physical_plan_scheduler( - &self, - py: Python, - cfg: PyDaftExecutionConfig, - ) -> PyResult { - py.allow_threads(|| { - let logical_plan = self.builder.build(); - Ok(AdaptivePhysicalPlanScheduler::new( - logical_plan, - cfg.config.clone(), - )) - }) - } } impl From for PyLogicalPlanBuilder { diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index f6a03d4c19..52602f5b1e 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -9,7 +9,7 @@ mod logical_ops; mod logical_optimization; mod logical_plan; mod partitioning; -mod physical_ops; +pub mod physical_ops; mod physical_plan; mod physical_planner; mod resource_request; @@ -24,13 +24,19 @@ pub use daft_core::join::{JoinStrategy, JoinType}; use daft_scan::file_format::FileFormat; pub use logical_plan::{LogicalPlan, LogicalPlanRef}; pub use partitioning::ClusteringSpec; -pub use physical_plan::PhysicalPlanScheduler; +pub use physical_plan::{PhysicalPlan, PhysicalPlanRef}; +pub use physical_planner::{ + logical_to_physical, AdaptivePlanner, MaterializedResults, QueryStageOutput, +}; pub use resource_request::ResourceRequest; -pub use source_info::{FileInfo, FileInfos}; +pub use sink_info::OutputFileInfo; +pub use source_info::{FileInfo, FileInfos, InMemoryInfo}; #[cfg(feature = "python")] use pyo3::prelude::*; #[cfg(feature = "python")] +pub use sink_info::{DeltaLakeCatalogInfo, IcebergCatalogInfo}; +#[cfg(feature = "python")] use { daft_scan::file_format::{ CsvSourceConfig, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig, @@ -42,8 +48,6 @@ use { pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { use daft_scan::file_format::DatabaseSourceConfig; - use crate::physical_planner::python::AdaptivePhysicalPlanScheduler; - parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; @@ -51,8 +55,6 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; - parent.add_class::()?; - parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-plan/src/physical_ops/explode.rs b/src/daft-plan/src/physical_ops/explode.rs index 9c86a880ae..51e11ea06a 100644 --- a/src/daft-plan/src/physical_ops/explode.rs +++ b/src/daft-plan/src/physical_ops/explode.rs @@ -81,7 +81,7 @@ mod tests { use crate::{ partitioning::{HashClusteringConfig, UnknownClusteringConfig}, - physical_planner::plan, + physical_planner::logical_to_physical, test::{dummy_scan_node, dummy_scan_operator}, ClusteringSpec, }; @@ -100,7 +100,7 @@ mod tests { .explode(vec![col("b")])? .build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; let expected_clustering_spec = ClusteringSpec::Hash(HashClusteringConfig::new(3, vec![col("a")])); @@ -127,7 +127,7 @@ mod tests { .explode(vec![col("b")])? 
.build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; let expected_clustering_spec = ClusteringSpec::Unknown(UnknownClusteringConfig::new(3)); diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs index 5d287087dd..5d46465013 100644 --- a/src/daft-plan/src/physical_ops/project.rs +++ b/src/daft-plan/src/physical_ops/project.rs @@ -66,7 +66,7 @@ mod tests { use crate::{ partitioning::{ClusteringSpec, HashClusteringConfig, UnknownClusteringConfig}, - physical_planner::plan, + physical_planner::logical_to_physical, test::{dummy_scan_node, dummy_scan_operator}, }; @@ -89,7 +89,7 @@ mod tests { .select(expressions)? .build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; let expected_clustering_spec = ClusteringSpec::Hash(HashClusteringConfig::new(3, vec![col("aa"), col("b")])); @@ -126,7 +126,7 @@ mod tests { .select(projection)? .build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; let expected_clustering_spec = ClusteringSpec::Unknown(UnknownClusteringConfig::new(3)); assert_eq!( @@ -153,7 +153,7 @@ mod tests { .select(expressions)? .build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; let expected_clustering_spec = ClusteringSpec::Hash(HashClusteringConfig::new(3, vec![col("a"), col("b")])); diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 4a308215b0..8795d13933 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -1,22 +1,3 @@ -#[cfg(feature = "python")] -use { - crate::sink_info::OutputFileInfo, - common_io_config::IOConfig, - daft_core::python::schema::PySchema, - daft_core::schema::SchemaRef, - daft_dsl::python::PyExpr, - daft_dsl::Expr, - daft_scan::{file_format::FileFormat, python::pylib::PyScanTask}, - pyo3::{ - pyclass, pymethods, types::PyBytes, PyObject, PyRef, PyRefMut, PyResult, PyTypeInfo, - Python, ToPyObject, - }, - std::collections::HashMap, -}; - -use crate::source_info::InMemoryInfo; -use daft_core::impl_bincode_py_state_serialization; -use daft_dsl::ExprRef; use serde::{Deserialize, Serialize}; use std::{cmp::max, ops::Add, sync::Arc}; @@ -29,13 +10,7 @@ use crate::{ physical_ops::*, }; -#[cfg(feature = "python")] -use crate::sink_info::IcebergCatalogInfo; - -#[cfg(feature = "python")] -use crate::sink_info::DeltaLakeCatalogInfo; - -pub(crate) type PhysicalPlanRef = Arc; +pub type PhysicalPlanRef = Arc; /// Physical plan for a Daft query. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -623,645 +598,3 @@ impl PhysicalPlan { s } } - -/// A work scheduler for physical plans. -#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] -#[derive(Debug, Serialize, Deserialize)] -pub struct PhysicalPlanScheduler { - plan: PhysicalPlanRef, -} - -#[cfg(feature = "python")] -#[pymethods] -impl PhysicalPlanScheduler { - pub fn num_partitions(&self) -> PyResult { - Ok(self.plan.clustering_spec().num_partitions() as i64) - } - - pub fn repr_ascii(&self, simple: bool) -> PyResult { - Ok(self.plan.repr_ascii(simple)) - } - /// Converts the contained physical plan into an iterator of executable partition tasks. 
- pub fn to_partition_tasks(&self, psets: HashMap>) -> PyResult { - Python::with_gil(|py| self.plan.to_partition_tasks(py, &psets)) - } -} - -impl_bincode_py_state_serialization!(PhysicalPlanScheduler); - -impl From for PhysicalPlanScheduler { - fn from(plan: PhysicalPlanRef) -> Self { - Self { plan } - } -} - -#[cfg(feature = "python")] -#[pyclass] -struct PartitionIterator { - parts: Vec, - index: usize, -} - -#[cfg(feature = "python")] -#[pymethods] -impl PartitionIterator { - fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { - slf - } - fn __next__(mut slf: PyRefMut<'_, Self>) -> Option { - let index = slf.index; - slf.index += 1; - slf.parts.get(index).map(|part| part.clone_ref(slf.py())) - } -} - -#[allow(clippy::too_many_arguments)] -#[cfg(feature = "python")] -fn tabular_write( - py: Python<'_>, - upstream_iter: PyObject, - file_format: &FileFormat, - schema: &SchemaRef, - root_dir: &String, - compression: &Option, - partition_cols: &Option>, - io_config: &Option, -) -> PyResult { - let part_cols = partition_cols.as_ref().map(|cols| { - cols.iter() - .map(|e| e.clone().into()) - .collect::>() - }); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "write_file"))? - .call1(( - upstream_iter, - file_format.clone(), - PySchema::from(schema.clone()), - root_dir, - compression.clone(), - part_cols, - io_config - .as_ref() - .map(|cfg| common_io_config::python::IOConfig { - config: cfg.clone(), - }), - ))?; - Ok(py_iter.into()) -} - -#[allow(clippy::too_many_arguments)] -#[cfg(feature = "python")] -fn iceberg_write( - py: Python<'_>, - upstream_iter: PyObject, - iceberg_info: &IcebergCatalogInfo, -) -> PyResult { - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "write_iceberg"))? - .call1(( - upstream_iter, - &iceberg_info.table_location, - &iceberg_info.iceberg_schema, - &iceberg_info.iceberg_properties, - iceberg_info.spec_id, - iceberg_info - .io_config - .as_ref() - .map(|cfg| common_io_config::python::IOConfig { - config: cfg.clone(), - }), - ))?; - Ok(py_iter.into()) -} - -#[allow(clippy::too_many_arguments)] -#[cfg(feature = "python")] -fn deltalake_write( - py: Python<'_>, - upstream_iter: PyObject, - delta_lake_info: &DeltaLakeCatalogInfo, -) -> PyResult { - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "write_deltalake"))? - .call1(( - upstream_iter, - &delta_lake_info.path, - delta_lake_info.large_dtypes, - delta_lake_info.version, - delta_lake_info - .io_config - .as_ref() - .map(|cfg| common_io_config::python::IOConfig { - config: cfg.clone(), - }), - ))?; - Ok(py_iter.into()) -} - -#[cfg(feature = "python")] -impl PhysicalPlan { - pub fn to_partition_tasks( - &self, - py: Python<'_>, - psets: &HashMap>, - ) -> PyResult { - match self { - PhysicalPlan::InMemoryScan(InMemoryScan { - in_memory_info: InMemoryInfo { cache_key, .. }, - .. - }) => { - let partition_iter = PartitionIterator { - parts: psets[cache_key].clone(), - index: 0usize, - }; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "partition_read"))? - .call1((partition_iter,))?; - Ok(py_iter.into()) - } - PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => { - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "scan_with_tasks"))? 
- .call1((scan_tasks - .iter() - .map(|scan_task| PyScanTask(scan_task.clone())) - .collect::>(),))?; - Ok(py_iter.into()) - } - PhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => { - let schema_mod = py.import(pyo3::intern!(py, "daft.logical.schema"))?; - let python_schema = schema_mod - .getattr(pyo3::intern!(py, "Schema"))? - .getattr(pyo3::intern!(py, "_from_pyschema"))? - .call1((PySchema { - schema: schema.clone(), - },))?; - - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "empty_scan"))? - .call1((python_schema,))?; - Ok(py_iter.into()) - } - - PhysicalPlan::Project(Project { - input, - projection, - resource_request, - .. - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let projection_pyexprs: Vec = projection - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "project"))? - .call1((upstream_iter, projection_pyexprs, resource_request.clone()))?; - Ok(py_iter.into()) - } - PhysicalPlan::Filter(Filter { input, predicate }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let expressions_mod = - py.import(pyo3::intern!(py, "daft.expressions.expressions"))?; - let py_predicate = expressions_mod - .getattr(pyo3::intern!(py, "Expression"))? - .getattr(pyo3::intern!(py, "_from_pyexpr"))? - .call1((PyExpr::from(predicate.clone()),))?; - let expressions_projection = expressions_mod - .getattr(pyo3::intern!(py, "ExpressionsProjection"))? - .call1((vec![py_predicate],))?; - let execution_step_mod = - py.import(pyo3::intern!(py, "daft.execution.execution_step"))?; - let filter_step = execution_step_mod - .getattr(pyo3::intern!(py, "Filter"))? - .call1((expressions_projection,))?; - let resource_request = execution_step_mod - .getattr(pyo3::intern!(py, "ResourceRequest"))? - .call0()?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "pipeline_instruction"))? - .call1((upstream_iter, filter_step, resource_request))?; - Ok(py_iter.into()) - } - PhysicalPlan::Limit(Limit { - input, - limit, - eager, - num_partitions, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_physical_plan = - py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?; - let global_limit_iter = py_physical_plan - .getattr(pyo3::intern!(py, "global_limit"))? - .call1((upstream_iter, *limit, *eager, *num_partitions))?; - Ok(global_limit_iter.into()) - } - PhysicalPlan::Explode(Explode { - input, to_explode, .. - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let explode_pyexprs: Vec = to_explode - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "explode"))? - .call1((upstream_iter, explode_pyexprs))?; - Ok(py_iter.into()) - } - PhysicalPlan::Unpivot(Unpivot { - input, - ids, - values, - variable_name, - value_name, - .. - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let ids_pyexprs: Vec = - ids.iter().map(|expr| PyExpr::from(expr.clone())).collect(); - let values_pyexprs: Vec = values - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "unpivot"))? 
- .call1(( - upstream_iter, - ids_pyexprs, - values_pyexprs, - variable_name, - value_name, - ))?; - Ok(py_iter.into()) - } - PhysicalPlan::Sample(Sample { - input, - fraction, - with_replacement, - seed, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "sample"))? - .call1((upstream_iter, *fraction, *with_replacement, *seed))?; - Ok(py_iter.into()) - } - PhysicalPlan::MonotonicallyIncreasingId(MonotonicallyIncreasingId { - input, - column_name, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "monotonically_increasing_id"))? - .call1((upstream_iter, column_name))?; - Ok(py_iter.into()) - } - PhysicalPlan::Sort(Sort { - input, - sort_by, - descending, - num_partitions, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let sort_by_pyexprs: Vec = sort_by - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "sort"))? - .call1(( - upstream_iter, - sort_by_pyexprs, - descending.clone(), - *num_partitions, - ))?; - Ok(py_iter.into()) - } - PhysicalPlan::Split(Split { - input, - input_num_partitions, - output_num_partitions, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "split"))? - .call1((upstream_iter, *input_num_partitions, *output_num_partitions))?; - Ok(py_iter.into()) - } - PhysicalPlan::Flatten(Flatten { input }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "flatten_plan"))? - .call1((upstream_iter,))?; - Ok(py_iter.into()) - } - PhysicalPlan::FanoutRandom(FanoutRandom { - input, - num_partitions, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "fanout_random"))? - .call1((upstream_iter, *num_partitions))?; - Ok(py_iter.into()) - } - PhysicalPlan::FanoutByHash(FanoutByHash { - input, - num_partitions, - partition_by, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let partition_by_pyexprs: Vec = partition_by - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "split_by_hash"))? - .call1((upstream_iter, *num_partitions, partition_by_pyexprs))?; - Ok(py_iter.into()) - } - PhysicalPlan::FanoutByRange(_) => unimplemented!( - "FanoutByRange not implemented, since only use case (sorting) doesn't need it yet." - ), - PhysicalPlan::ReduceMerge(ReduceMerge { input }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "reduce_merge"))? - .call1((upstream_iter,))?; - Ok(py_iter.into()) - } - PhysicalPlan::Aggregate(Aggregate { - aggregations, - groupby, - input, - .. 
- }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let aggs_as_pyexprs: Vec = aggregations - .iter() - .map(|agg_expr| PyExpr::from(Expr::Agg(agg_expr.clone()))) - .collect(); - let groupbys_as_pyexprs: Vec = groupby - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "local_aggregate"))? - .call1((upstream_iter, aggs_as_pyexprs, groupbys_as_pyexprs))?; - Ok(py_iter.into()) - } - PhysicalPlan::Pivot(Pivot { - input, - group_by, - pivot_column, - value_column, - names, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let groupbys_as_pyexprs: Vec = group_by - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let pivot_column_pyexpr = PyExpr::from(pivot_column.clone()); - let value_column_pyexpr = PyExpr::from(value_column.clone()); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "pivot"))? - .call1(( - upstream_iter, - groupbys_as_pyexprs, - pivot_column_pyexpr, - value_column_pyexpr, - names.clone(), - ))?; - Ok(py_iter.into()) - } - PhysicalPlan::Coalesce(Coalesce { - input, - num_from, - num_to, - }) => { - let upstream_iter = input.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "coalesce"))? - .call1((upstream_iter, *num_from, *num_to))?; - Ok(py_iter.into()) - } - PhysicalPlan::Concat(Concat { other, input }) => { - let upstream_input_iter = input.to_partition_tasks(py, psets)?; - let upstream_other_iter = other.to_partition_tasks(py, psets)?; - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? - .getattr(pyo3::intern!(py, "concat"))? - .call1((upstream_input_iter, upstream_other_iter))?; - Ok(py_iter.into()) - } - PhysicalPlan::HashJoin(HashJoin { - left, - right, - left_on, - right_on, - join_type, - .. - }) => { - let upstream_left_iter = left.to_partition_tasks(py, psets)?; - let upstream_right_iter = right.to_partition_tasks(py, psets)?; - let left_on_pyexprs: Vec = left_on - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let right_on_pyexprs: Vec = right_on - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "hash_join"))? - .call1(( - upstream_left_iter, - upstream_right_iter, - left_on_pyexprs, - right_on_pyexprs, - *join_type, - ))?; - Ok(py_iter.into()) - } - PhysicalPlan::SortMergeJoin(SortMergeJoin { - left, - right, - left_on, - right_on, - join_type, - num_partitions, - left_is_larger, - needs_presort, - }) => { - let left_iter = left.to_partition_tasks(py, psets)?; - let right_iter = right.to_partition_tasks(py, psets)?; - let left_on_pyexprs: Vec = left_on - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let right_on_pyexprs: Vec = right_on - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - // TODO(Clark): Elide sorting one side of the join if already range-partitioned, where we'd use that side's boundaries to sort the other side. - let py_iter = if *needs_presort { - py.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "sort_merge_join_aligned_boundaries"))? 
- .call1(( - left_iter, - right_iter, - left_on_pyexprs, - right_on_pyexprs, - *join_type, - *num_partitions, - *left_is_larger, - ))? - } else { - py.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "merge_join_sorted"))? - .call1(( - left_iter, - right_iter, - left_on_pyexprs, - right_on_pyexprs, - *join_type, - *left_is_larger, - ))? - }; - Ok(py_iter.into()) - } - PhysicalPlan::BroadcastJoin(BroadcastJoin { - broadcaster: left, - receiver: right, - left_on, - right_on, - join_type, - is_swapped, - }) => { - let upstream_left_iter = left.to_partition_tasks(py, psets)?; - let upstream_right_iter = right.to_partition_tasks(py, psets)?; - let left_on_pyexprs: Vec = left_on - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let right_on_pyexprs: Vec = right_on - .iter() - .map(|expr| PyExpr::from(expr.clone())) - .collect(); - let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? - .getattr(pyo3::intern!(py, "broadcast_join"))? - .call1(( - upstream_left_iter, - upstream_right_iter, - left_on_pyexprs, - right_on_pyexprs, - *join_type, - *is_swapped, - ))?; - Ok(py_iter.into()) - } - PhysicalPlan::TabularWriteParquet(TabularWriteParquet { - schema, - file_info: - OutputFileInfo { - root_dir, - file_format, - partition_cols, - compression, - io_config, - }, - input, - }) => tabular_write( - py, - input.to_partition_tasks(py, psets)?, - file_format, - schema, - root_dir, - compression, - partition_cols, - io_config, - ), - PhysicalPlan::TabularWriteCsv(TabularWriteCsv { - schema, - file_info: - OutputFileInfo { - root_dir, - file_format, - partition_cols, - compression, - io_config, - }, - input, - }) => tabular_write( - py, - input.to_partition_tasks(py, psets)?, - file_format, - schema, - root_dir, - compression, - partition_cols, - io_config, - ), - PhysicalPlan::TabularWriteJson(TabularWriteJson { - schema, - file_info: - OutputFileInfo { - root_dir, - file_format, - partition_cols, - compression, - io_config, - }, - input, - }) => tabular_write( - py, - input.to_partition_tasks(py, psets)?, - file_format, - schema, - root_dir, - compression, - partition_cols, - io_config, - ), - #[cfg(feature = "python")] - PhysicalPlan::IcebergWrite(IcebergWrite { - schema: _, - iceberg_info, - input, - }) => iceberg_write(py, input.to_partition_tasks(py, psets)?, iceberg_info), - #[cfg(feature = "python")] - PhysicalPlan::DeltaLakeWrite(DeltaLakeWrite { - schema: _, - delta_lake_info, - input, - }) => deltalake_write(py, input.to_partition_tasks(py, psets)?, delta_lake_info), - } - } -} diff --git a/src/daft-plan/src/physical_planner/mod.rs b/src/daft-plan/src/physical_planner/mod.rs index fe50c61621..14eff27629 100644 --- a/src/daft-plan/src/physical_planner/mod.rs +++ b/src/daft-plan/src/physical_planner/mod.rs @@ -9,12 +9,11 @@ use crate::LogicalPlan; use crate::physical_planner::planner::PhysicalPlanTranslator; use common_treenode::TreeNode; mod planner; -#[cfg(feature = "python")] -pub mod python; +pub use planner::{AdaptivePlanner, MaterializedResults, QueryStageOutput}; mod translate; /// Translate a logical plan to a physical plan. 
-pub fn plan( +pub fn logical_to_physical( logical_plan: Arc, cfg: Arc, ) -> DaftResult { diff --git a/src/daft-plan/src/physical_planner/planner.rs b/src/daft-plan/src/physical_planner/planner.rs index 33a770e330..8fe0beb6f0 100644 --- a/src/daft-plan/src/physical_planner/planner.rs +++ b/src/daft-plan/src/physical_planner/planner.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use common_daft_config::DaftExecutionConfig; use common_error::DaftResult; use common_treenode::{Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor}; +use serde::{Deserialize, Serialize}; use crate::logical_ops::Source; use crate::logical_optimization::Optimizer; @@ -243,7 +244,8 @@ impl TreeNodeRewriter for ReplacePlaceholdersWithMaterializedResult { } } -pub(super) enum QueryStageOutput { +#[derive(Debug, Serialize, Deserialize)] +pub enum QueryStageOutput { Partial { physical_plan: PhysicalPlanRef, source_id: usize, @@ -263,6 +265,13 @@ impl QueryStageOutput { QueryStageOutput::Final { physical_plan } => (None, physical_plan), } } + + pub fn source_id(&self) -> Option { + match self { + QueryStageOutput::Partial { source_id, .. } => Some(*source_id), + QueryStageOutput::Final { .. } => None, + } + } } #[derive(PartialEq, Debug)] enum AdaptivePlannerStatus { @@ -271,12 +280,12 @@ enum AdaptivePlannerStatus { Done, } -pub(super) struct MaterializedResults { +pub struct MaterializedResults { pub source_id: usize, pub in_memory_info: InMemoryInfo, } -pub(super) struct AdaptivePlanner { +pub struct AdaptivePlanner { logical_plan: LogicalPlanRef, cfg: Arc, status: AdaptivePlannerStatus, @@ -291,7 +300,7 @@ impl AdaptivePlanner { } } - pub fn next(&mut self) -> DaftResult { + pub fn next_stage(&mut self) -> DaftResult { assert_eq!(self.status, AdaptivePlannerStatus::Ready); let mut rewriter = QueryStagePhysicalPlanTranslator { diff --git a/src/daft-plan/src/physical_planner/translate.rs b/src/daft-plan/src/physical_planner/translate.rs index 12a4c5fb1a..ec2511f867 100644 --- a/src/daft-plan/src/physical_planner/translate.rs +++ b/src/daft-plan/src/physical_planner/translate.rs @@ -914,7 +914,7 @@ mod tests { use std::sync::Arc; use crate::physical_plan::PhysicalPlan; - use crate::physical_planner::plan; + use crate::physical_planner::logical_to_physical; use crate::test::{dummy_scan_node, dummy_scan_operator}; /// Tests that planner drops a simple Repartition (e.g. df.into_partitions()) the child already has the desired number of partitions. @@ -931,13 +931,13 @@ mod tests { .into_partitions(10)? .filter(col("a").lt(lit(2)))?; assert_eq!( - plan(builder.build(), cfg.clone())? + logical_to_physical(builder.build(), cfg.clone())? .clustering_spec() .num_partitions(), 10 ); let logical_plan = builder.into_partitions(10)?.build(); - let physical_plan = plan(logical_plan, cfg.clone())?; + let physical_plan = logical_to_physical(logical_plan, cfg.clone())?; // Check that the last repartition was dropped (the last op should be the filter). assert_matches!(physical_plan.as_ref(), PhysicalPlan::Filter(_)); Ok(()) @@ -955,13 +955,13 @@ mod tests { Field::new("b", DataType::Utf8), ])); assert_eq!( - plan(builder.build(), cfg.clone())? + logical_to_physical(builder.build(), cfg.clone())? 
.clustering_spec() .num_partitions(), 1 ); let logical_plan = builder.hash_repartition(Some(1), vec![col("a")])?.build(); - let physical_plan = plan(logical_plan, cfg.clone())?; + let physical_plan = logical_to_physical(logical_plan, cfg.clone())?; assert_matches!(physical_plan.as_ref(), PhysicalPlan::TabularScan(_)); Ok(()) } @@ -980,7 +980,7 @@ mod tests { .filter(col("a").lt(lit(2)))? .hash_repartition(Some(10), vec![col("a")])? .build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; // Check that the last repartition was dropped (the last op should be the filter). assert_matches!(physical_plan.as_ref(), PhysicalPlan::Filter(_)); Ok(()) @@ -1000,7 +1000,7 @@ mod tests { .aggregate(vec![col("a").sum()], vec![col("b")])? .hash_repartition(Some(10), vec![col("b")])? .build(); - let physical_plan = plan(logical_plan, cfg)?; + let physical_plan = logical_to_physical(logical_plan, cfg)?; // Check that the last repartition was dropped (the last op should be a projection for a multi-partition aggregation). assert_matches!(physical_plan.as_ref(), PhysicalPlan::Project(_)); Ok(()) diff --git a/src/daft-scheduler/Cargo.toml b/src/daft-scheduler/Cargo.toml new file mode 100644 index 0000000000..e1cff8f25e --- /dev/null +++ b/src/daft-scheduler/Cargo.toml @@ -0,0 +1,25 @@ +[dependencies] +bincode = {workspace = true} +common-daft-config = {path = "../common/daft-config", default-features = false} +common-error = {path = "../common/error", default-features = false} +common-io-config = {path = "../common/io-config", default-features = false} +daft-core = {path = "../daft-core", default-features = false} +daft-dsl = {path = "../daft-dsl", default-features = false} +daft-plan = {path = "../daft-plan", default-features = false} +daft-scan = {path = "../daft-scan", default-features = false} +log = {workspace = true} +pyo3 = {workspace = true, optional = true} +pyo3-log = {workspace = true, optional = true} +serde = {workspace = true, features = ["rc"]} + +[dev-dependencies] +rstest = {workspace = true} + +[features] +default = ["python"] +python = ["dep:pyo3", "common-error/python", "common-io-config/python", "common-daft-config/python", "daft-core/python", "daft-plan/python", "daft-dsl/python"] + +[package] +edition = {workspace = true} +name = "daft-scheduler" +version = {workspace = true} diff --git a/src/daft-plan/src/physical_planner/python.rs b/src/daft-scheduler/src/adaptive.rs similarity index 67% rename from src/daft-plan/src/physical_planner/python.rs rename to src/daft-scheduler/src/adaptive.rs index 61c0e0ca68..b3cf84a919 100644 --- a/src/daft-plan/src/physical_planner/python.rs +++ b/src/daft-scheduler/src/adaptive.rs @@ -3,11 +3,15 @@ use std::sync::Arc; use common_daft_config::DaftExecutionConfig; use daft_core::schema::Schema; -use crate::physical_planner::planner::MaterializedResults; -use crate::source_info::InMemoryInfo; -use crate::LogicalPlan; -use crate::{physical_planner::planner::AdaptivePlanner, PhysicalPlanScheduler}; -use pyo3::prelude::*; +use crate::PhysicalPlanScheduler; +use daft_plan::InMemoryInfo; +use daft_plan::LogicalPlan; +use daft_plan::{AdaptivePlanner, MaterializedResults}; + +#[cfg(feature = "python")] +use { + common_daft_config::PyDaftExecutionConfig, daft_plan::PyLogicalPlanBuilder, pyo3::prelude::*, +}; /// A work scheduler for physical plans. 
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] pub struct AdaptivePhysicalPlanScheduler { @@ -25,11 +29,25 @@ impl AdaptivePhysicalPlanScheduler { #[cfg(feature = "python")] #[pymethods] impl AdaptivePhysicalPlanScheduler { + #[staticmethod] + pub fn from_logical_plan_builder( + logical_plan_builder: &PyLogicalPlanBuilder, + py: Python<'_>, + cfg: PyDaftExecutionConfig, + ) -> PyResult { + py.allow_threads(|| { + let logical_plan = logical_plan_builder.builder.build(); + Ok(AdaptivePhysicalPlanScheduler::new( + logical_plan, + cfg.config.clone(), + )) + }) + } pub fn next(&mut self, py: Python) -> PyResult<(Option, PhysicalPlanScheduler)> { py.allow_threads(|| { - let output = self.planner.next()?; - let (sid, pps) = output.unwrap(); - Ok((sid, pps.into())) + let output = self.planner.next_stage()?; + let sid = output.source_id(); + Ok((sid, output.into())) }) } diff --git a/src/daft-scheduler/src/lib.rs b/src/daft-scheduler/src/lib.rs new file mode 100644 index 0000000000..3b7154bd15 --- /dev/null +++ b/src/daft-scheduler/src/lib.rs @@ -0,0 +1,16 @@ +mod adaptive; +mod scheduler; + +pub use adaptive::AdaptivePhysicalPlanScheduler; +pub use scheduler::PhysicalPlanScheduler; + +#[cfg(feature = "python")] +use pyo3::prelude::*; + +#[cfg(feature = "python")] +pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { + parent.add_class::()?; + parent.add_class::()?; + + Ok(()) +} diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs new file mode 100644 index 0000000000..d49400a170 --- /dev/null +++ b/src/daft-scheduler/src/scheduler.rs @@ -0,0 +1,723 @@ +use common_error::DaftResult; +use daft_plan::{logical_to_physical, PhysicalPlan, PhysicalPlanRef, QueryStageOutput}; +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "python")] +use { + common_daft_config::PyDaftExecutionConfig, + common_io_config::IOConfig, + daft_core::python::schema::PySchema, + daft_core::schema::SchemaRef, + daft_dsl::python::PyExpr, + daft_dsl::Expr, + daft_plan::{OutputFileInfo, PyLogicalPlanBuilder}, + daft_scan::{file_format::FileFormat, python::pylib::PyScanTask}, + pyo3::{ + pyclass, pymethods, types::PyBytes, PyObject, PyRef, PyRefMut, PyResult, PyTypeInfo, + Python, ToPyObject, + }, + std::collections::HashMap, +}; + +use daft_core::impl_bincode_py_state_serialization; +use daft_dsl::ExprRef; +use daft_plan::InMemoryInfo; +use std::sync::Arc; + +use daft_plan::physical_ops::*; + +#[cfg(feature = "python")] +use daft_plan::{DeltaLakeCatalogInfo, IcebergCatalogInfo}; + +/// A work scheduler for physical plans. +#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +#[derive(Debug, Serialize, Deserialize)] +pub struct PhysicalPlanScheduler { + query_stage: Arc, +} + +impl PhysicalPlanScheduler { + fn plan(&self) -> PhysicalPlanRef { + match self.query_stage.as_ref() { + QueryStageOutput::Partial { physical_plan, .. 
} => physical_plan.clone(), + QueryStageOutput::Final { physical_plan } => physical_plan.clone(), + } + } +} + +#[cfg(feature = "python")] +#[pymethods] +impl PhysicalPlanScheduler { + #[staticmethod] + pub fn from_logical_plan_builder( + logical_plan_builder: &PyLogicalPlanBuilder, + py: Python<'_>, + cfg: PyDaftExecutionConfig, + ) -> PyResult { + py.allow_threads(|| { + let logical_plan = logical_plan_builder.builder.build(); + let physical_plan: PhysicalPlanRef = + logical_to_physical(logical_plan, cfg.config.clone())?; + Ok(QueryStageOutput::Final { physical_plan }.into()) + }) + } + + pub fn num_partitions(&self) -> PyResult { + Ok(self.plan().clustering_spec().num_partitions() as i64) + } + + pub fn repr_ascii(&self, simple: bool) -> PyResult { + Ok(self.plan().repr_ascii(simple)) + } + /// Converts the contained physical plan into an iterator of executable partition tasks. + pub fn to_partition_tasks( + &self, + py: Python, + psets: HashMap>, + ) -> PyResult { + physical_plan_to_partition_tasks(self.plan().as_ref(), py, &psets) + } +} + +#[cfg(feature = "python")] +#[pyclass] +struct StreamingPartitionIterator { + iter: Box> + Send>, +} + +#[cfg(feature = "python")] +#[pymethods] +impl StreamingPartitionIterator { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) -> PyResult> { + let iter = &mut slf.iter; + Ok(py.allow_threads(|| iter.next().transpose())?) + } +} + +impl_bincode_py_state_serialization!(PhysicalPlanScheduler); + +impl From for PhysicalPlanScheduler { + fn from(query_stage: QueryStageOutput) -> Self { + Self { + query_stage: Arc::new(query_stage), + } + } +} + +#[cfg(feature = "python")] +#[pyclass] +struct PartitionIterator { + parts: Vec, + index: usize, +} + +#[cfg(feature = "python")] +#[pymethods] +impl PartitionIterator { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + fn __next__(mut slf: PyRefMut<'_, Self>) -> Option { + let index = slf.index; + slf.index += 1; + slf.parts.get(index).map(|part| part.clone_ref(slf.py())) + } +} + +#[allow(clippy::too_many_arguments)] +#[cfg(feature = "python")] +fn tabular_write( + py: Python<'_>, + upstream_iter: PyObject, + file_format: &FileFormat, + schema: &SchemaRef, + root_dir: &String, + compression: &Option, + partition_cols: &Option>, + io_config: &Option, +) -> PyResult { + let part_cols = partition_cols.as_ref().map(|cols| { + cols.iter() + .map(|e| e.clone().into()) + .collect::>() + }); + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "write_file"))? + .call1(( + upstream_iter, + file_format.clone(), + PySchema::from(schema.clone()), + root_dir, + compression.clone(), + part_cols, + io_config + .as_ref() + .map(|cfg| common_io_config::python::IOConfig { + config: cfg.clone(), + }), + ))?; + Ok(py_iter.into()) +} + +#[allow(clippy::too_many_arguments)] +#[cfg(feature = "python")] +fn iceberg_write( + py: Python<'_>, + upstream_iter: PyObject, + iceberg_info: &IcebergCatalogInfo, +) -> PyResult { + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "write_iceberg"))? 
+ .call1(( + upstream_iter, + &iceberg_info.table_location, + &iceberg_info.iceberg_schema, + &iceberg_info.iceberg_properties, + iceberg_info.spec_id, + iceberg_info + .io_config + .as_ref() + .map(|cfg| common_io_config::python::IOConfig { + config: cfg.clone(), + }), + ))?; + Ok(py_iter.into()) +} + +#[allow(clippy::too_many_arguments)] +#[cfg(feature = "python")] +fn deltalake_write( + py: Python<'_>, + upstream_iter: PyObject, + delta_lake_info: &DeltaLakeCatalogInfo, +) -> PyResult { + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "write_deltalake"))? + .call1(( + upstream_iter, + &delta_lake_info.path, + delta_lake_info.large_dtypes, + delta_lake_info.version, + delta_lake_info + .io_config + .as_ref() + .map(|cfg| common_io_config::python::IOConfig { + config: cfg.clone(), + }), + ))?; + Ok(py_iter.into()) +} + +#[cfg(feature = "python")] +fn physical_plan_to_partition_tasks( + physical_plan: &PhysicalPlan, + py: Python<'_>, + psets: &HashMap>, +) -> PyResult { + match physical_plan { + PhysicalPlan::InMemoryScan(InMemoryScan { + in_memory_info: InMemoryInfo { cache_key, .. }, + .. + }) => { + let partition_iter = PartitionIterator { + parts: psets[cache_key].clone(), + index: 0usize, + }; + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .getattr(pyo3::intern!(py, "partition_read"))? + .call1((partition_iter,))?; + Ok(py_iter.into()) + } + PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => { + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "scan_with_tasks"))? + .call1((scan_tasks + .iter() + .map(|scan_task| PyScanTask(scan_task.clone())) + .collect::>(),))?; + Ok(py_iter.into()) + } + PhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => { + let schema_mod = py.import(pyo3::intern!(py, "daft.logical.schema"))?; + let python_schema = schema_mod + .getattr(pyo3::intern!(py, "Schema"))? + .getattr(pyo3::intern!(py, "_from_pyschema"))? + .call1((PySchema { + schema: schema.clone(), + },))?; + + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "empty_scan"))? + .call1((python_schema,))?; + Ok(py_iter.into()) + } + + PhysicalPlan::Project(Project { + input, + projection, + resource_request, + .. + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let projection_pyexprs: Vec = projection + .iter() + .map(|expr| PyExpr::from(expr.clone())) + .collect(); + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "project"))? + .call1((upstream_iter, projection_pyexprs, resource_request.clone()))?; + Ok(py_iter.into()) + } + PhysicalPlan::Filter(Filter { input, predicate }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let expressions_mod = py.import(pyo3::intern!(py, "daft.expressions.expressions"))?; + let py_predicate = expressions_mod + .getattr(pyo3::intern!(py, "Expression"))? + .getattr(pyo3::intern!(py, "_from_pyexpr"))? + .call1((PyExpr::from(predicate.clone()),))?; + let expressions_projection = expressions_mod + .getattr(pyo3::intern!(py, "ExpressionsProjection"))? + .call1((vec![py_predicate],))?; + let execution_step_mod = + py.import(pyo3::intern!(py, "daft.execution.execution_step"))?; + let filter_step = execution_step_mod + .getattr(pyo3::intern!(py, "Filter"))? 
+ .call1((expressions_projection,))?; + let resource_request = execution_step_mod + .getattr(pyo3::intern!(py, "ResourceRequest"))? + .call0()?; + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .getattr(pyo3::intern!(py, "pipeline_instruction"))? + .call1((upstream_iter, filter_step, resource_request))?; + Ok(py_iter.into()) + } + PhysicalPlan::Limit(Limit { + input, + limit, + eager, + num_partitions, + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let py_physical_plan = py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?; + let global_limit_iter = py_physical_plan + .getattr(pyo3::intern!(py, "global_limit"))? + .call1((upstream_iter, *limit, *eager, *num_partitions))?; + Ok(global_limit_iter.into()) + } + PhysicalPlan::Explode(Explode { + input, to_explode, .. + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let explode_pyexprs: Vec = to_explode + .iter() + .map(|expr| PyExpr::from(expr.clone())) + .collect(); + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "explode"))? + .call1((upstream_iter, explode_pyexprs))?; + Ok(py_iter.into()) + } + PhysicalPlan::Unpivot(Unpivot { + input, + ids, + values, + variable_name, + value_name, + .. + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let ids_pyexprs: Vec = + ids.iter().map(|expr| PyExpr::from(expr.clone())).collect(); + let values_pyexprs: Vec = values + .iter() + .map(|expr| PyExpr::from(expr.clone())) + .collect(); + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "unpivot"))? + .call1(( + upstream_iter, + ids_pyexprs, + values_pyexprs, + variable_name, + value_name, + ))?; + Ok(py_iter.into()) + } + PhysicalPlan::Sample(Sample { + input, + fraction, + with_replacement, + seed, + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "sample"))? + .call1((upstream_iter, *fraction, *with_replacement, *seed))?; + Ok(py_iter.into()) + } + PhysicalPlan::MonotonicallyIncreasingId(MonotonicallyIncreasingId { + input, + column_name, + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .getattr(pyo3::intern!(py, "monotonically_increasing_id"))? + .call1((upstream_iter, column_name))?; + Ok(py_iter.into()) + } + PhysicalPlan::Sort(Sort { + input, + sort_by, + descending, + num_partitions, + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let sort_by_pyexprs: Vec = sort_by + .iter() + .map(|expr| PyExpr::from(expr.clone())) + .collect(); + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .getattr(pyo3::intern!(py, "sort"))? + .call1(( + upstream_iter, + sort_by_pyexprs, + descending.clone(), + *num_partitions, + ))?; + Ok(py_iter.into()) + } + PhysicalPlan::Split(Split { + input, + input_num_partitions, + output_num_partitions, + }) => { + let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .getattr(pyo3::intern!(py, "split"))? 
+ .call1((upstream_iter, *input_num_partitions, *output_num_partitions))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::Flatten(Flatten { input }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+ .getattr(pyo3::intern!(py, "flatten_plan"))?
+ .call1((upstream_iter,))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::FanoutRandom(FanoutRandom {
+ input,
+ num_partitions,
+ }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+ .getattr(pyo3::intern!(py, "fanout_random"))?
+ .call1((upstream_iter, *num_partitions))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::FanoutByHash(FanoutByHash {
+ input,
+ num_partitions,
+ partition_by,
+ }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let partition_by_pyexprs: Vec<PyExpr> = partition_by
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "split_by_hash"))?
+ .call1((upstream_iter, *num_partitions, partition_by_pyexprs))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::FanoutByRange(_) => unimplemented!(
+ "FanoutByRange not implemented, since only use case (sorting) doesn't need it yet."
+ ),
+ PhysicalPlan::ReduceMerge(ReduceMerge { input }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "reduce_merge"))?
+ .call1((upstream_iter,))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::Aggregate(Aggregate {
+ aggregations,
+ groupby,
+ input,
+ ..
+ }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let aggs_as_pyexprs: Vec<PyExpr> = aggregations
+ .iter()
+ .map(|agg_expr| PyExpr::from(Expr::Agg(agg_expr.clone())))
+ .collect();
+ let groupbys_as_pyexprs: Vec<PyExpr> = groupby
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "local_aggregate"))?
+ .call1((upstream_iter, aggs_as_pyexprs, groupbys_as_pyexprs))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::Pivot(Pivot {
+ input,
+ group_by,
+ pivot_column,
+ value_column,
+ names,
+ }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let groupbys_as_pyexprs: Vec<PyExpr> = group_by
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let pivot_column_pyexpr = PyExpr::from(pivot_column.clone());
+ let value_column_pyexpr = PyExpr::from(value_column.clone());
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "pivot"))?
+ .call1((
+ upstream_iter,
+ groupbys_as_pyexprs,
+ pivot_column_pyexpr,
+ value_column_pyexpr,
+ names.clone(),
+ ))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::Coalesce(Coalesce {
+ input,
+ num_from,
+ num_to,
+ }) => {
+ let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+ .getattr(pyo3::intern!(py, "coalesce"))?
+ .call1((upstream_iter, *num_from, *num_to))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::Concat(Concat { other, input }) => {
+ let upstream_input_iter = physical_plan_to_partition_tasks(input, py, psets)?;
+ let upstream_other_iter = physical_plan_to_partition_tasks(other, py, psets)?;
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+ .getattr(pyo3::intern!(py, "concat"))?
+ .call1((upstream_input_iter, upstream_other_iter))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::HashJoin(HashJoin {
+ left,
+ right,
+ left_on,
+ right_on,
+ join_type,
+ ..
+ }) => {
+ let upstream_left_iter = physical_plan_to_partition_tasks(left, py, psets)?;
+ let upstream_right_iter = physical_plan_to_partition_tasks(right, py, psets)?;
+ let left_on_pyexprs: Vec<PyExpr> = left_on
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let right_on_pyexprs: Vec<PyExpr> = right_on
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "hash_join"))?
+ .call1((
+ upstream_left_iter,
+ upstream_right_iter,
+ left_on_pyexprs,
+ right_on_pyexprs,
+ *join_type,
+ ))?;
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::SortMergeJoin(SortMergeJoin {
+ left,
+ right,
+ left_on,
+ right_on,
+ join_type,
+ num_partitions,
+ left_is_larger,
+ needs_presort,
+ }) => {
+ let left_iter = physical_plan_to_partition_tasks(left, py, psets)?;
+ let right_iter = physical_plan_to_partition_tasks(right, py, psets)?;
+ let left_on_pyexprs: Vec<PyExpr> = left_on
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let right_on_pyexprs: Vec<PyExpr> = right_on
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ // TODO(Clark): Elide sorting one side of the join if already range-partitioned, where we'd use that side's boundaries to sort the other side.
+ let py_iter = if *needs_presort {
+ py.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "sort_merge_join_aligned_boundaries"))?
+ .call1((
+ left_iter,
+ right_iter,
+ left_on_pyexprs,
+ right_on_pyexprs,
+ *join_type,
+ *num_partitions,
+ *left_is_larger,
+ ))?
+ } else {
+ py.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "merge_join_sorted"))?
+ .call1((
+ left_iter,
+ right_iter,
+ left_on_pyexprs,
+ right_on_pyexprs,
+ *join_type,
+ *left_is_larger,
+ ))?
+ };
+ Ok(py_iter.into())
+ }
+ PhysicalPlan::BroadcastJoin(BroadcastJoin {
+ broadcaster: left,
+ receiver: right,
+ left_on,
+ right_on,
+ join_type,
+ is_swapped,
+ }) => {
+ let upstream_left_iter = physical_plan_to_partition_tasks(left, py, psets)?;
+ let upstream_right_iter = physical_plan_to_partition_tasks(right, py, psets)?;
+ let left_on_pyexprs: Vec<PyExpr> = left_on
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let right_on_pyexprs: Vec<PyExpr> = right_on
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect();
+ let py_iter = py
+ .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+ .getattr(pyo3::intern!(py, "broadcast_join"))?
+ .call1(( + upstream_left_iter, + upstream_right_iter, + left_on_pyexprs, + right_on_pyexprs, + *join_type, + *is_swapped, + ))?; + Ok(py_iter.into()) + } + PhysicalPlan::TabularWriteParquet(TabularWriteParquet { + schema, + file_info: + OutputFileInfo { + root_dir, + file_format, + partition_cols, + compression, + io_config, + }, + input, + }) => tabular_write( + py, + physical_plan_to_partition_tasks(input, py, psets)?, + file_format, + schema, + root_dir, + compression, + partition_cols, + io_config, + ), + PhysicalPlan::TabularWriteCsv(TabularWriteCsv { + schema, + file_info: + OutputFileInfo { + root_dir, + file_format, + partition_cols, + compression, + io_config, + }, + input, + }) => tabular_write( + py, + physical_plan_to_partition_tasks(input, py, psets)?, + file_format, + schema, + root_dir, + compression, + partition_cols, + io_config, + ), + PhysicalPlan::TabularWriteJson(TabularWriteJson { + schema, + file_info: + OutputFileInfo { + root_dir, + file_format, + partition_cols, + compression, + io_config, + }, + input, + }) => tabular_write( + py, + physical_plan_to_partition_tasks(input, py, psets)?, + file_format, + schema, + root_dir, + compression, + partition_cols, + io_config, + ), + #[cfg(feature = "python")] + PhysicalPlan::IcebergWrite(IcebergWrite { + schema: _, + iceberg_info, + input, + }) => iceberg_write( + py, + physical_plan_to_partition_tasks(input, py, psets)?, + iceberg_info, + ), + #[cfg(feature = "python")] + PhysicalPlan::DeltaLakeWrite(DeltaLakeWrite { + schema: _, + delta_lake_info, + input, + }) => deltalake_write( + py, + physical_plan_to_partition_tasks(input, py, psets)?, + delta_lake_info, + ), + } +} diff --git a/src/lib.rs b/src/lib.rs index dd77645316..b3db2d092f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,6 +70,7 @@ pub mod pylib { daft_plan::register_modules(_py, m)?; daft_micropartition::register_modules(_py, m)?; daft_scan::register_modules(_py, m)?; + daft_scheduler::register_modules(_py, m)?; m.add_wrapped(wrap_pyfunction!(version))?; m.add_wrapped(wrap_pyfunction!(build_type))?; m.add_wrapped(wrap_pyfunction!(refresh_logger))?; diff --git a/tests/cookbook/conftest.py b/tests/cookbook/conftest.py index ff5b328ba7..c04f7ab208 100644 --- a/tests/cookbook/conftest.py +++ b/tests/cookbook/conftest.py @@ -10,7 +10,14 @@ from daft.expressions import col from tests.cookbook.assets import COOKBOOK_DATA_CSV -COLUMNS = ["Unique Key", "Complaint Type", "Borough", "Created Date", "Descriptor", "Closed Date"] +COLUMNS = [ + "Unique Key", + "Complaint Type", + "Borough", + "Created Date", + "Descriptor", + "Closed Date", +] CsvPathAndColumns = Tuple[str, List[str]] diff --git a/tests/cookbook/test_filter.py b/tests/cookbook/test_filter.py index f69361a731..2009c0f73d 100644 --- a/tests/cookbook/test_filter.py +++ b/tests/cookbook/test_filter.py @@ -15,16 +15,22 @@ pytest.param( lambda daft_df: ( daft_df.where(col("Complaint Type") == "Noise - Street/Sidewalk").select( - col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor") + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), ) ), id="where..select", ), pytest.param( lambda daft_df: ( - daft_df.select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")).where( - col("Complaint Type") == "Noise - Street/Sidewalk" - ) + daft_df.select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ).where(col("Complaint Type") == "Noise - Street/Sidewalk") ), id="select..where", ), @@ -53,13 +59,23 @@ 
def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_np | (col("Complaint Type") == "Noise - Commercial") ) & (col("Borough") == "BROOKLYN") - ).select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")) + ).select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ) ), id="where..select", ), pytest.param( lambda daft_df: ( - daft_df.select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")).where( + daft_df.select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ).where( ( (col("Complaint Type") == "Noise - Street/Sidewalk") | (col("Complaint Type") == "Noise - Commercial") @@ -71,7 +87,12 @@ def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_np ), pytest.param( lambda daft_df: ( - daft_df.select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")).where( + daft_df.select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ).where( (col("Borough") == "BROOKLYN") & ( (col("Complaint Type") == "Noise - Street/Sidewalk") @@ -105,21 +126,36 @@ def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repart lambda daft_df: ( daft_df.where(col("Complaint Type") == "Noise - Street/Sidewalk") .where(col("Borough") == "BROOKLYN") - .select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")) + .select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ) ), id="where..where..select", ), pytest.param( lambda daft_df: ( daft_df.where(col("Complaint Type") == "Noise - Street/Sidewalk") - .select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")) + .select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ) .where(col("Borough") == "BROOKLYN") ), id="where..select..where", ), pytest.param( lambda daft_df: ( - daft_df.select(col("Unique Key"), col("Complaint Type"), col("Borough"), col("Descriptor")) + daft_df.select( + col("Unique Key"), + col("Complaint Type"), + col("Borough"), + col("Descriptor"), + ) .where(col("Complaint Type") == "Noise - Street/Sidewalk") .where(col("Borough") == "BROOKLYN") ),