Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unnest with single expression #9069

Merged
merged 14 commits into from
Feb 4, 2024
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(VisitRecursion::Stop)
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ fn physical_name(e: &Expr) -> Result<String> {

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
Expand Down
13 changes: 13 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ pub enum Expr {
/// A placeholder which holds a reference to a qualified field
/// in the outer query, used for correlated subqueries.
OuterReferenceColumn(DataType, Column),
/// Unnest expression
Unnest(Unnest),
}

/// Payload of [`Expr::Unnest`]: the argument expressions of an `UNNEST(...)` call.
///
/// NOTE(review): the column-normalization and type-resolution code in this
/// change only consults `exprs[0]`; presumably exactly one expression is
/// expected for now — confirm before relying on multiple entries.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
/// Expressions to unnest, in the order they were supplied.
pub exprs: Vec<Expr>,
}

/// Alias expression
Expand Down Expand Up @@ -917,6 +924,7 @@ impl Expr {
Expr::TryCast { .. } => "TryCast",
Expr::WindowFunction { .. } => "WindowFunction",
Expr::Wildcard { .. } => "Wildcard",
Expr::Unnest { .. } => "Unnest",
}
}

Expand Down Expand Up @@ -1307,6 +1315,7 @@ impl Expr {
| Expr::Negative(..)
| Expr::OuterReferenceColumn(_, _)
| Expr::TryCast(..)
| Expr::Unnest(..)
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
Expand Down Expand Up @@ -1561,6 +1570,9 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { exprs }) => {
write!(f, "UNNEST({exprs:?})")
}
}
}
}
Expand Down Expand Up @@ -1748,6 +1760,7 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
Expr::Unnest(Unnest { exprs }) => Ok(format!("UNNEST({exprs:?})")),
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, &fun.args),
Expr::WindowFunction(WindowFunction {
fun,
Expand Down
12 changes: 11 additions & 1 deletion datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Expression rewriter

use crate::expr::Alias;
use crate::expr::{Alias, Unnest};
use crate::logical_plan::Projection;
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
Expand Down Expand Up @@ -75,6 +75,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
schemas: &[&[&DFSchema]],
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
// Normalize column inside Unnest
if let Expr::Unnest(Unnest { exprs }) = expr {
let e = normalize_col_with_schemas_and_ambiguity_check(
exprs[0].clone(),
schemas,
using_columns,
)?;
return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
}

expr.transform(&|expr| {
Ok({
if let Expr::Column(c) = expr {
Expand Down
10 changes: 9 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, BinaryExpr, Cast,
GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction,
ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
};
use crate::field_util::GetFieldAccessSchema;
use crate::type_coercion::binary::get_result_type;
Expand Down Expand Up @@ -82,6 +82,13 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
Expr::Unnest(Unnest { exprs }) => {
let arg_data_types = exprs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need `iter` here, given that we only refer to the first element?

Copy link
Contributor Author

@jayzhan211 jayzhan211 Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep this, since there will be multiple exprs in the future

.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok(arg_data_types[0].clone())
}
Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
let arg_data_types = args
.iter()
Expand Down Expand Up @@ -250,6 +257,7 @@ impl ExprSchemable for Expr {
| Expr::ScalarFunction(..)
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::Unnest(_)
| Expr::Placeholder(_) => Ok(true),
Expr::IsNull(_)
| Expr::IsNotNull(_)
Expand Down
6 changes: 4 additions & 2 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Case,
Cast, GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder,
ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
};
use crate::{Expr, GetFieldAccess};

Expand All @@ -33,7 +33,7 @@ impl TreeNode for Expr {
op: &mut F,
) -> Result<VisitRecursion> {
let children = match self {
Expr::Alias(Alias{expr, .. })
Expr::Alias(Alias{expr,..})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
Expand All @@ -58,6 +58,7 @@ impl TreeNode for Expr {
GetFieldAccess::NamedStructField { .. } => vec![expr],
}
}
Expr::Unnest(Unnest { exprs }) |
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
Expand Down Expand Up @@ -151,6 +152,7 @@ impl TreeNode for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::Unnest(_)
| Expr::Literal(_) => self,
Expr::Alias(Alias {
expr,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
// Use explicit pattern match instead of a default
// implementation, so that in the future if someone adds
// new Expr types, they will check here as well
Expr::ScalarVariable(_, _)
Expr::Unnest(_)
| Expr::ScalarVariable(_, _)
| Expr::Alias(_)
| Expr::Literal(_)
| Expr::BinaryExpr { .. }
Expand Down
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ impl TreeNodeRewriter for TypeCoercionRewriter {

fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
Expr::Unnest(_) => internal_err!(
"Unnest should be rewritten to LogicalPlan::Unnest before type coercion"
),
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::Unnest(_)
| Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(_),
..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ impl<'a> ConstEvaluator<'a> {
ScalarFunctionDefinition::Name(_) => false,
},
Expr::Literal(_)
| Expr::Unnest(_)
| Expr::BinaryExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ message LogicalExprNode {

PlaceholderNode placeholder = 34;

Unnest unnest = 35;

}
}

Expand Down Expand Up @@ -531,6 +533,10 @@ message NegativeNode {
LogicalExprNode expr = 1;
}

// Serialized form of an unnest logical expression (the `unnest` field of
// LogicalExprNode).
message Unnest {
// Argument expressions of the unnest call.
repeated LogicalExprNode exprs = 1;
}

message InListNode {
LogicalExprNode expr = 1;
repeated LogicalExprNode list = 2;
Expand Down
104 changes: 104 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading