Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Expr::InSubquery and Expr::ScalarSubquery #2342

Merged
merged 8 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
| Expr::GetIndexedField { .. }
| Expr::Case { .. } => Recursion::Continue(self),

Expand Down
50 changes: 50 additions & 0 deletions datafusion/core/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,56 @@ mod tests {
Ok(())
}

#[test]
fn filter_in_subquery() -> Result<()> {
let foo = test_table_scan_with_name("foo")?;
let bar = test_table_scan_with_name("bar")?;

let subquery = LogicalPlanBuilder::from(foo)
.project(vec![col("a")])?
.filter(col("a").eq(col("bar.a")))?
.build()?;

// SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
let outer_query = LogicalPlanBuilder::from(bar)
.project(vec![col("a")])?
.filter(in_subquery(col("a"), Arc::new(subquery)))?
.build()?;

let expected = "Filter: #bar.a IN (Subquery: Filter: #foo.a = #bar.a\
\n Projection: #foo.a\
\n TableScan: foo projection=None)\
\n Projection: #bar.a\
\n TableScan: bar projection=None";
assert_eq!(expected, format!("{:?}", outer_query));

Ok(())
}

#[test]
fn select_scalar_subquery() -> Result<()> {
let foo = test_table_scan_with_name("foo")?;
let bar = test_table_scan_with_name("bar")?;

let subquery = LogicalPlanBuilder::from(foo)
.project(vec![col("b")])?
.filter(col("a").eq(col("bar.a")))?
.build()?;

// SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
let outer_query = LogicalPlanBuilder::from(bar)
.project(vec![scalar_subquery(Arc::new(subquery))])?
.build()?;

let expected = "Projection: (Subquery: Filter: #foo.a = #bar.a\
\n Projection: #foo.b\
\n TableScan: foo projection=None)\
\n TableScan: bar projection=None";
assert_eq!(expected, format!("{:?}", outer_query));

Ok(())
}

