Skip to content

Commit

Permalink
finish expr rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Feb 4, 2022
1 parent ad84513 commit b11f2bd
Showing 1 changed file with 126 additions and 22 deletions.
148 changes: 126 additions & 22 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,28 +527,132 @@ impl Expr {
/// rewrite the subquery to apply to the outer logical plan
pub fn rewrite_to(&self, outer: &mut LogicalPlan) -> Result<Expr> {
Ok(match self {
Expr::Alias(_, _) => todo!(),
Expr::Column(_) => todo!(),
Expr::ScalarVariable(_) => todo!(),
Expr::Literal(_) => todo!(),
Expr::BinaryExpr { .. } => todo!(),
Expr::Not(_) => todo!(),
Expr::IsNotNull(_) => todo!(),
Expr::IsNull(_) => todo!(),
Expr::Negative(_) => todo!(),
Expr::GetIndexedField { .. } => todo!(),
Expr::Between { .. } => todo!(),
Expr::Case { .. } => todo!(),
Expr::Cast { .. } => todo!(),
Expr::TryCast { .. } => todo!(),
Expr::Sort { .. } => todo!(),
Expr::ScalarFunction { .. } => todo!(),
Expr::ScalarUDF { .. } => todo!(),
Expr::AggregateFunction { .. } => todo!(),
Expr::WindowFunction { .. } => todo!(),
Expr::AggregateUDF { .. } => todo!(),
Expr::InList { .. } => todo!(),
Expr::Wildcard => todo!(),
Expr::Alias(expr, name) => {
Expr::Alias(Box::new(expr.rewrite_to(outer).unwrap()), name.clone())
}
Expr::Column(col) => Expr::Column(col.clone()),
Expr::ScalarVariable(vars) => Expr::ScalarVariable(vars.clone()),
Expr::Literal(val) => Expr::Literal(val.clone()),
Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
left: Box::new(left.rewrite_to(outer).unwrap()),
op: op.clone(),
right: Box::new(right.rewrite_to(outer).unwrap()),
},
Expr::Not(expr) => Expr::Not(Box::new(expr.rewrite_to(outer).unwrap())),
Expr::IsNotNull(expr) => {
Expr::IsNotNull(Box::new(expr.rewrite_to(outer).unwrap()))
}
Expr::IsNull(expr) => Expr::IsNull(Box::new(expr.rewrite_to(outer).unwrap())),
Expr::Negative(expr) => {
Expr::Negative(Box::new(expr.rewrite_to(outer).unwrap()))
}
Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
expr: Box::new(expr.rewrite_to(outer).unwrap()),
key: key.clone(),
},
Expr::Between {
expr,
negated,
low,
high,
} => Expr::Between {
expr: Box::new(expr.rewrite_to(outer).unwrap()),
negated: *negated,
low: Box::new(low.rewrite_to(outer).unwrap()),
high: Box::new(high.rewrite_to(outer).unwrap()),
},
Expr::Case {
expr,
when_then_expr,
else_expr,
} => Expr::Case {
expr: if let Some(e) = expr {
Option::from(Box::new(e.rewrite_to(outer).unwrap()))
} else {
None
},
when_then_expr: when_then_expr
.into_iter()
.map(|(when, then)| {
(
Box::new(when.rewrite_to(outer).unwrap()),
Box::new(then.rewrite_to(outer).unwrap()),
)
})
.collect(),
else_expr: if let Some(e) = else_expr {
Option::from(Box::new(e.rewrite_to(outer).unwrap()))
} else {
None
},
},
Expr::Cast { expr, data_type } => Expr::Cast {
expr: Box::new(expr.rewrite_to(outer).unwrap()),
data_type: data_type.clone(),
},
Expr::TryCast { expr, data_type } => Expr::TryCast {
expr: Box::new(expr.rewrite_to(outer).unwrap()),
data_type: data_type.clone(),
},
Expr::Sort {
expr,
asc,
nulls_first,
} => Expr::Sort {
expr: Box::new(expr.rewrite_to(outer).unwrap()),
asc: *asc,
nulls_first: *nulls_first,
},
Expr::ScalarFunction { fun, args } => Expr::ScalarFunction {
fun: fun.clone(),
args: args.iter().map(|a| a.rewrite_to(outer).unwrap()).collect(),
},
Expr::ScalarUDF { fun, args } => Expr::ScalarUDF {
fun: fun.clone(),
args: args.iter().map(|a| a.rewrite_to(outer).unwrap()).collect(),
},
Expr::AggregateFunction {
fun,
args,
distinct,
} => Expr::AggregateFunction {
fun: fun.clone(),
args: args.iter().map(|a| a.rewrite_to(outer).unwrap()).collect(),
distinct: *distinct,
},
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
} => Expr::WindowFunction {
fun: fun.clone(),
args: args.iter().map(|a| a.rewrite_to(outer).unwrap()).collect(),
partition_by: partition_by
.iter()
.map(|e| e.rewrite_to(outer).unwrap())
.collect(),
order_by: order_by
.iter()
.map(|e| e.rewrite_to(outer).unwrap())
.collect(),
window_frame: window_frame.clone(),
},
Expr::AggregateUDF { fun, args } => Expr::AggregateUDF {
fun: fun.clone(),
args: args.iter().map(|a| a.rewrite_to(outer).unwrap()).collect(),
},
Expr::InList {
expr,
list,
negated,
} => Expr::InList {
expr: Box::new(expr.rewrite_to(outer).unwrap()),
list: list.iter().map(|e| e.rewrite_to(outer).unwrap()).collect(),
negated: *negated,
},
Expr::Wildcard => Expr::Wildcard,
Expr::Exists(logical_plan) => {
*outer = outer
.clone()
Expand Down

0 comments on commit b11f2bd

Please sign in to comment.