Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARROW-12441] [DataFusion] Cross join implementation #11

Merged
merged 14 commits into from
Apr 22, 2021
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
}
LogicalPlan::Extension { .. } => unimplemented!(),
LogicalPlan::Union { .. } => unimplemented!(),
LogicalPlan::CrossJoin { .. } => unimplemented!(),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,11 @@ mod tests {
run_query(6).await
}

#[tokio::test]
async fn run_q9() -> Result<()> {
run_query(9).await
}

#[tokio::test]
async fn run_q10() -> Result<()> {
run_query(10).await
Expand Down
4 changes: 3 additions & 1 deletion datafusion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] MINUS
- [x] Joins
- [x] INNER JOIN
- [ ] CROSS JOIN
- [x] LEFT JOIN
- [x] RIGHT JOIN
- [x] CROSS JOIN
- [ ] OUTER JOIN
- [ ] Window

Expand Down
10 changes: 10 additions & 0 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ impl LogicalPlanBuilder {
}))
}
}
/// Apply a cross join
pub fn cross_join(&self, right: &LogicalPlan) -> Result<Self> {
let schema = self.plan.schema().join(right.schema())?;

Ok(Self::from(&LogicalPlan::CrossJoin {
left: Arc::new(self.plan.clone()),
right: Arc::new(right.clone()),
schema: DFSchemaRef::new(schema),
}))
}

/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
Expand Down
27 changes: 24 additions & 3 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ pub enum LogicalPlan {
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Apply Cross Join to two logical plans
CrossJoin {
/// Left input
left: Arc<LogicalPlan>,
/// Right input
right: Arc<LogicalPlan>,
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Repartition the plan based on a partitioning scheme.
Repartition {
/// The incoming logical plan
Expand Down Expand Up @@ -203,6 +212,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => &schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => &schema,
LogicalPlan::CrossJoin { schema, .. } => &schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => &schema,
Expand All @@ -229,6 +239,11 @@ impl LogicalPlan {
right,
schema,
..
}
| LogicalPlan::CrossJoin {
left,
right,
schema,
} => {
let mut schemas = left.all_schemas();
schemas.extend(right.all_schemas());
Expand Down Expand Up @@ -290,8 +305,9 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. } => vec![],
LogicalPlan::Union { .. } => {
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union { .. } => {
vec![]
}
}
Expand All @@ -307,6 +323,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
LogicalPlan::CrossJoin { left, right, .. } => vec![left, right],
LogicalPlan::Limit { input, .. } => vec![input],
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
Expand Down Expand Up @@ -396,7 +413,8 @@ impl LogicalPlan {
LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. } => {
LogicalPlan::Join { left, right, .. }
| LogicalPlan::CrossJoin { left, right, .. } => {
left.accept(visitor)? && right.accept(visitor)?
}
LogicalPlan::Union { inputs, .. } => {
Expand Down Expand Up @@ -669,6 +687,9 @@ impl LogicalPlan {
keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
write!(f, "Join: {}", join_expr.join(", "))
}
LogicalPlan::CrossJoin { .. } => {
write!(f, "CrossJoin:")
}
LogicalPlan::Repartition {
partitioning_scheme,
..
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Explain { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. } => {
| LogicalPlan::Join { .. }
| LogicalPlan::CrossJoin { .. } => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
.collect::<HashSet<_>>();
issue_filters(state, used_columns, plan)
}
LogicalPlan::Join { left, right, .. } => {
LogicalPlan::Join { left, right, .. }
| LogicalPlan::CrossJoin { left, right, .. } => {
let (pushable_to_left, pushable_to_right, keep) =
get_join_predicates(&state, &left.schema(), &right.schema());

Expand Down
27 changes: 27 additions & 0 deletions datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
// we cannot predict the cardinality of the join output
None
}
LogicalPlan::CrossJoin { left, right, .. } => {
// number of rows is equal to num_left * num_right
get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y))
}
LogicalPlan::Repartition { .. } => {
// we cannot predict how rows will be repartitioned
None
Expand Down Expand Up @@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder {
})
}
}
LogicalPlan::CrossJoin {
left,
right,
schema,
} => {
let left = self.optimize(left)?;
let right = self.optimize(right)?;
if should_swap_join_order(&left, &right) {
// Swap left and right
Ok(LogicalPlan::CrossJoin {
left: Arc::new(right),
right: Arc::new(left),
schema: schema.clone(),
})
} else {
// Keep join as is
Ok(LogicalPlan::CrossJoin {
left: Arc::new(left),
right: Arc::new(right),
schema: schema.clone(),
})
}
}
// Rest: recurse into plan, apply optimization where possible
LogicalPlan::Projection { .. }
| LogicalPlan::Aggregate { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
// collect all required columns by this plan
Expand Down
5 changes: 5 additions & 0 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ pub fn from_plan(
on: on.clone(),
schema: schema.clone(),
}),
LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin {
left: Arc::new(inputs[0].clone()),
right: Arc::new(inputs[1].clone()),
schema: schema.clone(),
}),
LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
n: *n,
input: Arc::new(inputs[0].clone()),
Expand Down
Loading