diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 1d77502221..a44674db55 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -122,7 +122,7 @@ impl SplitActorPoolProjects { impl OptimizerRule for SplitActorPoolProjects { fn try_optimize(&self, plan: Arc) -> DaftResult>> { plan.transform_down(|node| match node.as_ref() { - LogicalPlan::Project(projection) => try_optimize_project(projection, node.clone(), 0), + LogicalPlan::Project(projection) => try_optimize_project(projection, node.clone()), _ => Ok(Transformed::no(node)), }) } @@ -370,8 +370,34 @@ fn split_projection( fn try_optimize_project( projection: &Project, plan: Arc, +) -> DaftResult>> { + // Add aliases to the expressions in the projection to preserve original names when splitting stateful UDFs. + // This is needed because when we split stateful UDFs, we create new names for intermediates, but we would like + // to have the same expression names as the original projection. + let aliased_projection_exprs = projection + .projection + .iter() + .map(|e| { + if has_stateful_udf(e) && !matches!(e.as_ref(), Expr::Alias(..)) { + e.alias(e.name()) + } else { + e.clone() + } + }) + .collect(); + + let aliased_projection = Project::try_new(projection.input.clone(), aliased_projection_exprs)?; + + recursive_optimize_project(&aliased_projection, plan, 0) +} + +fn recursive_optimize_project( + projection: &Project, + plan: Arc, recursive_count: usize, ) -> DaftResult>> { + // TODO: eliminate the need for recursive calls by doing a post-order traversal of the plan tree. + // Base case: no stateful UDFs at all let has_stateful_udfs = projection.projection.iter().any(has_stateful_udf); if !has_stateful_udfs { @@ -416,8 +442,11 @@ fn try_optimize_project( // Recursively run the rule on the new child Project let new_project = Project::try_new(projection.input.clone(), remaining)?; let new_child_project = LogicalPlan::Project(new_project.clone()).arced(); - let optimized_child_plan = - try_optimize_project(&new_project, new_child_project.clone(), recursive_count + 1)?; + let optimized_child_plan = recursive_optimize_project( + &new_project, + new_child_project.clone(), + recursive_count + 1, + )?; optimized_child_plan.data.clone() }; @@ -785,6 +814,67 @@ mod tests { Ok(()) } + #[test] + fn test_multiple_with_column_serial_no_alias() -> DaftResult<()> { + let scan_op = dummy_scan_operator(vec![Field::new("a", DataType::Utf8)]); + let scan_plan = dummy_scan_node(scan_op); + let stacked_stateful_project_expr = + create_stateful_udf(vec![create_stateful_udf(vec![col("a")])]); + + // Add a Projection with StatefulUDF and resource request + let project_plan = scan_plan + .select(vec![stacked_stateful_project_expr.clone()])? + .build(); + + let intermediate_name = "__TruncateRootStatefulUDF_0-0-0__"; + + let expected = scan_plan.select(vec![col("a")])?.build(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col("a"), + create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name), + ], + )?) + .arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name)])?).arced(); + let expected = + LogicalPlan::Project(Project::try_new(expected, vec![col(intermediate_name)])?).arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![ + col(intermediate_name), + create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("a"), + ], + )?) + .arced(); + let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("a")])?).arced(); + assert_optimized_plan_eq(project_plan.clone(), expected.clone())?; + + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + scan_plan.build(), + vec![create_stateful_udf(vec![col("a")]) + .clone() + .alias(intermediate_name)], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected, + vec![create_stateful_udf(vec![col(intermediate_name)]) + .clone() + .alias("a")], + )?) + .arced(); + assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?; + + Ok(()) + } + #[test] fn test_multiple_with_column_serial_multiarg() -> DaftResult<()> { let scan_op = dummy_scan_operator(vec![