#[test]
fn projection_non_unique_names() -> Result<()> {
let plan = LogicalPlanBuilder::scan_empty(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
Expr::Alias(inner_expr, name) => {
Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
}
Expr::ScalarSubquery(_) => e.clone(),
_ => match e.name(input_schema) {
Ok(name) => match input_schema.field_with_unqualified_name(&name) {
Ok(field) => Expr::Column(field.qualified_column()),
Expand Down
12 changes: 11 additions & 1 deletion datafusion/core/src/logical_plan/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,17 @@ impl ExprRewritable for Expr {
let expr = match self {
Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
Expr::Column(_) => self.clone(),
Expr::Exists(_) => self.clone(),
Expr::Exists { .. } => self.clone(),
Expr::InSubquery {
expr,
subquery,
negated,
} => Expr::InSubquery {
expr: rewrite_boxed(expr, rewriter)?,
subquery,
negated,
},
Expr::ScalarSubquery(_) => self.clone(),
Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names),
Expr::Literal(value) => Expr::Literal(value),
Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/logical_plan/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ impl ExprVisitable for Expr {
Expr::Column(_)
| Expr::ScalarVariable(_, _)
| Expr::Literal(_)
| Expr::Exists(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => Ok(visitor),
Expr::BinaryExpr { left, right, .. } => {
Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ pub use expr::{
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit,
lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now,
now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum,
sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
exists, exp, exprlist_to_fields, floor, in_list, in_subquery, initcap, left, length,
lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min,
not_exists, not_in_subquery, now, now_expr, nullif, octet_length, or, random,
regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim,
scalar_subquery, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt,
starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper,
when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
Expand Down
10 changes: 9 additions & 1 deletion datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,16 @@ impl ExprIdentifierVisitor<'_> {
desc.push_str("InList-");
desc.push_str(&negated.to_string());
}
Expr::Exists(_) => {
Expr::Exists { negated, .. } => {
desc.push_str("Exists-");
desc.push_str(&negated.to_string());
}
Expr::InSubquery { negated, .. } => {
desc.push_str("InSubquery-");
desc.push_str(&negated.to_string());
}
Expr::ScalarSubquery(_) => {
desc.push_str("ScalarSubquery-");
}
Expr::Wildcard => {
desc.push_str("Wildcard-");
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ impl<'a> ConstEvaluator<'a> {
| Expr::AggregateUDF { .. }
| Expr::ScalarVariable(_, _)
| Expr::Column(_)
| Expr::Exists(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at some point it would be cool to be able to evaluate scalar subqueries at plan time, "but not now" 😆

| Expr::WindowFunction { .. }
| Expr::Sort { .. }
| Expr::Wildcard
Expand Down
12 changes: 9 additions & 3 deletions datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. }
| Expr::InList { .. }
| Expr::Exists(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::GetIndexedField { .. } => {}
Expand Down Expand Up @@ -371,7 +373,9 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
}
Ok(expr_list)
}
Expr::Exists(_) => Ok(vec![]),
Expr::Exists { .. } => Ok(vec![]),
Expr::InSubquery { expr, .. } => Ok(vec![expr.as_ref().to_owned()]),
Expr::ScalarSubquery(_) => Ok(vec![]),
Expr::Wildcard { .. } => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
Expand Down Expand Up @@ -506,7 +510,9 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
Expr::Column(_)
| Expr::Literal(_)
| Expr::InList { .. }
| Expr::Exists(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _) => Ok(expr.clone()),
Expr::Sort {
asc, nulls_first, ..
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,15 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Ok(format!("{} IN ({:?})", expr, list))
}
}
Expr::Exists(_) => Err(DataFusionError::NotImplemented(
Expr::Exists { .. } => Err(DataFusionError::NotImplemented(
"EXISTS is not yet supported in the physical plan".to_string(),
)),
Expr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
"IN subquery is not yet supported in the physical plan".to_string(),
)),
Expr::ScalarSubquery(_) => Err(DataFusionError::NotImplemented(
"Scalar subqueries are not yet supported in the physical plan".to_string(),
)),
Expr::Between {
expr,
negated,
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ pub use crate::execution::options::{
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
exists, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
octet_length, random, regexp_match, regexp_replace, repeat, replace, reverse, right,
rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr,
sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
exists, in_list, in_subquery, initcap, left, length, lit, lower, lpad, ltrim, max,
md5, min, not_exists, not_in_subquery, now, octet_length, random, regexp_match,
regexp_replace, repeat, replace, reverse, right, rpad, rtrim, scalar_subquery,
sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, sum, to_hex,
translate, trim, upper, Column, JoinType, Partitioning,
};
12 changes: 11 additions & 1 deletion datafusion/core/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,17 @@ where
Expr::Column { .. }
| Expr::Literal(_)
| Expr::ScalarVariable(_, _)
| Expr::Exists(_) => Ok(expr.clone()),
| Expr::Exists { .. }
| Expr::ScalarSubquery(_) => Ok(expr.clone()),
Expr::InSubquery {
expr: nested_expr,
subquery,
negated,
} => Ok(Expr::InSubquery {
expr: Box::new(clone_with_replacement(&**nested_expr, replacement_fn)?),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

subquery: subquery.clone(),
negated: *negated,
}),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField {
Expand Down
44 changes: 41 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,23 @@ pub enum Expr {
negated: bool,
},
/// EXISTS subquery
Exists(Subquery),
Exists {
/// subquery that will produce a single column of data
subquery: Subquery,
/// Whether the expression is negated
negated: bool,
},
/// IN subquery
InSubquery {
/// The expression to compare
expr: Box<Expr>,
/// subquery that will produce a single column of data to compare against
subquery: Subquery,
/// Whether the expression is negated
negated: bool,
},
/// Scalar subquery
ScalarSubquery(Subquery),
/// Represents a reference to all fields in a schema.
Wildcard,
/// Represents a reference to all fields in a specific schema.
Expand Down Expand Up @@ -434,7 +450,25 @@ impl fmt::Debug for Expr {
Expr::Negative(expr) => write!(f, "(- {:?})", expr),
Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
Expr::Exists(subquery) => write!(f, "EXISTS ({:?})", subquery),
Expr::Exists { subquery, negated } => {
if *negated {
write!(f, "NOT EXISTS ({:?})", subquery)
} else {
write!(f, "EXISTS ({:?})", subquery)
}
}
andygrove marked this conversation as resolved.
Show resolved Hide resolved
Expr::InSubquery {
expr,
subquery,
negated,
} => {
if *negated {
write!(f, "{:?} NOT IN ({:?})", expr, subquery)
} else {
write!(f, "{:?} IN ({:?})", expr, subquery)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same negated: true / negated: false could be applied to Expr::InSubquery as was applied to Expr::Exists

Expr::ScalarSubquery(subquery) => write!(f, "({:?})", subquery),
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right)
}
Expand Down Expand Up @@ -622,7 +656,11 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let expr = create_name(expr, input_schema)?;
Ok(format!("{} IS NOT NULL", expr))
}
Expr::Exists(_) => Ok("EXISTS".to_string()),
Expr::Exists { .. } => Ok("EXISTS".to_string()),
Expr::InSubquery { .. } => Ok("IN".to_string()),
andygrove marked this conversation as resolved.
Show resolved Hide resolved
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).name().clone())
}
Expr::GetIndexedField { expr, key } => {
let expr = create_name(expr, input_schema)?;
Ok(format!("{}[{}]", expr, key))
Expand Down
36 changes: 35 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,41 @@ pub fn approx_percentile_cont_with_weight(

/// Create an EXISTS subquery expression
pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
Expr::Exists(Subquery { subquery })
Expr::Exists {
subquery: Subquery { subquery },
negated: false,
}
}

/// Create a NOT EXISTS subquery expression
pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
Expr::Exists {
subquery: Subquery { subquery },
negated: true,
}
}

/// Create an IN subquery expression
pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
Expr::InSubquery {
expr: Box::new(expr),
subquery: Subquery { subquery },
negated: false,
}
}

/// Create a NOT IN subquery expression
pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
Expr::InSubquery {
expr: Box::new(expr),
subquery: Subquery { subquery },
negated: true,
}
}

/// Create a scalar subquery expression
pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
Expr::ScalarSubquery(Subquery { subquery })
}

// TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many
Expand Down
14 changes: 12 additions & 2 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,14 @@ impl ExprSchemable for Expr {
}
Expr::Not(_)
| Expr::IsNull(_)
| Expr::Exists(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).data_type().clone())
}
Expr::BinaryExpr {
ref left,
ref right,
Expand Down Expand Up @@ -173,7 +177,13 @@ impl ExprSchemable for Expr {
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. } => Ok(true),
Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists(_) => Ok(false),
Expr::IsNull(_)
| Expr::IsNotNull(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. } => Ok(false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think aIN can be nullable if its exprs are nullable:

alamb=# select null IN (select * from a);
 ?column? 
----------
 
(1 row)

So maybe Expr::InSubquery should return expr.nullable(input_schema) too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Fixed.

Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).is_nullable())
}
Expr::BinaryExpr {
ref left,
ref right,
Expand Down
13 changes: 7 additions & 6 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ pub use expr_fn::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, case, ceil, character_length, chr, coalesce, col, concat,
concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, date_part,
date_trunc, digest, exists, exp, floor, in_list, initcap, left, length, ln, log10,
log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or,
random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad,
rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with,
strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_seconds, translate, trim, trunc, upper, when,
date_trunc, digest, exists, exp, floor, in_list, in_subquery, initcap, left, length,
ln, log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now,
now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, upper, when,
};
pub use expr_schema::ExprSchemable;
pub use function::{
Expand Down