Skip to content

Commit

Permalink
Stop copying plans in LogicalPlan::with_param_values (#10016)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Apr 12, 2024
1 parent 118eecd commit 2def10f
Showing 1 changed file with 28 additions and 51 deletions.
79 changes: 28 additions & 51 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,14 +913,19 @@ impl LogicalPlan {
param_values: impl Into<ParamValues>,
) -> Result<LogicalPlan> {
let param_values = param_values.into();
match self {
LogicalPlan::Prepare(prepare_lp) => {
param_values.verify(&prepare_lp.data_types)?;
let input_plan = prepare_lp.input;
input_plan.replace_params_with_values(&param_values)
let plan_with_values = self.replace_params_with_values(&param_values)?;

// unwrap Prepare
Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
param_values.verify(&prepare_lp.data_types)?;
// try and take ownership of the input if is not shared, clone otherwise
match Arc::try_unwrap(prepare_lp.input) {
Ok(input) => input,
Err(arc_input) => arc_input.as_ref().clone(),
}
_ => self.replace_params_with_values(&param_values),
}
} else {
plan_with_values
})
}

/// Returns the maximum number of rows that this plan can output, if known.
Expand Down Expand Up @@ -1046,27 +1051,26 @@ impl LogicalPlan {
/// ...) replaced with corresponding values provided in
/// `params_values`
///
/// See [`Self::with_param_values`] for examples and usage
/// See [`Self::with_param_values`] for examples and usage with an owned
/// `ParamValues`
pub fn replace_params_with_values(
&self,
self,
param_values: &ParamValues,
) -> Result<LogicalPlan> {
let new_exprs = self
.expressions()
.into_iter()
.map(|e| {
let e = e.infer_placeholder_types(self.schema())?;
Self::replace_placeholders_with_values(e, param_values)
self.transform_up_with_subqueries(&|plan| {
let schema = plan.schema().clone();
plan.map_expressions(|e| {
e.infer_placeholder_types(&schema)?.transform_up(&|e| {
if let Expr::Placeholder(Placeholder { id, .. }) = e {
let value = param_values.get_placeholders_with_values(&id)?;
Ok(Transformed::yes(Expr::Literal(value)))
} else {
Ok(Transformed::no(e))
}
})
})
.collect::<Result<Vec<_>>>()?;

let new_inputs_with_values = self
.inputs()
.into_iter()
.map(|inp| inp.replace_params_with_values(param_values))
.collect::<Result<Vec<_>>>()?;

self.with_new_exprs(new_exprs, new_inputs_with_values)
})
.map(|res| res.data)
}

/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
Expand Down Expand Up @@ -1099,33 +1103,6 @@ impl LogicalPlan {
.map(|_| param_types)
}

/// Return an Expr with all placeholders replaced with their
/// corresponding values provided in the params_values
fn replace_placeholders_with_values(
expr: Expr,
param_values: &ParamValues,
) -> Result<Expr> {
expr.transform(&|expr| {
match &expr {
Expr::Placeholder(Placeholder { id, .. }) => {
let value = param_values.get_placeholders_with_values(id)?;
// Replace the placeholder with the value
Ok(Transformed::yes(Expr::Literal(value)))
}
Expr::ScalarSubquery(qry) => {
let subquery =
Arc::new(qry.subquery.replace_params_with_values(param_values)?);
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns: qry.outer_ref_columns.clone(),
})))
}
_ => Ok(Transformed::no(expr)),
}
})
.data()
}

// ------------
// Various implementations for printing out LogicalPlans
// ------------
Expand Down

0 comments on commit 2def10f

Please sign in to comment.