Skip to content

Commit

Permalink
rename (#4284)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Nov 19, 2022
1 parent 446c2ea commit 880e6fc
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 99 deletions.
23 changes: 12 additions & 11 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ impl LogicalPlanBuilder {
Ok(Self::from(LogicalPlan::Projection(Projection::try_new(
new_expr,
Arc::new(sort_plan),
None,
)?)))
}

Expand Down Expand Up @@ -874,12 +873,12 @@ pub fn project_with_column_index_alias(
x => x.alias(schema.field(i).name()),
})
.collect::<Vec<_>>();
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
alias_expr, input, schema, alias,
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(alias_expr, input, schema, alias)?,
))
}

/// Union two logical plans with an optional alias.
/// Union two logical plans.
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
let left_col_num = left_plan.schema().fields().len();

Expand Down Expand Up @@ -986,12 +985,14 @@ pub fn project_with_alias(
None => input_schema,
};

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
projected_expr,
Arc::new(plan.clone()),
DFSchemaRef::new(schema),
alias,
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
projected_expr,
Arc::new(plan.clone()),
DFSchemaRef::new(schema),
alias,
)?,
))
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
Expand Down
21 changes: 12 additions & 9 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,20 +1118,28 @@ impl Projection {
pub fn try_new(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
alias: Option<String>,
) -> Result<Self, DataFusionError> {
let schema = Arc::new(DFSchema::new_with_metadata(
exprlist_to_fields(&expr, &input)?,
input.schema().metadata().clone(),
)?);
Self::try_new_with_schema(expr, input, schema, alias)
Self::try_new_with_schema(expr, input, schema)
}

/// Create a new Projection using the specified output schema
pub fn try_new_with_schema(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
) -> Result<Self, DataFusionError> {
Self::try_new_with_schema_alias(expr, input, schema, None)
}

/// Create a new Projection using the specified output schema
pub fn try_new_with_schema_alias(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
alias: Option<String>,
) -> Result<Self, DataFusionError> {
if expr.len() != schema.fields().len() {
Expand All @@ -1146,11 +1154,7 @@ impl Projection {
}

/// Create a new Projection using the specified output schema
pub fn new_from_schema(
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
alias: Option<String>,
) -> Self {
pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
let expr: Vec<Expr> = schema
.fields()
.iter()
Expand All @@ -1161,7 +1165,7 @@ impl Projection {
expr,
input,
schema,
alias,
alias: None,
}
}

Expand Down Expand Up @@ -1990,7 +1994,6 @@ mod tests {
schema: empty_schema.clone(),
})),
empty_schema,
None,
);
assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap()));
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,14 @@ pub fn from_plan(
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Projection(Projection { schema, alias, .. }) => {
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
LogicalPlan::Projection(Projection { schema, alias, .. }) => Ok(
LogicalPlan::Projection(Projection::try_new_with_schema_alias(
expr.to_vec(),
Arc::new(inputs[0].clone()),
schema.clone(),
alias.clone(),
)?))
}
)?),
),
LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
values: expr
Expand Down
15 changes: 8 additions & 7 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,14 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
alias.clone(),
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
alias.clone(),
)?,
))
}
LogicalPlan::Filter(filter) => {
let input = filter.input();
Expand Down Expand Up @@ -328,7 +330,6 @@ fn build_project_plan(
project_exprs,
Arc::new(input),
Arc::new(schema),
None,
)?))
}

Expand Down
24 changes: 13 additions & 11 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,19 @@ fn limit_push_down(
ancestor,
) => {
// Push down limit directly (projection doesn't change number of rows)
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
expr.clone(),
Arc::new(limit_push_down(
_optimizer,
ancestor,
input.as_ref(),
_optimizer_config,
)?),
schema.clone(),
alias.clone(),
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
expr.clone(),
Arc::new(limit_push_down(
_optimizer,
ancestor,
input.as_ref(),
_optimizer_config,
)?),
schema.clone(),
alias.clone(),
)?,
))
}
(
LogicalPlan::Union(Union { inputs, schema }),
Expand Down
23 changes: 12 additions & 11 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,16 @@ fn optimize_plan(
Ok(new_input)
} else {
let metadata = new_input.schema().metadata().clone();
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
new_expr,
Arc::new(new_input),
DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?),
alias.clone(),
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
new_expr,
Arc::new(new_input),
DFSchemaRef::new(DFSchema::new_with_metadata(
new_fields, metadata,
)?),
alias.clone(),
)?,
))
}
}
LogicalPlan::Join(Join {
Expand Down Expand Up @@ -836,11 +840,8 @@ mod tests {
// that the Column references are unqualified (e.g. their
// relation is `None`). PlanBuilder resolves the expressions
let expr = vec![col("a"), col("b")];
let plan = LogicalPlan::Projection(Projection::try_new(
expr,
Arc::new(table_scan),
None,
)?);
let plan =
LogicalPlan::Projection(Projection::try_new(expr, Arc::new(table_scan))?);

assert_fields_eq(&plan, vec!["a", "b"]);

Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(child),
optimized_children_plan.schema().clone(),
None,
)))
}
} else {
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ impl OptimizerRule for SingleDistinctToGroupBy {
alias_expr,
Arc::new(outer_aggr),
schema.clone(),
None,
)?))
} else {
utils::optimize_children(self, plan, _optimizer_config)
Expand Down
Loading

0 comments on commit 880e6fc

Please sign in to comment.