Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor: make plan_from_tables return one plan instead of Vec #4336

Merged
merged 1 commit into from
Nov 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 47 additions & 60 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,16 +632,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

fn plan_from_tables(
&self,
from: Vec<TableWithJoins>,
mut from: Vec<TableWithJoins>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<LogicalPlan>> {
) -> Result<LogicalPlan> {
match from.len() {
0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
_ => from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>(),
0 => Ok(LogicalPlanBuilder::empty(true).build()?),
1 => {
let from = from.remove(0);
self.plan_table_with_joins(from, ctes, outer_query_schema)
}
_ => {
let plans = from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>()?;
let mut left = plans[0].clone();
for right in plans.iter().skip(1) {
left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
}
Ok(left)
}
}
}

Expand Down Expand Up @@ -944,34 +955,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn plan_selection(
&self,
selection: Option<SQLExpr>,
plans: Vec<LogicalPlan>,
plan: LogicalPlan,
outer_query_schema: Option<&DFSchema>,
ctes: &mut HashMap<String, LogicalPlan>,
) -> Result<LogicalPlan> {
let cross_join_plan = if plans.len() == 1 {
plans[0].clone()
} else {
let mut left = plans[0].clone();
for right in plans.iter().skip(1) {
left = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
}
left
};
match selection {
Some(predicate_expr) => {
let mut fields = vec![];
let mut metadata = HashMap::new();
for plan in &plans {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
}

let mut join_schema = DFSchema::new_with_metadata(fields, metadata)?;
let mut join_schema = (**plan.schema()).clone();
let mut all_schemas: Vec<DFSchemaRef> = vec![];
for plan in plans {
for schema in plan.all_schemas() {
all_schemas.push(schema.clone());
}
for schema in plan.all_schemas() {
all_schemas.push(schema.clone());
}
if let Some(outer) = outer_query_schema {
all_schemas.push(Arc::new(outer.clone()));
Expand All @@ -990,10 +983,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

Ok(LogicalPlan::Filter(Filter::try_new(
filter_expr,
Arc::new(cross_join_plan),
Arc::new(plan),
)?))
}
None => Ok(cross_join_plan),
None => Ok(plan),
}
}

Expand All @@ -1020,45 +1013,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

// process `from` clause
let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));
let plan = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nice refactor -- thank you @jackwener

let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
// Build the FROM schema used for the unqualified-column ambiguity check:
// an unqualified column must resolve to exactly one field in this schema.
let from_schema = {
let mut fields = vec![];
let mut metadata = std::collections::HashMap::new();
for plan in &plans {
if let LogicalPlan::Join(HashJoin {
join_constraint: HashJoinConstraint::Using,
on,
left,
..
}) = plan
{
// For query: select id from t1 join t2 using(id), this is legal.
// We should dedup the fields for cols in using clause.
let mut plan_fields = plan.schema().fields().clone();
for join_cols in on.iter() {
let left_field = left.schema().field_from_column(&join_cols.0)?;
plan_fields.retain(|field| {
field.unqualified_column().name
!= left_field.unqualified_column().name
});
plan_fields.push(left_field.clone());
}
fields.extend_from_slice(&plan_fields);
metadata.extend(plan.schema().metadata().clone());
} else {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
let mut fields = plan.schema().fields().clone();

let metadata = plan.schema().metadata().clone();
if let LogicalPlan::Join(HashJoin {
join_constraint: HashJoinConstraint::Using,
ref on,
ref left,
..
}) = plan
{
// For query: select id from t1 join t2 using(id), this is legal.
// We should dedup the fields for cols in using clause.
for join_cols in on.iter() {
let left_field = left.schema().field_from_column(&join_cols.0)?;
fields.retain(|field| {
field.unqualified_column().name
!= left_field.unqualified_column().name
});
fields.push(left_field.clone());
}
}

DFSchema::new_with_metadata(fields, metadata)?
};

// process `where` clause
let plan =
self.plan_selection(select.selection, plans, outer_query_schema, ctes)?;
self.plan_selection(select.selection, plan, outer_query_schema, ctes)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs = self.prepare_select_exprs(
Expand Down