Skip to content

Commit

Permalink
fix build
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Feb 3, 2022
1 parent e4a056f commit 956d7f1
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 37 deletions.
1 change: 1 addition & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.15", optional = true }
tempfile = "3"
parking_lot = "0.11"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
criterion = "0.3"
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard => {
| Expr::Wildcard
| Expr::Exists(_) => {
*self.is_applicable = false;
Recursion::Stop(self)
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/logical_plan/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::fmt::{Display, Formatter};
pub type DFSchemaRef = Arc<DFSchema>;

/// DFSchema wraps an Arrow schema and adds relation names
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DFSchema {
/// Fields
fields: Vec<DFField>,
Expand Down Expand Up @@ -403,7 +403,7 @@ impl Display for DFSchema {
}

/// DFField wraps an Arrow field and adds an optional qualifier
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DFField {
/// Optional qualifier (usually a table or relation name)
qualifier: Option<String>,
Expand Down
69 changes: 67 additions & 2 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ pub enum Expr {
},
/// Represents a reference to all fields in a schema.
Wildcard,
/// Exists subquery
Exists(Box<LogicalPlan>),
}

/// Fixed seed for the hashing so that Ords are consistent across runs
Expand Down Expand Up @@ -469,9 +471,55 @@ impl Expr {

get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
}
Expr::Exists(logical_plan) => {
    // Gather the Arrow fields of the subquery's output schema.
    // This has to be an eager `collect()`: a bare `.map(..)` adapter is
    // lazy, so pushing into a vector from inside an unconsumed `map`
    // closure never runs and leaves the vector empty.
    let fields: Vec<_> = logical_plan
        .schema()
        .fields()
        .iter()
        .map(|df_field| df_field.field().clone())
        .collect();
    // A single-column subquery reports that column's type; any other
    // column count is reported as a struct of all output fields.
    match fields.len() {
        1 => Ok(fields[0].data_type().clone()),
        _ => Ok(DataType::Struct(fields)),
    }
}
}
}

/// Rewrites this expression against the enclosing plan `outer`.
///
/// Only [`Expr::Exists`] is implemented so far: the subquery plan is handed
/// to `outer.process_subquery(..)`, `*outer` is replaced with the plan that
/// call returns, and an unqualified column with an empty name is returned as
/// a placeholder for the column that holds the boolean result.
///
/// `out_arity` is accepted but not consulted by any implemented arm yet.
///
/// # Errors
///
/// Propagates the error returned by `process_subquery` instead of panicking.
///
/// # Panics
///
/// Every variant other than [`Expr::Exists`] is still a `todo!()` stub and
/// panics when reached.
pub fn rewrite_to(self, out_arity: usize, outer: &mut LogicalPlan) -> Result<Expr> {
    match self {
        Expr::Exists(logical_plan) => {
            *outer = outer.process_subquery(logical_plan.as_ref())?;
            // Finally, return the column that contains the `bool` result.
            Ok(Expr::Column(Column {
                relation: None,
                name: "".to_string(),
            }))
        }
        // Listed explicitly (no `_` catch-all) so that adding a new `Expr`
        // variant forces a decision here at compile time.
        Expr::Alias(_, _)
        | Expr::Column(_)
        | Expr::ScalarVariable(_)
        | Expr::Literal(_)
        | Expr::BinaryExpr { .. }
        | Expr::Not(_)
        | Expr::IsNotNull(_)
        | Expr::IsNull(_)
        | Expr::Negative(_)
        | Expr::GetIndexedField { .. }
        | Expr::Between { .. }
        | Expr::Case { .. }
        | Expr::Cast { .. }
        | Expr::TryCast { .. }
        | Expr::Sort { .. }
        | Expr::ScalarFunction { .. }
        | Expr::ScalarUDF { .. }
        | Expr::AggregateFunction { .. }
        | Expr::WindowFunction { .. }
        | Expr::AggregateUDF { .. }
        | Expr::InList { .. }
        | Expr::Wildcard => todo!(),
    }
}

/// Returns the nullability of the expression based on [arrow::datatypes::Schema].
///
/// # Errors
Expand Down Expand Up @@ -527,6 +575,14 @@ impl Expr {
let data_type = expr.get_type(input_schema)?;
get_indexed_field(&data_type, key).map(|x| x.is_nullable())
}
Expr::Exists(logical_plan) => {
    // Reported as nullable only when every output field of the subquery
    // is nullable; a single non-nullable field yields `false`. An empty
    // schema yields `true`.
    let every_field_nullable = logical_plan
        .schema()
        .fields()
        .iter()
        .all(|df_field| df_field.is_nullable());
    Ok(every_field_nullable)
}
}
}

Expand Down Expand Up @@ -706,7 +762,7 @@ impl Expr {
/// pre_visit(Column("foo"))
/// pre_visit(Column("bar"))
/// post_visit(Column("bar"))
/// post_visit(Column("bar"))
/// post_visit(Column("foo"))
/// post_visit(BinaryExpr(GT))
/// ```
///
Expand Down Expand Up @@ -800,6 +856,8 @@ impl Expr {
list.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))
}
Expr::Wildcard | Expr::Exists(_) => Ok(visitor),
Expr::GetIndexedField { ref expr, .. } => expr.accept(visitor),
}?;

visitor.post_visit(self)
Expand All @@ -826,7 +884,7 @@ impl Expr {
/// ```text
/// pre_visit(BinaryExpr(GT))
/// pre_visit(Column("foo"))
/// mutatate(Column("foo"))
/// mutate(Column("foo"))
/// pre_visit(Column("bar"))
/// mutate(Column("bar"))
/// mutate(BinaryExpr(GT))
Expand Down Expand Up @@ -964,6 +1022,7 @@ impl Expr {
expr: rewrite_boxed(expr, rewriter)?,
key,
},
Expr::Exists(logical_plan) => Expr::Exists(logical_plan),
};

// now rewrite this expression itself
Expand Down Expand Up @@ -2017,6 +2076,9 @@ impl fmt::Debug for Expr {
Expr::GetIndexedField { ref expr, key } => {
write!(f, "({:?})[{}]", expr, key)
}
Expr::Exists(logical_plan) => {
write!(f, "exists subquery: {:?}", logical_plan)
}
}
}
}
Expand Down Expand Up @@ -2178,6 +2240,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Expr::Wildcard => Err(DataFusionError::Internal(
"Create name does not support wildcard".to_string(),
)),
Expr::Exists(logical_plan) => {
Ok(format!("Exists subquery {}", logical_plan.display()))
}
}
}

Expand Down
Loading

0 comments on commit 956d7f1

Please sign in to comment.