From c1a534377c09abb6e74231cd5c74a4de21af2304 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Thu, 26 Oct 2023 20:49:49 -0700 Subject: [PATCH 01/10] Remove Coalesce logical op. --- daft/dataframe/dataframe.py | 24 +++--------- daft/logical/builder.py | 8 ---- src/daft-plan/src/builder.rs | 10 ----- src/daft-plan/src/logical_ops/coalesce.rs | 17 -------- src/daft-plan/src/logical_ops/mod.rs | 2 - src/daft-plan/src/logical_plan.rs | 12 +----- .../optimization/rules/push_down_filter.rs | 20 +--------- .../src/optimization/rules/push_down_limit.rs | 24 +----------- .../rules/push_down_projection.rs | 1 - src/daft-plan/src/planner.rs | 39 ++++++++++--------- 10 files changed, 30 insertions(+), 127 deletions(-) delete mode 100644 src/daft-plan/src/logical_ops/coalesce.rs diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index ce1ebf6a35..44563be70e 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -657,24 +657,12 @@ def into_partitions(self, num: int) -> "DataFrame": Returns: DataFrame: Dataframe with ``num`` partitions. """ - current_partitions = self._builder.num_partitions() - - if num > current_partitions: - # Do a split (increase the number of partitions). - builder = self._builder.repartition( - num_partitions=num, - partition_by=[], - scheme=PartitionScheme.Unknown, - ) - return DataFrame(builder) - - elif num < current_partitions: - # Do a coalese (decrease the number of partitions). - builder = self._builder.coalesce(num) - return DataFrame(builder) - - else: - return self + builder = self._builder.repartition( + num_partitions=num, + partition_by=[], + scheme=PartitionScheme.Unknown, + ) + return DataFrame(builder) @DataframePublicAPI def join( diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 5c976eadb4..b05733c80b 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -140,14 +140,6 @@ def repartition( builder = self._builder.repartition(num_partitions, partition_by_pyexprs, scheme) return LogicalPlanBuilder(builder) - def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: - if num_partitions > self.num_partitions(): - raise ValueError( - f"Coalesce can only reduce the number of partitions: {num_partitions} vs {self.num_partitions}" - ) - builder = self._builder.coalesce(num_partitions) - return LogicalPlanBuilder(builder) - def agg( self, to_agg: list[tuple[Expression, str]], diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 005fcabdd0..0f850cbc97 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -149,12 +149,6 @@ impl LogicalPlanBuilder { Ok(logical_plan.into()) } - pub fn coalesce(&self, num_partitions: usize) -> DaftResult { - let logical_plan: LogicalPlan = - logical_ops::Coalesce::new(self.plan.clone(), num_partitions).into(); - Ok(logical_plan.into()) - } - pub fn distinct(&self) -> DaftResult { let logical_plan: LogicalPlan = logical_ops::Distinct::new(self.plan.clone()).into(); Ok(logical_plan.into()) @@ -351,10 +345,6 @@ impl PyLogicalPlanBuilder { .into()) } - pub fn coalesce(&self, num_partitions: usize) -> PyResult { - Ok(self.builder.coalesce(num_partitions)?.into()) - } - pub fn distinct(&self) -> PyResult { Ok(self.builder.distinct()?.into()) } diff --git a/src/daft-plan/src/logical_ops/coalesce.rs b/src/daft-plan/src/logical_ops/coalesce.rs deleted file mode 100644 index 41d3208eef..0000000000 --- a/src/daft-plan/src/logical_ops/coalesce.rs +++ /dev/null @@ -1,17 +0,0 @@ -use std::sync::Arc; - -use crate::LogicalPlan; 
- -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct Coalesce { - // Upstream node. - pub input: Arc, - // Number of partitions to coalesce to. - pub num_to: usize, -} - -impl Coalesce { - pub(crate) fn new(input: Arc, num_to: usize) -> Self { - Self { input, num_to } - } -} diff --git a/src/daft-plan/src/logical_ops/mod.rs b/src/daft-plan/src/logical_ops/mod.rs index ad8e4130cf..662ade978c 100644 --- a/src/daft-plan/src/logical_ops/mod.rs +++ b/src/daft-plan/src/logical_ops/mod.rs @@ -1,5 +1,4 @@ mod agg; -mod coalesce; mod concat; mod distinct; mod explode; @@ -13,7 +12,6 @@ mod sort; mod source; pub use agg::Aggregate; -pub use coalesce::Coalesce; pub use concat::Concat; pub use distinct::Distinct; pub use explode::Explode; diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 5fba68d491..125b724cb4 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -18,7 +18,6 @@ pub enum LogicalPlan { Explode(Explode), Sort(Sort), Repartition(Repartition), - Coalesce(Coalesce), Distinct(Distinct), Aggregate(Aggregate), Concat(Concat), @@ -40,7 +39,6 @@ impl LogicalPlan { }) => exploded_schema.clone(), Self::Sort(Sort { input, .. }) => input.schema(), Self::Repartition(Repartition { input, .. }) => input.schema(), - Self::Coalesce(Coalesce { input, .. }) => input.schema(), Self::Distinct(Distinct { input, .. }) => input.schema(), Self::Aggregate(aggregate) => aggregate.schema(), Self::Concat(Concat { input, .. }) => input.schema(), @@ -52,7 +50,7 @@ impl LogicalPlan { pub fn required_columns(&self) -> Vec> { // TODO: https://github.com/Eventual-Inc/Daft/pull/1288#discussion_r1307820697 match self { - Self::Limit(..) | Self::Coalesce(..) => vec![IndexSet::new()], + Self::Limit(..) => vec![IndexSet::new()], Self::Concat(..) => vec![IndexSet::new(), IndexSet::new()], Self::Project(projection) => { let res = projection @@ -147,9 +145,6 @@ impl LogicalPlan { Some(partition_by.clone()), ) .into(), - Self::Coalesce(Coalesce { num_to, .. }) => { - PartitionSpec::new_internal(PartitionScheme::Unknown, *num_to, None).into() - } Self::Distinct(Distinct { input, .. }) => input.partition_spec(), Self::Aggregate(Aggregate { input, groupby, .. }) => { let input_partition_spec = input.partition_spec(); @@ -209,7 +204,6 @@ impl LogicalPlan { Self::Explode(Explode { input, .. }) => vec![input], Self::Sort(Sort { input, .. }) => vec![input], Self::Repartition(Repartition { input, .. }) => vec![input], - Self::Coalesce(Coalesce { input, .. }) => vec![input], Self::Distinct(Distinct { input, .. }) => vec![input], Self::Aggregate(Aggregate { input, .. }) => vec![input], Self::Concat(Concat { input, other }) => vec![input, other], @@ -230,7 +224,6 @@ impl LogicalPlan { Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()), Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()), Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone())), - Self::Coalesce(Coalesce { num_to, .. 
}) => Self::Coalesce(Coalesce::new(input.clone(), *num_to)), Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())), Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()), Self::Sink(Sink { schema, sink_info, .. }) => Self::Sink(Sink::new(input.clone(), schema.clone(), sink_info.clone())), @@ -271,7 +264,6 @@ impl LogicalPlan { Self::Explode(..) => "Explode", Self::Sort(..) => "Sort", Self::Repartition(..) => "Repartition", - Self::Coalesce(..) => "Coalesce", Self::Distinct(..) => "Distinct", Self::Aggregate(..) => "Aggregate", Self::Concat(..) => "Concat", @@ -299,7 +291,6 @@ impl LogicalPlan { } Self::Sort(sort) => sort.multiline_display(), Self::Repartition(repartition) => repartition.multiline_display(), - Self::Coalesce(Coalesce { num_to, .. }) => vec![format!("Coalesce: To = {num_to}")], Self::Distinct(_) => vec!["Distinct".to_string()], Self::Aggregate(aggregate) => aggregate.multiline_display(), Self::Concat(_) => vec!["Concat".to_string()], @@ -360,7 +351,6 @@ impl_from_data_struct_for_logical_plan!(Limit); impl_from_data_struct_for_logical_plan!(Explode); impl_from_data_struct_for_logical_plan!(Sort); impl_from_data_struct_for_logical_plan!(Repartition); -impl_from_data_struct_for_logical_plan!(Coalesce); impl_from_data_struct_for_logical_plan!(Distinct); impl_from_data_struct_for_logical_plan!(Aggregate); impl_from_data_struct_for_logical_plan!(Concat); diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index 8469abe5c6..ba1c6d59d2 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -135,7 +135,7 @@ impl OptimizerRule for PushDownFilter { post_projection_filter.into() } } - LogicalPlan::Sort(_) | LogicalPlan::Repartition(_) | LogicalPlan::Coalesce(_) => { + LogicalPlan::Sort(_) | LogicalPlan::Repartition(_) => { // Naive commuting with unary ops. let new_filter = plan.with_new_children(&[child_plan.children()[0].clone()]); child_plan.with_new_children(&[new_filter]) @@ -381,24 +381,6 @@ mod tests { Ok(()) } - /// Tests that Filter commutes with Coalesce. - #[test] - fn filter_commutes_with_coalesce() -> DaftResult<()> { - let plan = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Utf8), - ]) - .coalesce(1)? - .filter(col("a").lt(&lit(2)))? - .build(); - let expected = "\ - Coalesce: To = 1\ - \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; - assert_optimized_plan_eq(plan, expected)?; - Ok(()) - } - /// Tests that Filter commutes with Concat. #[test] fn filter_commutes_with_concat() -> DaftResult<()> { diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index e4baa1f0c1..2c19cc9bdc 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -29,9 +29,7 @@ impl OptimizerRule for PushDownLimit { // Naive commuting with unary ops. 
// // Limit-UnaryOp -> UnaryOp-Limit - LogicalPlan::Repartition(_) - | LogicalPlan::Coalesce(_) - | LogicalPlan::Project(_) => { + LogicalPlan::Repartition(_) | LogicalPlan::Project(_) => { let new_limit = plan.with_new_children(&[input.children()[0].clone()]); Ok(Transformed::Yes(input.with_new_children(&[new_limit]))) } @@ -209,26 +207,6 @@ mod tests { Ok(()) } - /// Tests that Limit commutes with Coalesce. - /// - /// Limit-Coalesce-Source -> Coalesce-Source[with_limit] - #[test] - fn limit_commutes_with_coalesce() -> DaftResult<()> { - let plan = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Utf8), - ]) - .coalesce(1)? - .limit(5, false)? - .build(); - let expected = "\ - Coalesce: To = 1\ - \n Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; - assert_optimized_plan_eq(plan, expected)?; - Ok(()) - } - /// Tests that Limit commutes with Projections. /// /// Limit-Project-Source -> Project-Source[with_limit] diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index e113af4eb1..9249b50abc 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -235,7 +235,6 @@ impl PushDownProjection { } LogicalPlan::Sort(..) | LogicalPlan::Repartition(..) - | LogicalPlan::Coalesce(..) | LogicalPlan::Limit(..) | LogicalPlan::Filter(..) | LogicalPlan::Explode(..) => { diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index bf35db6cda..13f0609dcf 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -6,10 +6,10 @@ use daft_core::count_mode::CountMode; use daft_dsl::Expr; use crate::logical_ops::{ - Aggregate as LogicalAggregate, Coalesce as LogicalCoalesce, Concat as LogicalConcat, - Distinct as LogicalDistinct, Explode as LogicalExplode, Filter as LogicalFilter, - Join as LogicalJoin, Limit as LogicalLimit, Project as LogicalProject, - Repartition as LogicalRepartition, Sink as LogicalSink, Sort as LogicalSort, Source, + Aggregate as LogicalAggregate, Concat as LogicalConcat, Distinct as LogicalDistinct, + Explode as LogicalExplode, Filter as LogicalFilter, Join as LogicalJoin, Limit as LogicalLimit, + Project as LogicalProject, Repartition as LogicalRepartition, Sink as LogicalSink, + Sort as LogicalSort, Source, }; use crate::logical_plan::LogicalPlan; use crate::physical_plan::PhysicalPlan; @@ -147,12 +147,23 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { let input_physical = Arc::new(plan(input)?); match scheme { PartitionScheme::Unknown => { - let split_op = PhysicalPlan::Split(Split::new( - input_physical, - input.partition_spec().num_partitions, - *num_partitions, - )); - Ok(PhysicalPlan::Flatten(Flatten::new(split_op.into()))) + let input_num_partitions = input.partition_spec().num_partitions; + if *num_partitions > input_num_partitions { + // Split input partitions into num_partitions. + let split_op = PhysicalPlan::Split(Split::new( + input_physical, + input_num_partitions, + *num_partitions, + )); + Ok(PhysicalPlan::Flatten(Flatten::new(split_op.into()))) + } else { + // Coalesce input partitions into num_partitions. 
+ Ok(PhysicalPlan::Coalesce(Coalesce::new( + input_physical, + input_num_partitions, + *num_partitions, + ))) + } } PartitionScheme::Random => { let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new( @@ -172,14 +183,6 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { PartitionScheme::Range => unreachable!("Repartitioning by range is not supported"), } } - LogicalPlan::Coalesce(LogicalCoalesce { input, num_to }) => { - let input_physical = plan(input)?; - Ok(PhysicalPlan::Coalesce(Coalesce::new( - input_physical.into(), - input.partition_spec().num_partitions, - *num_to, - ))) - } LogicalPlan::Distinct(LogicalDistinct { input }) => { let input_physical = plan(input)?; let col_exprs = input From 6449328793afe6c41429d5ce9cf9a0a2fc916db1 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Thu, 26 Oct 2023 21:17:39 -0700 Subject: [PATCH 02/10] Push partition spec into physical plan. --- daft/daft.pyi | 8 +- daft/dataframe/dataframe.py | 36 +-- daft/io/file_path.py | 4 +- daft/logical/builder.py | 25 +- daft/runners/partitioning.py | 3 + src/daft-plan/src/builder.rs | 42 +-- src/daft-plan/src/logical_ops/project.rs | 285 +----------------- src/daft-plan/src/logical_ops/repartition.rs | 6 +- src/daft-plan/src/logical_ops/source.rs | 11 +- src/daft-plan/src/logical_plan.rs | 78 +---- src/daft-plan/src/optimization/optimizer.rs | 4 +- src/daft-plan/src/optimization/rules/mod.rs | 4 +- .../optimization/rules/push_down_filter.rs | 2 +- .../src/optimization/rules/push_down_limit.rs | 2 +- .../rules/push_down_projection.rs | 9 +- src/daft-plan/src/physical_ops/project.rs | 165 +++++++++- src/daft-plan/src/physical_plan.rs | 114 ++++++- src/daft-plan/src/planner.rs | 115 ++++--- src/daft-plan/src/source_info/mod.rs | 9 +- tests/dataframe/test_accessors.py | 7 - 20 files changed, 412 insertions(+), 517 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 8444e5c1fd..b67797ba37 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -805,7 +805,7 @@ class LogicalPlanBuilder: @staticmethod def in_memory_scan( - partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, partition_spec: PartitionSpec + partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int ) -> LogicalPlanBuilder: ... @staticmethod def table_scan( @@ -817,7 +817,10 @@ class LogicalPlanBuilder: def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ... def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ... def repartition( - self, num_partitions: int, partition_by: list[PyExpr], scheme: PartitionScheme + self, + partition_by: list[PyExpr], + scheme: PartitionScheme, + num_partitions: int | None, ) -> LogicalPlanBuilder: ... def coalesce(self, num_partitions: int) -> LogicalPlanBuilder: ... def distinct(self) -> LogicalPlanBuilder: ... @@ -834,7 +837,6 @@ class LogicalPlanBuilder: compression: str | None = None, ) -> LogicalPlanBuilder: ... def schema(self) -> PySchema: ... - def partition_spec(self) -> PartitionSpec: ... def optimize(self) -> LogicalPlanBuilder: ... def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler: ... def repr_ascii(self, simple: bool) -> str: ... 
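With the logical Coalesce op removed, into_partitions() now always builds a Repartition with PartitionScheme.Unknown, and the planner hunk above decides between splitting and coalescing based on the input's partition count. A minimal Python sketch of the resulting user-visible behavior; the physical ops named in the comments are taken from that planner hunk:

    import daft

    df = daft.from_pydict({"x": list(range(8))})  # starts as a single partition
    df4 = df.into_partitions(4)   # 1 -> 4: lowered to Split + Flatten
    df2 = df4.into_partitions(2)  # 4 -> 2: lowered to Coalesce

Either way, into_partitions() only splits or merges existing partitions; neither physical path shuffles data.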
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 44563be70e..cd7f458378 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -24,13 +24,7 @@ from daft.api_annotations import DataframePublicAPI from daft.context import get_context from daft.convert import InputListType -from daft.daft import ( - FileFormat, - JoinType, - PartitionScheme, - PartitionSpec, - ResourceRequest, -) +from daft.daft import FileFormat, JoinType, PartitionScheme, ResourceRequest from daft.dataframe.preview import DataFramePreview from daft.datatype import DataType from daft.errors import ExpressionTypeError @@ -89,8 +83,11 @@ def _builder(self) -> LogicalPlanBuilder: if self._result_cache is None: return self.__builder else: + num_partitions = self._result_cache.num_partitions() + # Partition set should always be set on cache entry. + assert num_partitions is not None return self.__builder.from_in_memory_scan( - self._result_cache, self.__builder.schema(), self.__builder.partition_spec() + self._result_cache, self.__builder.schema(), num_partitions=num_partitions ) def _get_current_builder(self) -> LogicalPlanBuilder: @@ -126,9 +123,6 @@ def explain(self, show_optimized: bool = False, simple=False) -> None: builder = builder.optimize() print(builder.pretty_print(simple)) - def num_partitions(self) -> int: - return self.__builder.num_partitions() - @DataframePublicAPI def schema(self) -> Schema: """Returns the Schema of the DataFrame, which provides information about each column @@ -306,7 +300,7 @@ def _from_tables(cls, *parts: Table) -> "DataFrame": context = get_context() cache_entry = context.runner().put_partition_set_into_cache(result_pset) - builder = LogicalPlanBuilder.from_in_memory_scan(cache_entry, parts[0].schema()) + builder = LogicalPlanBuilder.from_in_memory_scan(cache_entry, parts[0].schema(), result_pset.num_partitions()) return cls(builder) ### @@ -345,7 +339,7 @@ def write_parquet( cols = self.__column_input_to_expression(tuple(partition_cols)) for c in cols: assert c._is_column(), "we cant support non Column Expressions for partition writing" - self.repartition(self.num_partitions(), *cols) + self.repartition(None, *cols) else: pass builder = self._builder.write_tabular( @@ -386,7 +380,7 @@ def write_csv( cols = self.__column_input_to_expression(tuple(partition_cols)) for c in cols: assert c._is_column(), "we cant support non Column Expressions for partition writing" - self.repartition(self.num_partitions(), *cols) + self.repartition(None, *cols) else: pass builder = self._builder.write_tabular( @@ -614,7 +608,7 @@ def count_rows(self) -> int: return count_df.to_pydict()["count"][0] @DataframePublicAPI - def repartition(self, num: int, *partition_by: ColumnInputType) -> "DataFrame": + def repartition(self, num: Optional[int], *partition_by: ColumnInputType) -> "DataFrame": """Repartitions DataFrame to ``num`` partitions If columns are passed in, then DataFrame will be repartitioned by those, otherwise @@ -625,8 +619,8 @@ def repartition(self, num: int, *partition_by: ColumnInputType) -> "DataFrame": >>> part_by_df = df.repartition(4, 'x', col('y') + 1) Args: - num (int): number of target partitions. - *partition_by (Union[str, Expression]): optional columns to partition by. + num (Optional[int]): Number of target partitions; if None, the number of partitions will not be changed. + *partition_by (Union[str, Expression]): Optional columns to partition by. Returns: DataFrame: Repartitioned DataFrame. 
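Because num is now Optional, callers can hash-partition by columns while leaving the partition count alone (the planner falls back to the input's partition count when num is None); the write_parquet/write_csv hunks above rely on exactly this via repartition(None, *cols). A usage sketch, not code from the PR:

    import daft
    from daft import col

    df = daft.from_pydict({"x": [1, 2, 1, 2], "y": [10, 20, 30, 40]})
    by_x = df.repartition(None, "x")                 # hash by "x"; count unchanged
    explicit = df.repartition(4, "x", col("y") + 1)  # explicit count, as in the docstring example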
@@ -1164,9 +1158,7 @@ def _from_ray_dataset(cls, ds: "RayDataset") -> "DataFrame": partition_set, schema = ray_runner_io.partition_set_from_ray_dataset(ds) cache_entry = context.runner().put_partition_set_into_cache(partition_set) builder = LogicalPlanBuilder.from_in_memory_scan( - cache_entry, - schema=schema, - partition_spec=PartitionSpec(PartitionScheme.Unknown, partition_set.num_partitions()), + cache_entry, schema=schema, num_partitions=partition_set.num_partitions() ) return cls(builder) @@ -1233,9 +1225,7 @@ def _from_dask_dataframe(cls, ddf: "dask.DataFrame") -> "DataFrame": partition_set, schema = ray_runner_io.partition_set_from_dask_dataframe(ddf) cache_entry = context.runner().put_partition_set_into_cache(partition_set) builder = LogicalPlanBuilder.from_in_memory_scan( - cache_entry, - schema=schema, - partition_spec=PartitionSpec(PartitionScheme.Unknown, partition_set.num_partitions()), + cache_entry, schema=schema, num_partitions=partition_set.num_partitions() ) return cls(builder) diff --git a/daft/io/file_path.py b/daft/io/file_path.py index bb3cdb217b..2cfd3388c3 100644 --- a/daft/io/file_path.py +++ b/daft/io/file_path.py @@ -5,7 +5,7 @@ from daft.api_annotations import PublicAPI from daft.context import get_context -from daft.daft import IOConfig, PartitionScheme, PartitionSpec +from daft.daft import IOConfig from daft.dataframe import DataFrame from daft.logical.builder import LogicalPlanBuilder from daft.runners.pyrunner import LocalPartitionSet @@ -51,6 +51,6 @@ def from_glob_path(path: str, io_config: Optional[IOConfig] = None) -> DataFrame builder = LogicalPlanBuilder.from_in_memory_scan( cache_entry, schema=file_infos_table.schema(), - partition_spec=PartitionSpec(PartitionScheme.Unknown, partition.num_partitions()), + num_partitions=partition.num_partitions(), ) return DataFrame(builder) diff --git a/daft/logical/builder.py b/daft/logical/builder.py index b05733c80b..311c5e29fd 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -5,7 +5,7 @@ from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder -from daft.daft import PartitionScheme, PartitionSpec, ResourceRequest, StorageConfig +from daft.daft import PartitionScheme, ResourceRequest, StorageConfig from daft.expressions import Expression, col from daft.logical.schema import Schema from daft.runners.partitioning import PartitionCacheEntry @@ -40,19 +40,6 @@ def schema(self) -> Schema: pyschema = self._builder.schema() return Schema._from_pyschema(pyschema) - def partition_spec(self) -> PartitionSpec: - """ - Partition spec for the current logical plan. - """ - # TODO(Clark): Push PartitionSpec into planner. - return self._builder.partition_spec() - - def num_partitions(self) -> int: - """ - Number of partitions for the current logical plan. - """ - return self.partition_spec().num_partitions - def pretty_print(self, simple: bool = False) -> str: """ Pretty prints the current underlying logical plan. 
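With partition_spec() and num_partitions() gone from the logical builder, an in-memory scan must be told its partition count explicitly, and that count is sourced from the materialized partition set (see the _builder property hunk above and the PartitionCacheEntry.num_partitions() helper in the partitioning.py hunk below). A runnable sketch of when this path is exercised; the comments describe the internal wiring rather than public API:

    import daft

    df = daft.from_pydict({"x": [1, 2, 3]})
    df.collect()  # materializes the result and populates the partition set cache
    # From here on, DataFrame._builder rebuilds the plan as an in-memory scan,
    # passing num_partitions=self._result_cache.num_partitions(), which the
    # _builder property asserts is non-None once a result has been cached.
    print(df.to_pydict())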
@@ -74,11 +61,9 @@ def optimize(self) -> LogicalPlanBuilder: @classmethod def from_in_memory_scan( - cls, partition: PartitionCacheEntry, schema: Schema, partition_spec: PartitionSpec | None = None + cls, partition: PartitionCacheEntry, schema: Schema, num_partitions: int ) -> LogicalPlanBuilder: - if partition_spec is None: - partition_spec = PartitionSpec(scheme=PartitionScheme.Unknown, num_partitions=1) - builder = _LogicalPlanBuilder.in_memory_scan(partition.key, partition, schema._schema, partition_spec) + builder = _LogicalPlanBuilder.in_memory_scan(partition.key, partition, schema._schema, num_partitions) return cls(builder) @classmethod @@ -134,10 +119,10 @@ def sort(self, sort_by: list[Expression], descending: list[bool] | bool = False) return LogicalPlanBuilder(builder) def repartition( - self, num_partitions: int, partition_by: list[Expression], scheme: PartitionScheme + self, num_partitions: int | None, partition_by: list[Expression], scheme: PartitionScheme ) -> LogicalPlanBuilder: partition_by_pyexprs = [expr._expr for expr in partition_by] - builder = self._builder.repartition(num_partitions, partition_by_pyexprs, scheme) + builder = self._builder.repartition(partition_by_pyexprs, scheme, num_partitions=num_partitions) return LogicalPlanBuilder(builder) def agg( diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 4a496ab0e1..6b6f2f001e 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -170,6 +170,9 @@ def __setstate__(self, key): self.key = key self.value = None + def num_partitions(self) -> int | None: + return self.value.num_partitions() if self.value is not None else None + class PartitionSetCache: def __init__(self) -> None: diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 0f850cbc97..1a5645a640 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -10,7 +10,7 @@ use crate::{ ExternalInfo as ExternalSourceInfo, FileFormatConfig, FileInfos as InputFileInfos, PyStorageConfig, SourceInfo, StorageConfig, }, - FileFormat, JoinType, PartitionScheme, PartitionSpec, PhysicalPlanScheduler, ResourceRequest, + FileFormat, JoinType, PartitionScheme, PhysicalPlanScheduler, ResourceRequest, }; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; @@ -51,20 +51,16 @@ impl LogicalPlanBuilder { partition_key: &str, cache_entry: PyObject, schema: Arc, - partition_spec: PartitionSpec, + num_partitions: usize, ) -> DaftResult { let source_info = SourceInfo::InMemoryInfo(InMemoryInfo::new( schema.clone(), partition_key.into(), cache_entry, + num_partitions, )); - let logical_plan: LogicalPlan = logical_ops::Source::new( - schema.clone(), - source_info.into(), - partition_spec.clone().into(), - None, - ) - .into(); + let logical_plan: LogicalPlan = + logical_ops::Source::new(schema.clone(), source_info.into(), None).into(); Ok(logical_plan.into()) } @@ -84,22 +80,14 @@ impl LogicalPlanBuilder { storage_config: Arc, limit: Option, ) -> DaftResult { - let num_partitions = file_infos.len(); let source_info = SourceInfo::ExternalInfo(ExternalSourceInfo::new( schema.clone(), file_infos.into(), file_format_config, storage_config, )); - let partition_spec = - PartitionSpec::new_internal(PartitionScheme::Unknown, num_partitions, None); - let logical_plan: LogicalPlan = logical_ops::Source::new( - schema.clone(), - source_info.into(), - partition_spec.into(), - limit, - ) - .into(); + let logical_plan: LogicalPlan = + logical_ops::Source::new(schema.clone(), 
source_info.into(), limit).into(); Ok(logical_plan.into()) } @@ -139,7 +127,7 @@ impl LogicalPlanBuilder { pub fn repartition( &self, - num_partitions: usize, + num_partitions: Option, partition_by: Vec, scheme: PartitionScheme, ) -> DaftResult { @@ -225,10 +213,6 @@ impl LogicalPlanBuilder { self.plan.schema() } - pub fn partition_spec(&self) -> PartitionSpec { - self.plan.partition_spec().as_ref().clone() - } - pub fn repr_ascii(&self, simple: bool) -> String { self.plan.repr_ascii(simple) } @@ -266,13 +250,13 @@ impl PyLogicalPlanBuilder { partition_key: &str, cache_entry: &PyAny, schema: PySchema, - partition_spec: PartitionSpec, + num_partitions: usize, ) -> PyResult { Ok(LogicalPlanBuilder::in_memory_scan( partition_key, cache_entry.to_object(cache_entry.py()), schema.into(), - partition_spec, + num_partitions, )? .into()) } @@ -331,9 +315,9 @@ impl PyLogicalPlanBuilder { pub fn repartition( &self, - num_partitions: usize, partition_by: Vec, scheme: PartitionScheme, + num_partitions: Option, ) -> PyResult { let partition_by_exprs: Vec = partition_by .iter() @@ -405,10 +389,6 @@ impl PyLogicalPlanBuilder { Ok(self.builder.schema().into()) } - pub fn partition_spec(&self) -> PyResult { - Ok(self.builder.partition_spec()) - } - /// Optimize the underlying logical plan, returning a new plan builder containing the optimized plan. pub fn optimize(&self) -> PyResult { let optimizer = Optimizer::new(Default::default()); diff --git a/src/daft-plan/src/logical_ops/project.rs b/src/daft-plan/src/logical_ops/project.rs index f92ca58b11..5901c47523 100644 --- a/src/daft-plan/src/logical_ops/project.rs +++ b/src/daft-plan/src/logical_ops/project.rs @@ -8,7 +8,7 @@ use snafu::ResultExt; use crate::logical_plan::{CreationSnafu, Result}; use crate::optimization::Transformed; -use crate::{LogicalPlan, PartitionScheme, PartitionSpec, ResourceRequest}; +use crate::{LogicalPlan, ResourceRequest}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Project { @@ -17,7 +17,6 @@ pub struct Project { pub projection: Vec, pub resource_request: ResourceRequest, pub projected_schema: SchemaRef, - pub partition_spec: Arc, } impl Project { @@ -39,180 +38,23 @@ impl Project { .context(CreationSnafu)?; Schema::new(fields).context(CreationSnafu)?.into() }; - let partition_spec = - Self::translate_partition_spec(factored_input.partition_spec(), &factored_projection); Ok(Self { input: factored_input, projection: factored_projection, resource_request, projected_schema, - partition_spec, }) } pub fn multiline_display(&self) -> Vec { - vec![ - format!( - "Project: {}", - self.projection - .iter() - .map(|e| e.to_string()) - .collect::>() - .join(", ") - ), - format!("Partition spec = {:?}", self.partition_spec), - ] - } - - fn translate_partition_spec( - input_pspec: Arc, - projection: &Vec, - ) -> Arc { - // Given an input partition spec, and a new projection, - // produce the new partition spec. - - use crate::PartitionScheme::*; - match input_pspec.scheme { - // If the scheme is vacuous, the result partiiton spec is the same. - Random | Unknown => input_pspec.clone(), - // Otherwise, need to reevaluate the partition scheme for each expression. - Range | Hash => { - // See what columns the projection directly translates into new columns. - let mut old_colname_to_new_colname = IndexMap::new(); - for expr in projection { - if let Some(oldname) = expr.input_mapping() { - let newname = expr.name().unwrap().to_string(); - // Add the oldname -> newname mapping, - // but don't overwrite any existing identity mappings (e.g. 
"a" -> "a"). - if old_colname_to_new_colname.get(&oldname) != Some(&oldname) { - old_colname_to_new_colname.insert(oldname, newname); - } - } - } - - // Then, see if we can fully translate the partition spec. - let maybe_new_pspec = input_pspec - .by - .as_ref() - .unwrap() - .iter() - .map(|e| Self::translate_partition_spec_expr(e, &old_colname_to_new_colname)) - .collect::, _>>(); - maybe_new_pspec.map_or_else( - |()| { - PartitionSpec::new_internal( - PartitionScheme::Unknown, - input_pspec.num_partitions, - None, - ) - .into() - }, - |new_pspec: Vec| { - PartitionSpec::new_internal( - input_pspec.scheme.clone(), - input_pspec.num_partitions, - Some(new_pspec), - ) - .into() - }, - ) - } - } - } - - fn translate_partition_spec_expr( - pspec_expr: &Expr, - old_colname_to_new_colname: &IndexMap, - ) -> std::result::Result { - // Given a single expression of an input partition spec, - // translate it to a new expression in the given projection. - // Returns: - // - Ok(expr) with expr being the translation, or - // - Err(()) if no translation is possible in the new projection. - - match pspec_expr { - Expr::Column(name) => match old_colname_to_new_colname.get(name.as_ref()) { - Some(newname) => Ok(Expr::Column(newname.as_str().into())), - None => Err(()), - }, - Expr::Literal(_) => Ok(pspec_expr.clone()), - Expr::Alias(child, name) => { - let newchild = Self::translate_partition_spec_expr( - child.as_ref(), - old_colname_to_new_colname, - )?; - Ok(Expr::Alias(newchild.into(), name.clone())) - } - Expr::BinaryOp { op, left, right } => { - let newleft = - Self::translate_partition_spec_expr(left.as_ref(), old_colname_to_new_colname)?; - let newright = Self::translate_partition_spec_expr( - right.as_ref(), - old_colname_to_new_colname, - )?; - Ok(Expr::BinaryOp { - op: *op, - left: newleft.into(), - right: newright.into(), - }) - } - Expr::Cast(child, dtype) => { - let newchild = Self::translate_partition_spec_expr( - child.as_ref(), - old_colname_to_new_colname, - )?; - Ok(Expr::Cast(newchild.into(), dtype.clone())) - } - Expr::Function { func, inputs } => { - let new_inputs = inputs - .iter() - .map(|e| Self::translate_partition_spec_expr(e, old_colname_to_new_colname)) - .collect::, _>>()?; - Ok(Expr::Function { - func: func.clone(), - inputs: new_inputs, - }) - } - Expr::Not(child) => { - let newchild = Self::translate_partition_spec_expr( - child.as_ref(), - old_colname_to_new_colname, - )?; - Ok(Expr::Not(newchild.into())) - } - Expr::IsNull(child) => { - let newchild = Self::translate_partition_spec_expr( - child.as_ref(), - old_colname_to_new_colname, - )?; - Ok(Expr::IsNull(newchild.into())) - } - Expr::IfElse { - if_true, - if_false, - predicate, - } => { - let newtrue = Self::translate_partition_spec_expr( - if_true.as_ref(), - old_colname_to_new_colname, - )?; - let newfalse = Self::translate_partition_spec_expr( - if_false.as_ref(), - old_colname_to_new_colname, - )?; - let newpred = Self::translate_partition_spec_expr( - predicate.as_ref(), - old_colname_to_new_colname, - )?; - Ok(Expr::IfElse { - if_true: newtrue.into(), - if_false: newfalse.into(), - predicate: newpred.into(), - }) - } - // Cannot have agg exprs in partition specs. 
- Expr::Agg(_) => Err(()), - } + vec![format!( + "Project: {}", + self.projection + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", ") + )] } fn try_factor_subexpressions( @@ -526,11 +368,9 @@ fn replace_column_with_semantic_id_aggexpr( mod tests { use common_error::DaftResult; use daft_core::{datatypes::Field, DataType}; - use daft_dsl::{binary_op, col, lit, Expr, Operator}; + use daft_dsl::{binary_op, col, lit, Operator}; - use crate::{ - logical_ops::Project, test::dummy_scan_node, LogicalPlan, PartitionScheme, PartitionSpec, - }; + use crate::{logical_ops::Project, test::dummy_scan_node, LogicalPlan}; /// Test that nested common subexpressions are correctly split /// into multiple levels of projections. @@ -640,107 +480,4 @@ mod tests { Ok(()) } - - /// Test that projections preserving column inputs, even through aliasing, - /// do not destroy the partition spec. - #[test] - fn test_partition_spec_preserving() -> DaftResult<()> { - let source = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Int64), - Field::new("c", DataType::Int64), - ]) - .repartition( - 3, - vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, - )? - .build(); - - let expressions = vec![ - (col("a") % lit(2)), // this is now "a" - col("b"), - col("a").alias("aa"), - ]; - - let result_projection = Project::try_new(source, expressions, Default::default())?; - - let expected_pspec = - PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("aa"), col("b")])); - - assert_eq!( - expected_pspec, - result_projection.partition_spec.as_ref().clone() - ); - - Ok(()) - } - - /// Test that projections destroying even a single column input from the partition spec - /// destroys the entire partition spec. - #[test] - fn test_partition_spec_destroying() -> DaftResult<()> { - let source = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Int64), - Field::new("c", DataType::Int64), - ]) - .repartition( - 3, - vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, - )? - .build(); - - let expected_pspec = PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None); - - let test_cases = vec![ - vec![col("a"), col("c").alias("b")], // original "b" is gone even though "b" is present - vec![col("b")], // original "a" dropped - vec![col("a") % lit(2), col("b")], // original "a" gone - vec![col("c")], // everything gone - ]; - - for projection in test_cases { - let result_projection = - Project::try_new(source.clone(), projection, Default::default())?; - assert_eq!( - expected_pspec, - result_projection.partition_spec.as_ref().clone() - ); - } - - Ok(()) - } - - /// Test that new partition specs favor existing instead of new names. - /// i.e. ("a", "a" as "b") remains partitioned by "a", not "b" - #[test] - fn test_partition_spec_prefer_existing_names() -> DaftResult<()> { - let source = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Int64), - Field::new("c", DataType::Int64), - ]) - .repartition( - 3, - vec![Expr::Column("a".into()), Expr::Column("b".into())], - PartitionScheme::Hash, - )? 
- .build(); - - let expressions = vec![col("a").alias("y"), col("a"), col("a").alias("z"), col("b")]; - - let result_projection = Project::try_new(source, expressions, Default::default())?; - - let expected_pspec = - PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("a"), col("b")])); - - assert_eq!( - expected_pspec, - result_projection.partition_spec.as_ref().clone() - ); - - Ok(()) - } } diff --git a/src/daft-plan/src/logical_ops/repartition.rs b/src/daft-plan/src/logical_ops/repartition.rs index c10e7040c1..de21600fa8 100644 --- a/src/daft-plan/src/logical_ops/repartition.rs +++ b/src/daft-plan/src/logical_ops/repartition.rs @@ -8,7 +8,7 @@ use crate::{LogicalPlan, PartitionScheme}; pub struct Repartition { // Upstream node. pub input: Arc, - pub num_partitions: usize, + pub num_partitions: Option, pub partition_by: Vec, pub scheme: PartitionScheme, } @@ -16,7 +16,7 @@ pub struct Repartition { impl Repartition { pub(crate) fn new( input: Arc, - num_partitions: usize, + num_partitions: Option, partition_by: Vec, scheme: PartitionScheme, ) -> Self { @@ -31,7 +31,7 @@ impl Repartition { pub fn multiline_display(&self) -> Vec { let mut res = vec![]; res.push(format!("Repartition: Scheme = {:?}", self.scheme)); - res.push(format!("Number of partitions = {}", self.num_partitions)); + res.push(format!("Number of partitions = {:?}", self.num_partitions)); if !self.partition_by.is_empty() { res.push(format!( "Partition by = {}", diff --git a/src/daft-plan/src/logical_ops/source.rs b/src/daft-plan/src/logical_ops/source.rs index 1537265348..7e3738dd05 100644 --- a/src/daft-plan/src/logical_ops/source.rs +++ b/src/daft-plan/src/logical_ops/source.rs @@ -3,10 +3,7 @@ use std::sync::Arc; use daft_core::schema::SchemaRef; use daft_dsl::ExprRef; -use crate::{ - source_info::{ExternalInfo, SourceInfo}, - PartitionSpec, -}; +use crate::source_info::{ExternalInfo, SourceInfo}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Source { @@ -17,8 +14,6 @@ pub struct Source { /// Information about the source data location. pub source_info: Arc, - pub partition_spec: Arc, - /// Optional filters to apply to the source data. pub filters: Vec, /// Optional number of rows to read. @@ -29,13 +24,11 @@ impl Source { pub(crate) fn new( output_schema: SchemaRef, source_info: Arc, - partition_spec: Arc, limit: Option, ) -> Self { Self { output_schema, source_info, - partition_spec, limit, filters: vec![], // Will be populated by plan optimizer. } @@ -45,7 +38,6 @@ impl Source { Self { output_schema: self.output_schema.clone(), source_info: self.source_info.clone(), - partition_spec: self.partition_spec.clone(), filters: self.filters.clone(), limit, } @@ -55,7 +47,6 @@ impl Source { Self { output_schema: self.output_schema.clone(), source_info: self.source_info.clone(), - partition_spec: self.partition_spec.clone(), filters, limit: self.limit, } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 125b724cb4..dfccd30458 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -1,4 +1,4 @@ -use std::{cmp::max, num::NonZeroUsize, sync::Arc}; +use std::{num::NonZeroUsize, sync::Arc}; use common_error::DaftError; use daft_core::schema::SchemaRef; @@ -6,7 +6,7 @@ use daft_dsl::{optimization::get_required_columns, Expr}; use indexmap::IndexSet; use snafu::Snafu; -use crate::{display::TreeDisplay, logical_ops::*, PartitionScheme, PartitionSpec}; +use crate::{display::TreeDisplay, logical_ops::*}; /// Logical plan for a Daft query. 
#[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -121,80 +121,6 @@ impl LogicalPlan { } } - pub fn partition_spec(&self) -> Arc { - match self { - Self::Source(Source { partition_spec, .. }) => partition_spec.clone(), - Self::Project(Project { partition_spec, .. }) => partition_spec.clone(), - Self::Filter(Filter { input, .. }) => input.partition_spec(), - Self::Limit(Limit { input, .. }) => input.partition_spec(), - Self::Explode(Explode { input, .. }) => input.partition_spec(), - Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal( - PartitionScheme::Range, - input.partition_spec().num_partitions, - Some(sort_by.clone()), - ) - .into(), - Self::Repartition(Repartition { - num_partitions, - partition_by, - scheme, - .. - }) => PartitionSpec::new_internal( - scheme.clone(), - *num_partitions, - Some(partition_by.clone()), - ) - .into(), - Self::Distinct(Distinct { input, .. }) => input.partition_spec(), - Self::Aggregate(Aggregate { input, groupby, .. }) => { - let input_partition_spec = input.partition_spec(); - if input_partition_spec.num_partitions == 1 { - input_partition_spec.clone() - } else if groupby.is_empty() { - PartitionSpec::new_internal(PartitionScheme::Unknown, 1, None).into() - } else { - PartitionSpec::new_internal( - PartitionScheme::Hash, - input.partition_spec().num_partitions, - Some(groupby.clone()), - ) - .into() - } - } - Self::Concat(Concat { input, other }) => PartitionSpec::new_internal( - PartitionScheme::Unknown, - input.partition_spec().num_partitions + other.partition_spec().num_partitions, - None, - ) - .into(), - Self::Join(Join { - left, - right, - left_on, - .. - }) => { - let input_partition_spec = left.partition_spec(); - match max( - input_partition_spec.num_partitions, - right.partition_spec().num_partitions, - ) { - // NOTE: This duplicates the repartitioning logic in the planner, where we - // conditionally repartition the left and right tables. - // TODO(Clark): Consolidate this logic with the planner logic when we push the partition spec - // to be an entirely planner-side concept. - 1 => input_partition_spec, - num_partitions => PartitionSpec::new_internal( - PartitionScheme::Hash, - num_partitions, - Some(left_on.clone()), - ) - .into(), - } - } - Self::Sink(Sink { input, .. }) => input.partition_spec(), - } - } - pub fn children(&self) -> Vec<&Arc> { match self { Self::Source(..) => vec![], diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index 7e1cc7d9e1..0456a9ccb3 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -7,8 +7,7 @@ use crate::LogicalPlan; use super::{ logical_plan_tracker::LogicalPlanTracker, rules::{ - ApplyOrder, DropRepartition, OptimizerRule, PushDownFilter, PushDownLimit, - PushDownProjection, Transformed, + ApplyOrder, OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, Transformed, }, }; @@ -131,7 +130,6 @@ impl Optimizer { // Default rule batches. 
let rule_batches: Vec = vec![RuleBatch::new( vec![ - Box::new(DropRepartition::new()), Box::new(PushDownFilter::new()), Box::new(PushDownProjection::new()), Box::new(PushDownLimit::new()), diff --git a/src/daft-plan/src/optimization/rules/mod.rs b/src/daft-plan/src/optimization/rules/mod.rs index fd1578af73..80616352b0 100644 --- a/src/daft-plan/src/optimization/rules/mod.rs +++ b/src/daft-plan/src/optimization/rules/mod.rs @@ -1,11 +1,11 @@ -mod drop_repartition; +// mod drop_repartition; mod push_down_filter; mod push_down_limit; mod push_down_projection; mod rule; mod utils; -pub use drop_repartition::DropRepartition; +// pub use drop_repartition::DropRepartition; pub use push_down_filter::PushDownFilter; pub use push_down_limit::PushDownLimit; pub use push_down_projection::PushDownProjection; diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index ba1c6d59d2..1d3d078044 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -370,7 +370,7 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(1, vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)? .filter(col("a").lt(&lit(2)))? .build(); let expected = "\ diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 2c19cc9bdc..15b2bfa165 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -196,7 +196,7 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(1, vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)? .limit(5, false)? .build(); let expected = "\ diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index 9249b50abc..a5e66d3afb 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -153,13 +153,8 @@ impl PushDownProjection { }) .collect::>(); let schema = Schema::new(pruned_upstream_schema)?; - let new_source: LogicalPlan = Source::new( - schema.into(), - source.source_info.clone(), - source.partition_spec.clone(), - source.limit, - ) - .into(); + let new_source: LogicalPlan = + Source::new(schema.into(), source.source_info.clone(), source.limit).into(); let new_plan = plan.with_new_children(&[new_source.into()]); // Retry optimization now that the upstream node is different. 
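The hunk below moves the partition-spec translation from the logical Project into the physical Project essentially verbatim; the behavior it preserves is the one pinned down by the tests deleted from logical_ops/project.rs earlier in this patch. Restated in public-API terms (a sketch, not code from the PR):

    import daft
    from daft import col

    df = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
    df = df.repartition(3, "a", "b")  # hash-partitioned by (a, b)
    # Keeping or plainly aliasing every partition column preserves the hash
    # spec, now tracked as (aa, b):
    kept = df.select(col("a").alias("aa"), col("b"))
    # Rewriting a partition column without keeping a plain copy degrades the
    # scheme to Unknown:
    lost = df.select(col("a") % 2, col("b"))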
diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs index 8fbf248cf7..5959ef0d29 100644 --- a/src/daft-plan/src/physical_ops/project.rs +++ b/src/daft-plan/src/physical_ops/project.rs @@ -1,8 +1,10 @@ use std::sync::Arc; +use common_error::DaftResult; use daft_dsl::Expr; +use indexmap::IndexMap; -use crate::{physical_plan::PhysicalPlan, ResourceRequest}; +use crate::{physical_plan::PhysicalPlan, PartitionScheme, PartitionSpec, ResourceRequest}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -11,18 +13,173 @@ pub struct Project { pub input: Arc, pub projection: Vec, pub resource_request: ResourceRequest, + pub partition_spec: Arc, } impl Project { - pub(crate) fn new( + pub(crate) fn try_new( input: Arc, projection: Vec, resource_request: ResourceRequest, - ) -> Self { - Self { + partition_spec: Arc, + ) -> DaftResult { + let partition_spec = Self::translate_partition_spec(partition_spec, &projection); + Ok(Self { input, projection, resource_request, + partition_spec, + }) + } + + fn translate_partition_spec( + input_pspec: Arc, + projection: &Vec, + ) -> Arc { + // Given an input partition spec, and a new projection, + // produce the new partition spec. + + use crate::PartitionScheme::*; + match input_pspec.scheme { + // If the scheme is vacuous, the result partiiton spec is the same. + Random | Unknown => input_pspec.clone(), + // Otherwise, need to reevaluate the partition scheme for each expression. + Range | Hash => { + // See what columns the projection directly translates into new columns. + let mut old_colname_to_new_colname = IndexMap::new(); + for expr in projection { + if let Some(oldname) = expr.input_mapping() { + let newname = expr.name().unwrap().to_string(); + // Add the oldname -> newname mapping, + // but don't overwrite any existing identity mappings (e.g. "a" -> "a"). + if old_colname_to_new_colname.get(&oldname) != Some(&oldname) { + old_colname_to_new_colname.insert(oldname, newname); + } + } + } + + // Then, see if we can fully translate the partition spec. + let maybe_new_pspec = input_pspec + .by + .as_ref() + .unwrap() + .iter() + .map(|e| Self::translate_partition_spec_expr(e, &old_colname_to_new_colname)) + .collect::, _>>(); + maybe_new_pspec.map_or_else( + |()| { + PartitionSpec::new_internal( + PartitionScheme::Unknown, + input_pspec.num_partitions, + None, + ) + .into() + }, + |new_pspec: Vec| { + PartitionSpec::new_internal( + input_pspec.scheme.clone(), + input_pspec.num_partitions, + Some(new_pspec), + ) + .into() + }, + ) + } + } + } + + fn translate_partition_spec_expr( + pspec_expr: &Expr, + old_colname_to_new_colname: &IndexMap, + ) -> std::result::Result { + // Given a single expression of an input partition spec, + // translate it to a new expression in the given projection. + // Returns: + // - Ok(expr) with expr being the translation, or + // - Err(()) if no translation is possible in the new projection. 
+ + match pspec_expr { + Expr::Column(name) => match old_colname_to_new_colname.get(name.as_ref()) { + Some(newname) => Ok(Expr::Column(newname.as_str().into())), + None => Err(()), + }, + Expr::Literal(_) => Ok(pspec_expr.clone()), + Expr::Alias(child, name) => { + let newchild = Self::translate_partition_spec_expr( + child.as_ref(), + old_colname_to_new_colname, + )?; + Ok(Expr::Alias(newchild.into(), name.clone())) + } + Expr::BinaryOp { op, left, right } => { + let newleft = + Self::translate_partition_spec_expr(left.as_ref(), old_colname_to_new_colname)?; + let newright = Self::translate_partition_spec_expr( + right.as_ref(), + old_colname_to_new_colname, + )?; + Ok(Expr::BinaryOp { + op: *op, + left: newleft.into(), + right: newright.into(), + }) + } + Expr::Cast(child, dtype) => { + let newchild = Self::translate_partition_spec_expr( + child.as_ref(), + old_colname_to_new_colname, + )?; + Ok(Expr::Cast(newchild.into(), dtype.clone())) + } + Expr::Function { func, inputs } => { + let new_inputs = inputs + .iter() + .map(|e| Self::translate_partition_spec_expr(e, old_colname_to_new_colname)) + .collect::, _>>()?; + Ok(Expr::Function { + func: func.clone(), + inputs: new_inputs, + }) + } + Expr::Not(child) => { + let newchild = Self::translate_partition_spec_expr( + child.as_ref(), + old_colname_to_new_colname, + )?; + Ok(Expr::Not(newchild.into())) + } + Expr::IsNull(child) => { + let newchild = Self::translate_partition_spec_expr( + child.as_ref(), + old_colname_to_new_colname, + )?; + Ok(Expr::IsNull(newchild.into())) + } + Expr::IfElse { + if_true, + if_false, + predicate, + } => { + let newtrue = Self::translate_partition_spec_expr( + if_true.as_ref(), + old_colname_to_new_colname, + )?; + let newfalse = Self::translate_partition_spec_expr( + if_false.as_ref(), + old_colname_to_new_colname, + )?; + let newpred = Self::translate_partition_spec_expr( + predicate.as_ref(), + old_colname_to_new_colname, + )?; + Ok(Expr::IfElse { + if_true: newtrue.into(), + if_false: newfalse.into(), + predicate: newpred.into(), + }) + } + // Cannot have agg exprs in partition specs. + Expr::Agg(_) => Err(()), } } } diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index b66388be59..f90d26d457 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -20,9 +20,9 @@ use { use daft_core::impl_bincode_py_state_serialization; use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{cmp::max, sync::Arc}; -use crate::physical_ops::*; +use crate::{physical_ops::*, PartitionScheme, PartitionSpec}; /// Physical plan for a Daft query. #[derive(Debug, Serialize, Deserialize)] @@ -38,6 +38,7 @@ pub enum PhysicalPlan { Explode(Explode), Sort(Sort), Split(Split), + Coalesce(Coalesce), Flatten(Flatten), FanoutRandom(FanoutRandom), FanoutByHash(FanoutByHash), @@ -45,7 +46,6 @@ pub enum PhysicalPlan { FanoutByRange(FanoutByRange), ReduceMerge(ReduceMerge), Aggregate(Aggregate), - Coalesce(Coalesce), Concat(Concat), Join(Join), TabularWriteParquet(TabularWriteParquet), @@ -53,6 +53,113 @@ pub enum PhysicalPlan { TabularWriteCsv(TabularWriteCsv), } +impl PhysicalPlan { + pub fn partition_spec(&self) -> Arc { + match self { + #[cfg(feature = "python")] + Self::InMemoryScan(InMemoryScan { partition_spec, .. }) => partition_spec.clone(), + Self::TabularScanParquet(TabularScanParquet { partition_spec, .. }) => { + partition_spec.clone() + } + Self::TabularScanCsv(TabularScanCsv { partition_spec, .. 
}) => partition_spec.clone(), + Self::TabularScanJson(TabularScanJson { partition_spec, .. }) => partition_spec.clone(), + Self::Project(Project { partition_spec, .. }) => partition_spec.clone(), + Self::Filter(Filter { input, .. }) => input.partition_spec(), + Self::Limit(Limit { input, .. }) => input.partition_spec(), + Self::Explode(Explode { input, .. }) => input.partition_spec(), + Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal( + PartitionScheme::Range, + input.partition_spec().num_partitions, + Some(sort_by.clone()), + ) + .into(), + Self::Split(Split { + output_num_partitions, + .. + }) => { + PartitionSpec::new_internal(PartitionScheme::Unknown, *output_num_partitions, None) + .into() + } + Self::Coalesce(Coalesce { num_to, .. }) => { + PartitionSpec::new_internal(PartitionScheme::Unknown, *num_to, None).into() + } + Self::Flatten(Flatten { input }) => input.partition_spec(), + Self::FanoutRandom(FanoutRandom { num_partitions, .. }) => { + PartitionSpec::new_internal(PartitionScheme::Random, *num_partitions, None).into() + } + Self::FanoutByHash(FanoutByHash { + num_partitions, + partition_by, + .. + }) => PartitionSpec::new_internal( + PartitionScheme::Hash, + *num_partitions, + Some(partition_by.clone()), + ) + .into(), + Self::FanoutByRange(FanoutByRange { + num_partitions, + sort_by, + .. + }) => PartitionSpec::new_internal( + PartitionScheme::Range, + *num_partitions, + Some(sort_by.clone()), + ) + .into(), + Self::ReduceMerge(ReduceMerge { input }) => input.partition_spec(), + Self::Aggregate(Aggregate { input, groupby, .. }) => { + let input_partition_spec = input.partition_spec(); + if input_partition_spec.num_partitions == 1 { + input_partition_spec.clone() + } else if groupby.is_empty() { + PartitionSpec::new_internal(PartitionScheme::Unknown, 1, None).into() + } else { + PartitionSpec::new_internal( + PartitionScheme::Hash, + input.partition_spec().num_partitions, + Some(groupby.clone()), + ) + .into() + } + } + Self::Concat(Concat { input, other }) => PartitionSpec::new_internal( + PartitionScheme::Unknown, + input.partition_spec().num_partitions + other.partition_spec().num_partitions, + None, + ) + .into(), + Self::Join(Join { + left, + right, + left_on, + .. + }) => { + let input_partition_spec = left.partition_spec(); + match max( + input_partition_spec.num_partitions, + right.partition_spec().num_partitions, + ) { + // NOTE: This duplicates the repartitioning logic in the planner, where we + // conditionally repartition the left and right tables. + // TODO(Clark): Consolidate this logic with the planner logic when we push the partition spec + // to be an entirely planner-side concept. + 1 => input_partition_spec, + num_partitions => PartitionSpec::new_internal( + PartitionScheme::Hash, + num_partitions, + Some(left_on.clone()), + ) + .into(), + } + } + Self::TabularWriteParquet(TabularWriteParquet { input, .. }) => input.partition_spec(), + Self::TabularWriteCsv(TabularWriteCsv { input, .. }) => input.partition_spec(), + Self::TabularWriteJson(TabularWriteJson { input, .. }) => input.partition_spec(), + } + } +} + /// A work scheduler for physical plans. #[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] #[derive(Debug, Serialize, Deserialize)] @@ -263,6 +370,7 @@ impl PhysicalPlan { input, projection, resource_request, + .. 
}) => { let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?; let projection_pyexprs: Vec = projection diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 13f0609dcf..24a2f5b211 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -27,47 +27,58 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { LogicalPlan::Source(Source { output_schema, source_info, - partition_spec, limit, filters, }) => match source_info.as_ref() { SourceInfo::ExternalInfo( ext_info @ ExternalSourceInfo { - file_format_config, .. + file_format_config, + file_infos, + .. }, - ) => match file_format_config.as_ref() { - FileFormatConfig::Parquet(_) => { - Ok(PhysicalPlan::TabularScanParquet(TabularScanParquet::new( - output_schema.clone(), - ext_info.clone(), - partition_spec.clone(), - *limit, - filters.to_vec(), - ))) - } - FileFormatConfig::Csv(_) => Ok(PhysicalPlan::TabularScanCsv(TabularScanCsv::new( - output_schema.clone(), - ext_info.clone(), - partition_spec.clone(), - *limit, - filters.to_vec(), - ))), - FileFormatConfig::Json(_) => { - Ok(PhysicalPlan::TabularScanJson(TabularScanJson::new( - output_schema.clone(), - ext_info.clone(), - partition_spec.clone(), - *limit, - filters.to_vec(), - ))) + ) => { + let partition_spec = Arc::new(PartitionSpec::new_internal( + PartitionScheme::Unknown, + file_infos.len(), + None, + )); + match file_format_config.as_ref() { + FileFormatConfig::Parquet(_) => { + Ok(PhysicalPlan::TabularScanParquet(TabularScanParquet::new( + output_schema.clone(), + ext_info.clone(), + partition_spec, + *limit, + filters.to_vec(), + ))) + } + FileFormatConfig::Csv(_) => { + Ok(PhysicalPlan::TabularScanCsv(TabularScanCsv::new( + output_schema.clone(), + ext_info.clone(), + partition_spec, + *limit, + filters.to_vec(), + ))) + } + FileFormatConfig::Json(_) => { + Ok(PhysicalPlan::TabularScanJson(TabularScanJson::new( + output_schema.clone(), + ext_info.clone(), + partition_spec, + *limit, + filters.to_vec(), + ))) + } } - }, + } #[cfg(feature = "python")] SourceInfo::InMemoryInfo(mem_info) => { let scan = PhysicalPlan::InMemoryScan(InMemoryScan::new( mem_info.source_schema.clone(), mem_info.clone(), - partition_spec.clone(), + PartitionSpec::new(PartitionScheme::Unknown, mem_info.num_partitions, None) + .into(), )); let plan = if output_schema.fields.len() < mem_info.source_schema.fields.len() { let projection = output_schema @@ -75,7 +86,13 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { .iter() .map(|(name, _)| Expr::Column(name.clone().into())) .collect::>(); - PhysicalPlan::Project(Project::new(scan.into(), projection, Default::default())) + let partition_spec = scan.partition_spec().clone(); + PhysicalPlan::Project(Project::try_new( + scan.into(), + projection, + Default::default(), + partition_spec, + )?) } else { scan }; @@ -89,11 +106,13 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { .. 
}) => { let input_physical = plan(input)?; - Ok(PhysicalPlan::Project(Project::new( + let partition_spec = input_physical.partition_spec().clone(); + Ok(PhysicalPlan::Project(Project::try_new( input_physical.into(), projection.clone(), resource_request.clone(), - ))) + partition_spec, + )?)) } LogicalPlan::Filter(LogicalFilter { input, predicate }) => { let input_physical = plan(input)?; @@ -108,11 +127,12 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { eager, }) => { let input_physical = plan(input)?; + let num_partitions = input_physical.partition_spec().num_partitions; Ok(PhysicalPlan::Limit(Limit::new( input_physical.into(), *limit, *eager, - logical_plan.partition_spec().num_partitions, + num_partitions, ))) } LogicalPlan::Explode(LogicalExplode { @@ -130,7 +150,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { descending, }) => { let input_physical = plan(input)?; - let num_partitions = logical_plan.partition_spec().num_partitions; + let num_partitions = input_physical.partition_spec().num_partitions; Ok(PhysicalPlan::Sort(Sort::new( input_physical.into(), sort_by.clone(), @@ -145,15 +165,16 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { scheme, }) => { let input_physical = Arc::new(plan(input)?); + let input_num_partitions = input_physical.partition_spec().num_partitions; + let num_partitions = num_partitions.unwrap_or(input_num_partitions); match scheme { PartitionScheme::Unknown => { - let input_num_partitions = input.partition_spec().num_partitions; - if *num_partitions > input_num_partitions { + if num_partitions > input_num_partitions { // Split input partitions into num_partitions. let split_op = PhysicalPlan::Split(Split::new( input_physical, input_num_partitions, - *num_partitions, + num_partitions, )); Ok(PhysicalPlan::Flatten(Flatten::new(split_op.into()))) } else { @@ -161,21 +182,21 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { Ok(PhysicalPlan::Coalesce(Coalesce::new( input_physical, input_num_partitions, - *num_partitions, + num_partitions, ))) } } PartitionScheme::Random => { let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new( input_physical, - *num_partitions, + num_partitions, )); Ok(PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))) } PartitionScheme::Hash => { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( input_physical, - *num_partitions, + num_partitions, partition_by.clone(), )); Ok(PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))) @@ -196,7 +217,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { vec![], col_exprs.clone(), )); - let num_partitions = logical_plan.partition_spec().num_partitions; + let num_partitions = agg_op.partition_spec().num_partitions; if num_partitions > 1 { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( agg_op.into(), @@ -223,7 +244,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { use daft_dsl::Expr::Column; let input_plan = plan(input)?; - let num_input_partitions = input.partition_spec().num_partitions; + let num_input_partitions = input_plan.partition_spec().num_partitions; let result_plan = match num_input_partitions { 1 => PhysicalPlan::Aggregate(Aggregate::new( @@ -398,11 +419,13 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { groupby.clone(), )); - PhysicalPlan::Project(Project::new( + let partition_spec = second_stage_agg.partition_spec().clone(); + PhysicalPlan::Project(Project::try_new( second_stage_agg.into(), final_exprs, Default::default(), - )) + partition_spec, + )?) 
} }; @@ -426,8 +449,8 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { }) => { let mut left_physical = plan(left)?; let mut right_physical = plan(right)?; - let left_pspec = left.partition_spec(); - let right_pspec = right.partition_spec(); + let left_pspec = left_physical.partition_spec(); + let right_pspec = right_physical.partition_spec(); let num_partitions = max(left_pspec.num_partitions, right_pspec.num_partitions); let new_left_pspec = Arc::new(PartitionSpec::new_internal( PartitionScheme::Hash, diff --git a/src/daft-plan/src/source_info/mod.rs b/src/daft-plan/src/source_info/mod.rs index 13a52a268c..d2fc46c329 100644 --- a/src/daft-plan/src/source_info/mod.rs +++ b/src/daft-plan/src/source_info/mod.rs @@ -39,15 +39,22 @@ pub struct InMemoryInfo { deserialize_with = "deserialize_py_object" )] pub cache_entry: PyObject, + pub num_partitions: usize, } #[cfg(feature = "python")] impl InMemoryInfo { - pub fn new(source_schema: SchemaRef, cache_key: String, cache_entry: PyObject) -> Self { + pub fn new( + source_schema: SchemaRef, + cache_key: String, + cache_entry: PyObject, + num_partitions: usize, + ) -> Self { Self { source_schema, cache_key, cache_entry, + num_partitions, } } } diff --git a/tests/dataframe/test_accessors.py b/tests/dataframe/test_accessors.py index 3299507e7f..6886881c3a 100644 --- a/tests/dataframe/test_accessors.py +++ b/tests/dataframe/test_accessors.py @@ -11,13 +11,6 @@ def df(): return daft.from_pydict({"foo": [1, 2, 3]}) -def test_num_partitions(df): - assert df.num_partitions() == 1 - - df2 = df.repartition(2) - assert df2.num_partitions() == 2 - - def test_schema(df): fields = [f for f in df.schema()] assert len(fields) == 1 From a5834013db37408e18514c3e6ed59398616f3e78 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Fri, 27 Oct 2023 12:57:51 -0700 Subject: [PATCH 03/10] Add back support for df.num_partitions(). --- daft/daft.pyi | 1 + daft/dataframe/dataframe.py | 3 +++ daft/plan_scheduler/physical_plan_scheduler.py | 3 +++ src/daft-plan/src/physical_plan.rs | 3 +++ tests/dataframe/test_accessors.py | 7 +++++++ 5 files changed, 17 insertions(+) diff --git a/daft/daft.pyi b/daft/daft.pyi index b67797ba37..b54f90352a 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -790,6 +790,7 @@ class PhysicalPlanScheduler: A work scheduler for physical query plans. """ + def num_partitions(self) -> int: ... def to_partition_tasks( self, psets: dict[str, list[PartitionT]], is_ray_runner: bool ) -> physical_plan.MaterializedPhysicalPlan: ... 
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index cd7f458378..6f98ab8856 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -123,6 +123,9 @@ def explain(self, show_optimized: bool = False, simple=False) -> None: builder = builder.optimize() print(builder.pretty_print(simple)) + def num_partitions(self) -> int: + return self.__builder.to_physical_plan_scheduler().num_partitions() + @DataframePublicAPI def schema(self) -> Schema: """Returns the Schema of the DataFrame, which provides information about each column diff --git a/daft/plan_scheduler/physical_plan_scheduler.py b/daft/plan_scheduler/physical_plan_scheduler.py index 332cb5d1d0..0668597b19 100644 --- a/daft/plan_scheduler/physical_plan_scheduler.py +++ b/daft/plan_scheduler/physical_plan_scheduler.py @@ -13,6 +13,9 @@ class PhysicalPlanScheduler: def __init__(self, scheduler: _PhysicalPlanScheduler): self._scheduler = scheduler + def num_partitions(self) -> int: + return self._scheduler.num_partitions() + def to_partition_tasks( self, psets: dict[str, list[PartitionT]], is_ray_runner: bool ) -> physical_plan.MaterializedPhysicalPlan: diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index f90d26d457..1db96bc53c 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -170,6 +170,9 @@ pub struct PhysicalPlanScheduler { #[cfg(feature = "python")] #[pymethods] impl PhysicalPlanScheduler { + pub fn num_partitions(&self) -> PyResult { + self.plan.partition_spec().get_num_partitions() + } /// Converts the contained physical plan into an iterator of executable partition tasks. pub fn to_partition_tasks( &self, diff --git a/tests/dataframe/test_accessors.py b/tests/dataframe/test_accessors.py index 6886881c3a..3299507e7f 100644 --- a/tests/dataframe/test_accessors.py +++ b/tests/dataframe/test_accessors.py @@ -11,6 +11,13 @@ def df(): return daft.from_pydict({"foo": [1, 2, 3]}) +def test_num_partitions(df): + assert df.num_partitions() == 1 + + df2 = df.repartition(2) + assert df2.num_partitions() == 2 + + def test_schema(df): fields = [f for f in df.schema()] assert len(fields) == 1 From 1c0ecc2e030bdd4f7f92087ca02d85cd4241a71d Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Fri, 27 Oct 2023 13:04:50 -0700 Subject: [PATCH 04/10] Fix optimization tests. 
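
The expected plan strings in these tests change because the preceding
commits moved partition specs off the logical plan: a logical Project no
longer renders a trailing "Partition spec = ..." annotation. The logical
repr is also what DataFrame.explain() prints, so the new shape can be
checked from Python; a minimal sketch, assuming only the public daft API
that appears elsewhere in this series (daft.from_pydict, col, where,
select, explain):

    import daft
    from daft import col

    df = daft.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    # The logical plan should now render e.g. "Project: col(a)" with no
    # partition-spec suffix on the Project node.
    df.where(col("a") < 2).select("a").explain()
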
--- src/daft-plan/src/logical_ops/repartition.rs | 7 ++++++- src/daft-plan/src/optimization/optimizer.rs | 2 +- .../src/optimization/rules/push_down_filter.rs | 8 ++++---- .../src/optimization/rules/push_down_limit.rs | 2 +- .../src/optimization/rules/push_down_projection.rs | 14 +++++++------- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/daft-plan/src/logical_ops/repartition.rs b/src/daft-plan/src/logical_ops/repartition.rs index de21600fa8..b666c13030 100644 --- a/src/daft-plan/src/logical_ops/repartition.rs +++ b/src/daft-plan/src/logical_ops/repartition.rs @@ -31,7 +31,12 @@ impl Repartition { pub fn multiline_display(&self) -> Vec { let mut res = vec![]; res.push(format!("Repartition: Scheme = {:?}", self.scheme)); - res.push(format!("Number of partitions = {:?}", self.num_partitions)); + res.push(format!( + "Number of partitions = {}", + self.num_partitions + .map(|n| n.to_string()) + .unwrap_or("None".to_string()) + )); if !self.partition_by.is_empty() { res.push(format!( "Partition by = {}", diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index 0456a9ccb3..a24a35c1a5 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -533,7 +533,7 @@ mod tests { assert_eq!(pass_count, 6); let expected = "\ Filter: [[[col(a) < lit(2)] | lit(false)] | lit(false)] & lit(true)\ - \n Project: col(a) + lit(3) AS c, col(a) + lit(1), col(a) + lit(2) AS b, Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + \n Project: col(a) + lit(3) AS c, col(a) + lit(1), col(a) + lit(2) AS b\ \n Source: Json, File paths = [/foo], File schema = a (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64)"; assert_eq!(opt_plan.repr_indent(), expected); Ok(()) diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index 1d3d078044..6fda32511a 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -277,7 +277,7 @@ mod tests { .filter(col("a").lt(&lit(2)))? .build(); let expected = "\ - Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a)\ \n Filter: col(a) < lit(2)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; @@ -295,7 +295,7 @@ mod tests { .filter(col("a").lt(&lit(2)).and(&col("b").eq(&lit("foo"))))? .build(); let expected = "\ - Project: col(a), col(b), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a), col(b)\ \n Filter: [col(a) < lit(2)] & [col(b) == lit(\"foo\")]\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; @@ -316,7 +316,7 @@ mod tests { // Filter should NOT commute with Project, since this would involve redundant computation. 
let expected = "\ Filter: col(a) < lit(2)\ - \n Project: col(a) + lit(1), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + \n Project: col(a) + lit(1)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) @@ -336,7 +336,7 @@ mod tests { .filter(col("a").lt(&lit(2)))? .build(); let expected = "\ - Project: col(a) + lit(1), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a) + lit(1)\ \n Filter: [col(a) + lit(1)] < lit(2)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 15b2bfa165..73085c1a88 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -220,7 +220,7 @@ mod tests { .limit(5, false)? .build(); let expected = "\ - Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a)\ \n Limit: 5\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8), Limit = 5"; assert_optimized_plan_eq(plan, expected)?; diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index a5e66d3afb..c80e7e5fbd 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -559,7 +559,7 @@ mod tests { .build(); let expected = "\ - Project: [col(a) + lit(1)] + lit(3), col(b) + lit(2), col(a) + lit(4), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: [col(a) + lit(1)] + lit(3), col(b) + lit(2), col(a) + lit(4)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -592,7 +592,7 @@ mod tests { .build(); let expected = "\ - Project: col(b), col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(b), col(a)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; @@ -610,7 +610,7 @@ mod tests { .build(); let expected = "\ - Project: col(b) + lit(3), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(b) + lit(3)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = b (Int64)"; 
assert_optimized_plan_eq(unoptimized, expected)?; @@ -635,8 +635,8 @@ mod tests { .build(); let expected = "\ - Project: col(a), col(b), col(b) AS c, Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ - \n Project: col(b) + lit(3), col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a), col(b), col(b) AS c\ + \n Project: col(b) + lit(3), col(a)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; @@ -656,7 +656,7 @@ mod tests { .build(); let expected = "\ - Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a)\ \n Aggregation: mean(col(a)), Group by = col(c), Output schema = c (Int64), a (Float64)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), c (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; @@ -677,7 +677,7 @@ mod tests { .build(); let expected = "\ - Project: col(a), Partition spec = PartitionSpec { scheme: Unknown, num_partitions: 1, by: None }\ + Project: col(a)\ \n Filter: col(b)\ \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Boolean), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Boolean)"; assert_optimized_plan_eq(unoptimized, expected)?; From 5e248f59e52554f92f42a767a8d6a8e37964d5f5 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Fri, 27 Oct 2023 14:16:43 -0700 Subject: [PATCH 05/10] Add test coverage for physical Project op's partition spec translation. 
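
For intuition, the Rust test cases added below correspond to DataFrame
programs like the following sketch (illustrative only; it assumes the usual
public daft surface -- daft.from_pydict, daft.col, DataFrame.repartition
with partition columns, and DataFrame.select -- none of which this patch
touches):

    import daft
    from daft import col

    df = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
    df = df.repartition(3, "a", "b")  # hash-partitioned by (a, b)
    # Both partition columns survive (one of them via an alias), so the
    # physical Project should preserve the hash partition spec...
    preserved = df.select((col("a") % 2), col("b"), col("a").alias("aa"))
    # ...whereas dropping a partition column should reset it to Unknown.
    destroyed = df.select(col("b"))
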
--- Cargo.lock | 1 + Cargo.toml | 1 + src/daft-csv/Cargo.toml | 2 +- src/daft-plan/Cargo.toml | 3 + src/daft-plan/src/physical_ops/project.rs | 113 ++++++++++++++++++++++ 5 files changed, 119 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index a098e5b3a9..94f5088b92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1274,6 +1274,7 @@ dependencies = [ "log", "pyo3", "pyo3-log", + "rstest", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index c0f3663c4c..b3c97eedf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ num-traits = "0.2" prettytable-rs = "0.10" rand = "^0.8" rayon = "1.7.0" +rstest = "0.18.2" serde_json = "1.0.104" snafu = {version = "0.7.4", features = ["futures"]} tokio = {version = "1.32.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]} diff --git a/src/daft-csv/Cargo.toml b/src/daft-csv/Cargo.toml index 099637401b..36f97fb765 100644 --- a/src/daft-csv/Cargo.toml +++ b/src/daft-csv/Cargo.toml @@ -26,7 +26,7 @@ tokio-util = {workspace = true} url = {workspace = true} [dev-dependencies] -rstest = "0.18.2" +rstest = {workspace = true} [features] default = ["python"] diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml index 62b0721f74..079fd75715 100644 --- a/src/daft-plan/Cargo.toml +++ b/src/daft-plan/Cargo.toml @@ -14,6 +14,9 @@ serde = {workspace = true, features = ["rc"]} serde_json = {workspace = true} snafu = {workspace = true} +[dev-dependencies] +rstest = {workspace = true} + [features] default = ["python"] python = ["dep:pyo3", "common-error/python", "common-io-config/python", "daft-core/python", "daft-dsl/python", "daft-table/python"] diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs index 5959ef0d29..50000e4eea 100644 --- a/src/daft-plan/src/physical_ops/project.rs +++ b/src/daft-plan/src/physical_ops/project.rs @@ -183,3 +183,116 @@ impl Project { } } } + +#[cfg(test)] +mod tests { + use common_error::DaftResult; + use daft_core::{datatypes::Field, DataType}; + use daft_dsl::{col, lit, Expr}; + use rstest::rstest; + + use crate::{planner::plan, test::dummy_scan_node, PartitionScheme, PartitionSpec}; + + /// Test that projections preserving column inputs, even through aliasing, + /// do not destroy the partition spec. + #[test] + fn test_partition_spec_preserving() -> DaftResult<()> { + let expressions = vec![ + (col("a") % lit(2)), // this is now "a" + col("b"), + col("a").alias("aa"), + ]; + let logical_plan = dummy_scan_node(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Int64), + Field::new("c", DataType::Int64), + ]) + .repartition( + Some(3), + vec![Expr::Column("a".into()), Expr::Column("b".into())], + PartitionScheme::Hash, + )? + .project(expressions, Default::default())? + .build(); + + let physical_plan = plan(&logical_plan)?; + + let expected_pspec = + PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("aa"), col("b")])); + + assert_eq!( + expected_pspec, + physical_plan.partition_spec().as_ref().clone() + ); + + Ok(()) + } + + /// Test that projections destroying even a single column input from the partition spec + /// destroys the entire partition spec. 
+ #[rstest] + fn test_partition_spec_destroying( + #[values( + vec![col("a"), col("c").alias("b")], // original "b" is gone even though "b" is present + vec![col("b")], // original "a" dropped + vec![col("a") % lit(2), col("b")], // original "a" gone + vec![col("c")], // everything gone + )] + projection: Vec, + ) -> DaftResult<()> { + let logical_plan = dummy_scan_node(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Int64), + Field::new("c", DataType::Int64), + ]) + .repartition( + Some(3), + vec![Expr::Column("a".into()), Expr::Column("b".into())], + PartitionScheme::Hash, + )? + .project(projection, Default::default())? + .build(); + + let physical_plan = plan(&logical_plan)?; + + let expected_pspec = PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None); + assert_eq!( + expected_pspec, + physical_plan.partition_spec().as_ref().clone() + ); + + Ok(()) + } + + /// Test that new partition specs favor existing instead of new names. + /// i.e. ("a", "a" as "b") remains partitioned by "a", not "b" + #[test] + fn test_partition_spec_prefer_existing_names() -> DaftResult<()> { + let expressions = vec![col("a").alias("y"), col("a"), col("a").alias("z"), col("b")]; + + let logical_plan = dummy_scan_node(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Int64), + Field::new("c", DataType::Int64), + ]) + .repartition( + Some(3), + vec![Expr::Column("a".into()), Expr::Column("b".into())], + PartitionScheme::Hash, + )? + .project(expressions, Default::default())? + .build(); + + let physical_plan = plan(&logical_plan)?; + + let expected_pspec = + PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("a"), col("b")])); + + assert_eq!( + expected_pspec, + physical_plan.partition_spec().as_ref().clone() + ); + + Ok(()) + } +} From 49a7b79baff810d2161e02af0c9ff10448f7390c Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Fri, 27 Oct 2023 16:19:21 -0700 Subject: [PATCH 06/10] Add DropRepartitions as logical optimization + planner split. --- src/daft-plan/src/lib.rs | 1 + src/daft-plan/src/optimization/optimizer.rs | 4 +- .../optimization/rules/drop_repartition.rs | 110 +------------ src/daft-plan/src/optimization/rules/mod.rs | 4 +- src/daft-plan/src/physical_ops/csv.rs | 4 +- src/daft-plan/src/physical_ops/in_memory.rs | 2 +- src/daft-plan/src/physical_ops/json.rs | 4 +- src/daft-plan/src/physical_ops/parquet.rs | 4 +- src/daft-plan/src/physical_plan.rs | 2 +- src/daft-plan/src/planner.rs | 149 ++++++++++++++++-- 10 files changed, 153 insertions(+), 131 deletions(-) diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index 9e3547c908..a72214e4d8 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -1,4 +1,5 @@ #![feature(let_chains)] +#![feature(assert_matches)] mod builder; mod display; diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index a24a35c1a5..346c93d523 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -7,7 +7,8 @@ use crate::LogicalPlan; use super::{ logical_plan_tracker::LogicalPlanTracker, rules::{ - ApplyOrder, OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, Transformed, + ApplyOrder, DropRepartition, OptimizerRule, PushDownFilter, PushDownLimit, + PushDownProjection, Transformed, }, }; @@ -130,6 +131,7 @@ impl Optimizer { // Default rule batches. 
let rule_batches: Vec = vec![RuleBatch::new( vec![ + Box::new(DropRepartition::new()), Box::new(PushDownFilter::new()), Box::new(PushDownProjection::new()), Box::new(PushDownLimit::new()), diff --git a/src/daft-plan/src/optimization/rules/drop_repartition.rs b/src/daft-plan/src/optimization/rules/drop_repartition.rs index 01b0f54c59..c4df749f3c 100644 --- a/src/daft-plan/src/optimization/rules/drop_repartition.rs +++ b/src/daft-plan/src/optimization/rules/drop_repartition.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use common_error::DaftResult; -use crate::{LogicalPlan, PartitionScheme}; +use crate::LogicalPlan; use super::{ApplyOrder, OptimizerRule, Transformed}; @@ -34,22 +34,7 @@ impl OptimizerRule for DropRepartition { // Repartition1-Repartition2 -> Repartition1 plan.with_new_children(&[child_plan.children()[0].clone()]) } - _ => { - // Drop a repartition if it would produce the same partition spec as is already produced by the child. - let parent_partition_spec = plan.partition_spec(); - let child_partition_spec = child_plan.partition_spec(); - if (parent_partition_spec.num_partitions == 1 - && child_partition_spec.num_partitions == 1) - || (child_partition_spec == parent_partition_spec - && !matches!(parent_partition_spec.scheme, PartitionScheme::Range)) - { - // We directly clone the downstream Repartition child rather than creating a new Arc on child_plan to elide - // an extra copy/Arc. - repartition.input.clone() - } else { - return Ok(Transformed::No(plan)); - } - } + _ => return Ok(Transformed::No(plan)), }; Ok(Transformed::Yes(new_plan)) } @@ -59,7 +44,7 @@ impl OptimizerRule for DropRepartition { mod tests { use common_error::DaftResult; use daft_core::{datatypes::Field, DataType}; - use daft_dsl::{col, lit, AggExpr, Expr}; + use daft_dsl::col; use std::sync::Arc; use crate::{ @@ -105,8 +90,8 @@ mod tests { Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ]) - .repartition(10, vec![col("a")], PartitionScheme::Hash)? - .repartition(5, vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .repartition(Some(5), vec![col("a")], PartitionScheme::Hash)? .build(); let expected = "\ Repartition: Scheme = Hash, Number of partitions = 5, Partition by = col(a)\ @@ -114,89 +99,4 @@ mod tests { assert_optimized_plan_eq(plan, expected)?; Ok(()) } - - /// Tests that DropRepartition drops a Repartition if both the Repartition and the child have a single partition. - /// - /// Repartition-LogicalPlan -> LogicalPlan - #[test] - fn repartition_dropped_single_partition() -> DaftResult<()> { - // dummy_scan_node() will create the default PartitionSpec, which only has a single partition. - let builder = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Utf8), - ]); - assert_eq!(builder.partition_spec().num_partitions, 1); - let plan = builder - .repartition(1, vec![col("a")], PartitionScheme::Hash)? - .build(); - let expected = "\ - Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; - assert_optimized_plan_eq(plan, expected)?; - Ok(()) - } - - /// Tests that DropRepartition drops a Repartition if both the Repartition and the child have the same partition spec. 
- /// - /// Repartition-LogicalPlan -> LogicalPlan - #[test] - fn repartition_dropped_same_partition_spec() -> DaftResult<()> { - let plan = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Utf8), - ]) - .repartition(10, vec![col("a")], PartitionScheme::Hash)? - .filter(col("a").lt(&lit(2)))? - .repartition(10, vec![col("a")], PartitionScheme::Hash)? - .build(); - let expected = "\ - Filter: col(a) < lit(2)\ - \n Repartition: Scheme = Hash, Number of partitions = 10, Partition by = col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; - assert_optimized_plan_eq(plan, expected)?; - Ok(()) - } - - /// Tests that DropRepartition drops a Repartition if both the Repartition and the upstream Aggregation have the same partition spec. - /// - /// Repartition-Aggregation -> Aggregation - #[test] - fn repartition_dropped_same_partition_spec_agg() -> DaftResult<()> { - let plan = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Int64), - ]) - .repartition(10, vec![col("a")], PartitionScheme::Hash)? - .aggregate( - vec![Expr::Agg(AggExpr::Sum(col("a").into()))], - vec![col("b")], - )? - .repartition(10, vec![col("b")], PartitionScheme::Hash)? - .build(); - let expected = "\ - Aggregation: sum(col(a)), Group by = col(b), Output schema = b (Int64), a (Int64)\ - \n Repartition: Scheme = Hash, Number of partitions = 10, Partition by = col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Int64)"; - assert_optimized_plan_eq(plan, expected)?; - Ok(()) - } - - /// Tests that DropRepartition does NOT drop a Repartition if both the Repartition and the child have the same partition spec but they are range-partitioned. - #[test] - fn repartition_not_dropped_same_partition_spec() -> DaftResult<()> { - let plan = dummy_scan_node(vec![ - Field::new("a", DataType::Int64), - Field::new("b", DataType::Utf8), - ]) - .repartition(10, vec![col("a")], PartitionScheme::Hash)? - .sort(vec![col("a")], vec![true])? - .repartition(10, vec![col("a")], PartitionScheme::Range)? 
- .build(); - let expected = "\ - Repartition: Scheme = Range, Number of partitions = 10, Partition by = col(a)\ - \n Sort: Sort by = (col(a), descending)\ - \n Repartition: Scheme = Hash, Number of partitions = 10, Partition by = col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None }), Output schema = a (Int64), b (Utf8)"; - assert_optimized_plan_eq(plan, expected)?; - Ok(()) - } } diff --git a/src/daft-plan/src/optimization/rules/mod.rs b/src/daft-plan/src/optimization/rules/mod.rs index 80616352b0..fd1578af73 100644 --- a/src/daft-plan/src/optimization/rules/mod.rs +++ b/src/daft-plan/src/optimization/rules/mod.rs @@ -1,11 +1,11 @@ -// mod drop_repartition; +mod drop_repartition; mod push_down_filter; mod push_down_limit; mod push_down_projection; mod rule; mod utils; -// pub use drop_repartition::DropRepartition; +pub use drop_repartition::DropRepartition; pub use push_down_filter::PushDownFilter; pub use push_down_limit::PushDownLimit; pub use push_down_projection::PushDownProjection; diff --git a/src/daft-plan/src/physical_ops/csv.rs b/src/daft-plan/src/physical_ops/csv.rs index 3435a03eb6..cccedd45d1 100644 --- a/src/daft-plan/src/physical_ops/csv.rs +++ b/src/daft-plan/src/physical_ops/csv.rs @@ -9,7 +9,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct TabularScanCsv { pub projection_schema: SchemaRef, pub external_info: ExternalInfo, @@ -36,7 +36,7 @@ impl TabularScanCsv { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularWriteCsv { pub schema: SchemaRef, pub file_info: OutputFileInfo, diff --git a/src/daft-plan/src/physical_ops/in_memory.rs b/src/daft-plan/src/physical_ops/in_memory.rs index 95387bb46f..cfe8fe154f 100644 --- a/src/daft-plan/src/physical_ops/in_memory.rs +++ b/src/daft-plan/src/physical_ops/in_memory.rs @@ -3,7 +3,7 @@ use daft_core::schema::SchemaRef; use serde::{Deserialize, Serialize}; use std::sync::Arc; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct InMemoryScan { pub schema: SchemaRef, pub in_memory_info: InMemoryInfo, diff --git a/src/daft-plan/src/physical_ops/json.rs b/src/daft-plan/src/physical_ops/json.rs index 0d2142e856..0d8fce535f 100644 --- a/src/daft-plan/src/physical_ops/json.rs +++ b/src/daft-plan/src/physical_ops/json.rs @@ -9,7 +9,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularScanJson { pub projection_schema: SchemaRef, pub external_info: ExternalInfo, @@ -36,7 +36,7 @@ impl TabularScanJson { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularWriteJson { pub schema: SchemaRef, pub file_info: OutputFileInfo, diff --git a/src/daft-plan/src/physical_ops/parquet.rs b/src/daft-plan/src/physical_ops/parquet.rs index 1d4140f325..f461432d3f 100644 --- a/src/daft-plan/src/physical_ops/parquet.rs +++ b/src/daft-plan/src/physical_ops/parquet.rs @@ -9,7 +9,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularScanParquet { pub projection_schema: SchemaRef, pub external_info: ExternalSourceInfo, @@ -36,7 
+36,7 @@ impl TabularScanParquet { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TabularWriteParquet { pub schema: SchemaRef, pub file_info: OutputFileInfo, diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 1db96bc53c..eca30d3869 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -25,7 +25,7 @@ use std::{cmp::max, sync::Arc}; use crate::{physical_ops::*, PartitionScheme, PartitionSpec}; /// Physical plan for a Daft query. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum PhysicalPlan { #[cfg(feature = "python")] InMemoryScan(InMemoryScan), diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 24a2f5b211..9e54ad4ab3 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::sync::Arc; use std::{cmp::max, collections::HashMap}; @@ -165,43 +166,57 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { scheme, }) => { let input_physical = Arc::new(plan(input)?); - let input_num_partitions = input_physical.partition_spec().num_partitions; + let input_partition_spec = input_physical.partition_spec(); + let input_num_partitions = input_partition_spec.num_partitions; let num_partitions = num_partitions.unwrap_or(input_num_partitions); - match scheme { - PartitionScheme::Unknown => { - if num_partitions > input_num_partitions { + let repartitioned_plan = match scheme { + PartitionScheme::Unknown => match num_partitions.cmp(&input_num_partitions) { + Ordering::Greater => { // Split input partitions into num_partitions. let split_op = PhysicalPlan::Split(Split::new( - input_physical, + input_physical.clone(), input_num_partitions, num_partitions, )); - Ok(PhysicalPlan::Flatten(Flatten::new(split_op.into()))) - } else { + PhysicalPlan::Flatten(Flatten::new(split_op.into())) + } + Ordering::Less => { // Coalesce input partitions into num_partitions. - Ok(PhysicalPlan::Coalesce(Coalesce::new( - input_physical, + PhysicalPlan::Coalesce(Coalesce::new( + input_physical.clone(), input_num_partitions, num_partitions, - ))) + )) } - } + Ordering::Equal => { + // # of output partitions == # of input partitions, so we drop the redundant repartition. 
+                        return Ok(input_physical.as_ref().clone());
+                    }
+                },
                 PartitionScheme::Random => {
                     let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new(
-                        input_physical,
+                        input_physical.clone(),
                         num_partitions,
                     ));
-                    Ok(PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())))
+                    PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
                 }
                 PartitionScheme::Hash => {
                     let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
-                        input_physical,
+                        input_physical.clone(),
                         num_partitions,
                         partition_by.clone(),
                     ));
-                    Ok(PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())))
+                    PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
                 }
                 PartitionScheme::Range => unreachable!("Repartitioning by range is not supported"),
+            };
+            let repartitioned_partition_spec = repartitioned_plan.partition_spec();
+            if (input_num_partitions == 1 && repartitioned_partition_spec.num_partitions == 1)
+                || (repartitioned_partition_spec == input_partition_spec)
+            {
+                Ok(input_physical.as_ref().clone())
+            } else {
+                Ok(repartitioned_plan)
             }
         }
         LogicalPlan::Distinct(LogicalDistinct { input }) => {
             let input_physical = plan(input)?;
@@ -524,3 +539,107 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_error::DaftResult;
+    use daft_core::{datatypes::Field, DataType};
+    use daft_dsl::{col, lit, AggExpr, Expr};
+    use std::assert_matches::assert_matches;
+
+    use crate::physical_plan::PhysicalPlan;
+    use crate::planner::plan;
+    use crate::{test::dummy_scan_node, PartitionScheme};
+
+    /// Tests that planner drops a simple Repartition (e.g. df.into_partitions()) if the child already has the desired number of partitions.
+    ///
+    /// Repartition-upstream_op -> upstream_op
+    #[test]
+    fn repartition_dropped_redundant_into_partitions() -> DaftResult<()> {
+        // dummy_scan_node() will create the default PartitionSpec, which only has a single partition.
+        let builder = dummy_scan_node(vec![
+            Field::new("a", DataType::Int64),
+            Field::new("b", DataType::Utf8),
+        ])
+        .repartition(Some(10), vec![], PartitionScheme::Unknown)?
+        .filter(col("a").lt(&lit(2)))?;
+        assert_eq!(
+            plan(builder.build().as_ref())?
+                .partition_spec()
+                .num_partitions,
+            10
+        );
+        let logical_plan = builder
+            .repartition(Some(10), vec![], PartitionScheme::Unknown)?
+            .build();
+        let physical_plan = plan(logical_plan.as_ref())?;
+        // Check that the last repartition was dropped (the last op should be the filter).
+        assert_matches!(physical_plan, PhysicalPlan::Filter(_));
+        Ok(())
+    }
+
+    /// Tests that planner drops a Repartition if both the Repartition and the child have a single partition.
+    ///
+    /// Repartition-upstream_op -> upstream_op
+    #[test]
+    fn repartition_dropped_single_partition() -> DaftResult<()> {
+        // dummy_scan_node() will create the default PartitionSpec, which only has a single partition.
+        let builder = dummy_scan_node(vec![
+            Field::new("a", DataType::Int64),
+            Field::new("b", DataType::Utf8),
+        ]);
+        assert_eq!(
+            plan(builder.build().as_ref())?
+                .partition_spec()
+                .num_partitions,
+            1
+        );
+        let logical_plan = builder
+            .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)?
+            .build();
+        let physical_plan = plan(logical_plan.as_ref())?;
+        assert_matches!(physical_plan, PhysicalPlan::TabularScanJson(_));
+        Ok(())
+    }
+
+    /// Tests that planner drops a Repartition if both the Repartition and the child have the same partition spec.
+ /// + /// Repartition-upstream_op -> upstream_op + #[test] + fn repartition_dropped_same_partition_spec() -> DaftResult<()> { + let logical_plan = dummy_scan_node(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Utf8), + ]) + .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .filter(col("a").lt(&lit(2)))? + .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .build(); + let physical_plan = plan(logical_plan.as_ref())?; + // Check that the last repartition was dropped (the last op should be the filter). + assert_matches!(physical_plan, PhysicalPlan::Filter(_)); + Ok(()) + } + + /// Tests that planner drops a Repartition if both the Repartition and the upstream Aggregation have the same partition spec. + /// + /// Repartition-Aggregation -> Aggregation + #[test] + fn repartition_dropped_same_partition_spec_agg() -> DaftResult<()> { + let logical_plan = dummy_scan_node(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Int64), + ]) + .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)? + .aggregate( + vec![Expr::Agg(AggExpr::Sum(col("a").into()))], + vec![col("b")], + )? + .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)? + .build(); + let physical_plan = plan(logical_plan.as_ref())?; + // Check that the last repartition was dropped (the last op should be a projection for a multi-partition aggregation). + assert_matches!(physical_plan, PhysicalPlan::Project(_)); + Ok(()) + } +} From 4b79560139042fa1d4f42971bcef6e644caf9a6e Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Fri, 27 Oct 2023 16:42:42 -0700 Subject: [PATCH 07/10] Apply suggestions from code review Co-authored-by: Jay Chia <17691182+jaychia@users.noreply.github.com> --- daft/dataframe/dataframe.py | 2 +- src/daft-plan/src/logical_ops/repartition.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 6f98ab8856..ddf73b291a 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -85,7 +85,7 @@ def _builder(self) -> LogicalPlanBuilder: else: num_partitions = self._result_cache.num_partitions() # Partition set should always be set on cache entry. - assert num_partitions is not None + assert num_partitions is not None, "Partition set should always be set on cache entry" return self.__builder.from_in_memory_scan( self._result_cache, self.__builder.schema(), num_partitions=num_partitions ) diff --git a/src/daft-plan/src/logical_ops/repartition.rs b/src/daft-plan/src/logical_ops/repartition.rs index b666c13030..2fc6b715e0 100644 --- a/src/daft-plan/src/logical_ops/repartition.rs +++ b/src/daft-plan/src/logical_ops/repartition.rs @@ -35,7 +35,7 @@ impl Repartition { "Number of partitions = {}", self.num_partitions .map(|n| n.to_string()) - .unwrap_or("None".to_string()) + .unwrap_or("Unknown".to_string()) )); if !self.partition_by.is_empty() { res.push(format!( From 4de97f494887c4f5130b69bc1dd75815b158e42e Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Mon, 30 Oct 2023 13:59:12 -0700 Subject: [PATCH 08/10] Disallow repartitioning with Range scheme at builder stage. 
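
Range repartitioning was already unreachable from the public DataFrame API,
so this guard only fires through the internal builder. A hypothetical sketch
of the failure mode (the _builder access, the positional arguments, and the
mapping of DaftError::ValueError onto Python's ValueError are assumptions
for illustration, not part of this patch):

    import daft
    from daft.daft import PartitionScheme

    df = daft.from_pydict({"a": [1, 2, 3]})
    try:
        # Hypothetical direct builder call; with this change it should fail at
        # plan-construction time instead of reaching the planner's unreachable!().
        df._builder.repartition(4, [], PartitionScheme.Range)
    except ValueError as err:
        assert "Range partition scheme is not supported" in str(err)
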
--- src/daft-plan/src/builder.rs | 10 +++++++--- src/daft-plan/src/logical_ops/repartition.rs | 14 ++++++++++---- src/daft-plan/src/logical_plan.rs | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 1a5645a640..bd89bb720d 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -131,9 +131,13 @@ impl LogicalPlanBuilder { partition_by: Vec, scheme: PartitionScheme, ) -> DaftResult { - let logical_plan: LogicalPlan = - logical_ops::Repartition::new(self.plan.clone(), num_partitions, partition_by, scheme) - .into(); + let logical_plan: LogicalPlan = logical_ops::Repartition::try_new( + self.plan.clone(), + num_partitions, + partition_by, + scheme, + )? + .into(); Ok(logical_plan.into()) } diff --git a/src/daft-plan/src/logical_ops/repartition.rs b/src/daft-plan/src/logical_ops/repartition.rs index 2fc6b715e0..b65a33bd35 100644 --- a/src/daft-plan/src/logical_ops/repartition.rs +++ b/src/daft-plan/src/logical_ops/repartition.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_error::{DaftError, DaftResult}; use daft_dsl::Expr; use crate::{LogicalPlan, PartitionScheme}; @@ -14,18 +15,23 @@ pub struct Repartition { } impl Repartition { - pub(crate) fn new( + pub(crate) fn try_new( input: Arc, num_partitions: Option, partition_by: Vec, scheme: PartitionScheme, - ) -> Self { - Self { + ) -> DaftResult { + if matches!(scheme, PartitionScheme::Range) { + return Err(DaftError::ValueError( + "Repartitioning with the Range partition scheme is not supported.".to_string(), + )); + } + Ok(Self { input, num_partitions, partition_by, scheme, - } + }) } pub fn multiline_display(&self) -> Vec { diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index dfccd30458..903334c24b 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -149,7 +149,7 @@ impl LogicalPlan { Self::Limit(Limit { limit, eager, .. }) => Self::Limit(Limit::new(input.clone(), *limit, *eager)), Self::Explode(Explode { to_explode, .. }) => Self::Explode(Explode::try_new(input.clone(), to_explode.clone()).unwrap()), Self::Sort(Sort { sort_by, descending, .. }) => Self::Sort(Sort::try_new(input.clone(), sort_by.clone(), descending.clone()).unwrap()), - Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone())), + Self::Repartition(Repartition { num_partitions, partition_by, scheme, .. }) => Self::Repartition(Repartition::try_new(input.clone(), *num_partitions, partition_by.clone(), scheme.clone()).unwrap()), Self::Distinct(_) => Self::Distinct(Distinct::new(input.clone())), Self::Aggregate(Aggregate { aggregations, groupby, ..}) => Self::Aggregate(Aggregate::try_new(input.clone(), aggregations.clone(), groupby.clone()).unwrap()), Self::Sink(Sink { schema, sink_info, .. }) => Self::Sink(Sink::new(input.clone(), schema.clone(), sink_info.clone())), From 06d45c07f4ef2975eddfccd5960be32dca9831ef Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Mon, 30 Oct 2023 14:31:40 -0700 Subject: [PATCH 09/10] Refactor to consolidated + eager drop repartitioning short-circuit. 
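
The consolidated short-circuit drops a Repartition before building any
physical ops when it can prove the repartition is a no-op. The three cases
it checks, mirrored as a standalone Python sketch for reference (the Spec
stand-in below is illustrative, not daft's actual PartitionSpec type):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class Spec:
        scheme: str          # "unknown" | "random" | "hash" | "range"
        num_partitions: int
        by: tuple = ()       # partition-by expressions, if any

    def repartition_is_noop(input_spec: Spec, scheme: str, num_partitions: int, by: tuple = ()) -> bool:
        # 1) One partition in, one partition out: nothing to move.
        if input_spec.num_partitions == 1 and num_partitions == 1:
            return True
        # 2) A simple split/coalesce to the same partition count is a no-op,
        #    whatever the upstream partitioning scheme was.
        if scheme == "unknown" and num_partitions == input_spec.num_partitions:
            return True
        # 3) Repartitioning to exactly the partition spec the input already has.
        return Spec(scheme, num_partitions, by) == input_spec
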
--- src/daft-plan/src/planner.rs | 45 ++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 9e54ad4ab3..8410ad0e1b 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -165,16 +165,37 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { partition_by, scheme, }) => { - let input_physical = Arc::new(plan(input)?); + // Below partition-dropping optimization assumes we are NOT repartitioning using a range partitioning scheme. + // A range repartitioning of an existing range-partitioned DataFrame is only redundant if the partition boundaries + // are consistent, which is only the case if boundary sampling is deterministic within a query. + assert!(!matches!(scheme, PartitionScheme::Range)); + + let input_physical = plan(input)?; let input_partition_spec = input_physical.partition_spec(); let input_num_partitions = input_partition_spec.num_partitions; let num_partitions = num_partitions.unwrap_or(input_num_partitions); + // Partition spec after repartitioning. + let repartitioned_partition_spec = PartitionSpec::new_internal( + scheme.clone(), + num_partitions, + Some(partition_by.clone()), + ); + // Drop the repartition if the output of the repartition would yield the same partitioning as the input. + if (input_num_partitions == 1 && num_partitions == 1) + // Simple split/coalesce repartition to the same # of partitions is a no-op, no matter the upstream partitioning scheme. + || (num_partitions == input_num_partitions && matches!(scheme, PartitionScheme::Unknown)) + // Repartitioning to the same partition spec as the input is always a no-op. + || (&repartitioned_partition_spec == input_partition_spec.as_ref()) + { + return Ok(input_physical); + } + let input_physical = Arc::new(input_physical); let repartitioned_plan = match scheme { PartitionScheme::Unknown => match num_partitions.cmp(&input_num_partitions) { Ordering::Greater => { // Split input partitions into num_partitions. let split_op = PhysicalPlan::Split(Split::new( - input_physical.clone(), + input_physical, input_num_partitions, num_partitions, )); @@ -183,26 +204,27 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { Ordering::Less => { // Coalesce input partitions into num_partitions. PhysicalPlan::Coalesce(Coalesce::new( - input_physical.clone(), + input_physical, input_num_partitions, num_partitions, )) } Ordering::Equal => { - // # of output partitions == # of input partitions, so we drop the redundant repartition. - return Ok(input_physical.as_ref().clone()); + // # of output partitions == # of input partitions; this should have already short-circuited with + // a repartition drop above. 
+ unreachable!("Simple repartitioning with same # of output partitions as the input; this should have been dropped.") } }, PartitionScheme::Random => { let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new( - input_physical.clone(), + input_physical, num_partitions, )); PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())) } PartitionScheme::Hash => { let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new( - input_physical.clone(), + input_physical, num_partitions, partition_by.clone(), )); @@ -210,14 +232,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { } PartitionScheme::Range => unreachable!("Repartitioning by range is not supported"), }; - let repartitioned_partition_spec = repartitioned_plan.partition_spec(); - if (input_num_partitions == 1 && repartitioned_partition_spec.num_partitions == 1) - || (repartitioned_partition_spec == input_partition_spec) - { - Ok(input_physical.as_ref().clone()) - } else { - Ok(repartitioned_plan) - } + Ok(repartitioned_plan) } LogicalPlan::Distinct(LogicalDistinct { input }) => { let input_physical = plan(input)?; From 824d5b5033541f33fab25bb65dd2562b85b2069a Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Mon, 30 Oct 2023 14:33:57 -0700 Subject: [PATCH 10/10] Add DropRepartition comment about logical -> physical optimization. --- src/daft-plan/src/optimization/rules/drop_repartition.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/daft-plan/src/optimization/rules/drop_repartition.rs b/src/daft-plan/src/optimization/rules/drop_repartition.rs index c4df749f3c..a5a59a5099 100644 --- a/src/daft-plan/src/optimization/rules/drop_repartition.rs +++ b/src/daft-plan/src/optimization/rules/drop_repartition.rs @@ -7,6 +7,9 @@ use crate::LogicalPlan; use super::{ApplyOrder, OptimizerRule, Transformed}; /// Optimization rules for dropping unnecessary Repartitions. +/// +/// Dropping of Repartitions that would yield the same partitioning as their input +/// happens during logical -> physical plan translation. #[derive(Default, Debug)] pub struct DropRepartition {}
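
Taken together, the series leaves two complementary mechanisms: the logical
DropRepartition rule collapses adjacent Repartitions, and the physical
planner elides repartitions that would reproduce the input's partitioning.
An end-to-end sketch of the observable behavior, using the public API plus
the num_partitions() accessor restored in PATCH 03 (a sketch of expected
behavior, not a test from this series):

    import daft

    df = daft.from_pydict({"a": list(range(100))})
    assert df.num_partitions() == 1

    # Back-to-back repartitions collapse to the outermost one at the logical
    # level: Repartition(5) <- Repartition(10) <- scan becomes
    # Repartition(5) <- scan.
    out = df.repartition(10, "a").repartition(5, "a")
    assert out.num_partitions() == 5

    # Repartitioning to the spec the input already has is elided entirely by
    # the physical planner.
    assert out.repartition(5, "a").num_partitions() == 5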