diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a181f98b6eb6..222b76739feb 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -940,6 +940,7 @@ impl TryInto for &LogicalPlan { } LogicalPlan::Extension { .. } => unimplemented!(), LogicalPlan::Union { .. } => unimplemented!(), + LogicalPlan::CrossJoin { .. } => unimplemented!(), } } } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 328a68dd6a6f..b203ceb3f741 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1374,6 +1374,11 @@ mod tests { run_query(6).await } + #[tokio::test] + async fn run_q9() -> Result<()> { + run_query(9).await + } + #[tokio::test] async fn run_q10() -> Result<()> { run_query(10).await diff --git a/datafusion/README.md b/datafusion/README.md index 9e6b7a2a78b5..ff0b26d7bf03 100644 --- a/datafusion/README.md +++ b/datafusion/README.md @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the - [ ] MINUS - [x] Joins - [x] INNER JOIN - - [ ] CROSS JOIN + - [x] LEFT JOIN + - [x] RIGHT JOIN + - [x] CROSS JOIN - [ ] OUTER JOIN - [ ] Window diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index fed82fd23b81..b6017b743ed7 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -270,6 +270,16 @@ impl LogicalPlanBuilder { })) } } + /// Apply a cross join + pub fn cross_join(&self, right: &LogicalPlan) -> Result { + let schema = self.plan.schema().join(right.schema())?; + + Ok(Self::from(&LogicalPlan::CrossJoin { + left: Arc::new(self.plan.clone()), + right: Arc::new(right.clone()), + schema: DFSchemaRef::new(schema), + })) + } /// Repartition pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result { diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index d1b9b827a5a3..606ef1e22275 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -113,6 +113,15 @@ pub enum LogicalPlan { /// The output schema, containing fields from the left and right inputs schema: DFSchemaRef, }, + /// Apply Cross Join to two logical plans + CrossJoin { + /// Left input + left: Arc, + /// Right input + right: Arc, + /// The output schema, containing fields from the left and right inputs + schema: DFSchemaRef, + }, /// Repartition the plan based on a partitioning scheme. Repartition { /// The incoming logical plan @@ -203,6 +212,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { schema, .. } => &schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => &schema, + LogicalPlan::CrossJoin { schema, .. } => &schema, LogicalPlan::Repartition { input, .. } => input.schema(), LogicalPlan::Limit { input, .. } => input.schema(), LogicalPlan::CreateExternalTable { schema, .. } => &schema, @@ -229,6 +239,11 @@ impl LogicalPlan { right, schema, .. + } + | LogicalPlan::CrossJoin { + left, + right, + schema, } => { let mut schemas = left.all_schemas(); schemas.extend(right.all_schemas()); @@ -290,8 +305,9 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Limit { .. } | LogicalPlan::CreateExternalTable { .. } - | LogicalPlan::Explain { .. } => vec![], - LogicalPlan::Union { .. } => { + | LogicalPlan::CrossJoin { .. } + | LogicalPlan::Explain { .. } + | LogicalPlan::Union { .. } => { vec![] } } @@ -307,6 +323,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], + LogicalPlan::CrossJoin { left, right, .. } => vec![left, right], LogicalPlan::Limit { input, .. } => vec![input], LogicalPlan::Extension { node } => node.inputs(), LogicalPlan::Union { inputs, .. } => inputs.iter().collect(), @@ -396,7 +413,8 @@ impl LogicalPlan { LogicalPlan::Repartition { input, .. } => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, - LogicalPlan::Join { left, right, .. } => { + LogicalPlan::Join { left, right, .. } + | LogicalPlan::CrossJoin { left, right, .. } => { left.accept(visitor)? && right.accept(visitor)? } LogicalPlan::Union { inputs, .. } => { @@ -669,6 +687,9 @@ impl LogicalPlan { keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect(); write!(f, "Join: {}", join_expr.join(", ")) } + LogicalPlan::CrossJoin { .. } => { + write!(f, "CrossJoin:") + } LogicalPlan::Repartition { partitioning_scheme, .. diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 2fa03eb5c709..32db6ebd5722 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding { | LogicalPlan::Explain { .. } | LogicalPlan::Limit { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { .. } => { + | LogicalPlan::Join { .. } + | LogicalPlan::CrossJoin { .. } => { // apply the optimization to all inputs of the plan let inputs = plan.inputs(); let new_inputs = inputs diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index ec260a41dc57..4622e9fc62dc 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { .collect::>(); issue_filters(state, used_columns, plan) } - LogicalPlan::Join { left, right, .. } => { + LogicalPlan::Join { left, right, .. } + | LogicalPlan::CrossJoin { left, right, .. } => { let (pushable_to_left, pushable_to_right, keep) = get_join_predicates(&state, &left.schema(), &right.schema()); diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index f44050f0b72e..086e2f03196b 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option { // we cannot predict the cardinality of the join output None } + LogicalPlan::CrossJoin { left, right, .. } => { + // number of rows is equal to num_left * num_right + get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y)) + } LogicalPlan::Repartition { .. } => { // we cannot predict how rows will be repartitioned None @@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder { }) } } + LogicalPlan::CrossJoin { + left, + right, + schema, + } => { + let left = self.optimize(left)?; + let right = self.optimize(right)?; + if should_swap_join_order(&left, &right) { + // Swap left and right + Ok(LogicalPlan::CrossJoin { + left: Arc::new(right), + right: Arc::new(left), + schema: schema.clone(), + }) + } else { + // Keep join as is + Ok(LogicalPlan::CrossJoin { + left: Arc::new(left), + right: Arc::new(right), + schema: schema.clone(), + }) + } + } // Rest: recurse into plan, apply optimization where possible LogicalPlan::Projection { .. } | LogicalPlan::Aggregate { .. } diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 6b1cdfe18ca7..7243fa52d9b3 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -270,6 +270,7 @@ fn optimize_plan( | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Union { .. } + | LogicalPlan::CrossJoin { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); // collect all required columns by this plan diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index fe1d02381917..0ec3fa7c02a1 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -208,6 +208,11 @@ pub fn from_plan( on: on.clone(), schema: schema.clone(), }), + LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin { + left: Arc::new(inputs[0].clone()), + right: Arc::new(inputs[1].clone()), + schema: schema.clone(), + }), LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit { n: *n, input: Arc::new(inputs[0].clone()), diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs new file mode 100644 index 000000000000..4372352d6ecf --- /dev/null +++ b/datafusion/src/physical_plan/cross_join.rs @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the cross join plan for loading the left side of the cross join +//! and producing batches in parallel for the right partitions + +use futures::{lock::Mutex, StreamExt}; +use std::{any::Any, sync::Arc, task::Poll}; + +use crate::physical_plan::memory::MemoryStream; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; + +use futures::{Stream, TryStreamExt}; + +use super::{hash_utils::check_join_is_valid, merge::MergeExec}; +use crate::{ + error::{DataFusionError, Result}, + scalar::ScalarValue, +}; +use async_trait::async_trait; +use std::time::Instant; + +use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use crate::physical_plan::coalesce_batches::concat_batches; +use log::debug; + +/// Data of the left side +type JoinLeftData = RecordBatch; + +/// executes partitions in parallel and combines them into a set of +/// partitions by combining all values from the left with all values on the right +#[derive(Debug)] +pub struct CrossJoinExec { + /// left (build) side which gets loaded in memory + left: Arc, + /// right (probe) side which are combined with left side + right: Arc, + /// The schema once the join is applied + schema: SchemaRef, + /// Build-side data + build_side: Arc>>, +} + +impl CrossJoinExec { + /// Tries to create a new [CrossJoinExec]. + /// # Error + /// This function errors when left and right schema's can't be combined + pub fn try_new( + left: Arc, + right: Arc, + ) -> Result { + let left_schema = left.schema(); + let right_schema = right.schema(); + check_join_is_valid(&left_schema, &right_schema, &[])?; + + let left_schema = left.schema(); + let left_fields = left_schema.fields().iter(); + let right_schema = right.schema(); + + let right_fields = right_schema.fields().iter(); + + // left then right + let all_columns = left_fields.chain(right_fields).cloned().collect(); + + let schema = Arc::new(Schema::new(all_columns)); + + Ok(CrossJoinExec { + left, + right, + schema, + build_side: Arc::new(Mutex::new(None)), + }) + } + + /// left (build) side which gets loaded in memory + pub fn left(&self) -> &Arc { + &self.left + } + + /// right side which gets combined with left side + pub fn right(&self) -> &Arc { + &self.right + } +} + +#[async_trait] +impl ExecutionPlan for CrossJoinExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 2 => Ok(Arc::new(CrossJoinExec::try_new( + children[0].clone(), + children[1].clone(), + )?)), + _ => Err(DataFusionError::Internal( + "CrossJoinExec wrong number of children".to_string(), + )), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.right.output_partitioning() + } + + async fn execute(&self, partition: usize) -> Result { + // we only want to compute the build side once + let left_data = { + let mut build_side = self.build_side.lock().await; + + match build_side.as_ref() { + Some(stream) => stream.clone(), + None => { + let start = Instant::now(); + + // merge all left parts into a single stream + let merge = MergeExec::new(self.left.clone()); + let stream = merge.execute(0).await?; + + // Load all batches and count the rows + let (batches, num_rows) = stream + .try_fold((Vec::new(), 0usize), |mut acc, batch| async { + acc.1 += batch.num_rows(); + acc.0.push(batch); + Ok(acc) + }) + .await?; + let merged_batch = + concat_batches(&self.left.schema(), &batches, num_rows)?; + *build_side = Some(merged_batch.clone()); + + debug!( + "Built build-side of cross join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + + merged_batch + } + } + }; + + let stream = self.right.execute(partition).await?; + + if left_data.num_rows() == 0 { + return Ok(Box::pin(MemoryStream::try_new( + vec![], + self.schema.clone(), + None, + )?)); + } + + Ok(Box::pin(CrossJoinStream { + schema: self.schema.clone(), + left_data, + right: stream, + right_batch: Arc::new(std::sync::Mutex::new(None)), + left_index: 0, + num_input_batches: 0, + num_input_rows: 0, + num_output_batches: 0, + num_output_rows: 0, + join_time: 0, + })) + } +} + +/// A stream that issues [RecordBatch]es as they arrive from the right of the join. +struct CrossJoinStream { + /// Input schema + schema: Arc, + /// data from the left side + left_data: JoinLeftData, + /// right + right: SendableRecordBatchStream, + /// Current value on the left + left_index: usize, + /// Current batch being processed from the right side + right_batch: Arc>>, + /// number of input batches + num_input_batches: usize, + /// number of input rows + num_input_rows: usize, + /// number of batches produced + num_output_batches: usize, + /// number of rows produced + num_output_rows: usize, + /// total time for joining probe-side batches to the build-side batches + join_time: usize, +} + +impl RecordBatchStream for CrossJoinStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} +fn build_batch( + left_index: usize, + batch: &RecordBatch, + left_data: &RecordBatch, + schema: &Schema, +) -> ArrowResult { + // Repeat value on the left n times + let arrays = left_data + .columns() + .iter() + .map(|arr| { + let scalar = ScalarValue::try_from_array(arr, left_index)?; + Ok(scalar.to_array_of_size(batch.num_rows())) + }) + .collect::>>() + .map_err(|x| x.into_arrow_external_error())?; + + RecordBatch::try_new( + Arc::new(schema.clone()), + arrays + .iter() + .chain(batch.columns().iter()) + .cloned() + .collect(), + ) +} + +#[async_trait] +impl Stream for CrossJoinStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.left_index > 0 && self.left_index < self.left_data.num_rows() { + let start = Instant::now(); + let right_batch = { + let right_batch = self.right_batch.lock().unwrap(); + right_batch.clone().unwrap() + }; + let result = + build_batch(self.left_index, &right_batch, &self.left_data, &self.schema); + self.num_input_rows += right_batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + self.left_index += 1; + return Poll::Ready(Some(result)); + } + self.left_index = 0; + self.right + .poll_next_unpin(cx) + .map(|maybe_batch| match maybe_batch { + Some(Ok(batch)) => { + let start = Instant::now(); + let result = build_batch( + self.left_index, + &batch, + &self.left_data, + &self.schema, + ); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + self.left_index = 1; + + let mut right_batch = self.right_batch.lock().unwrap(); + *right_batch = Some(batch); + + Some(result) + } + other => { + debug!( + "Processed {} probe-side input batches containing {} rows and \ + produced {} output batches containing {} rows in {} ms", + self.num_input_batches, + self.num_input_rows, + self.num_output_batches, + self.num_output_rows, + self.join_time + ); + other + } + }) + } +} diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index b26ff9bb5fc2..a38cc092123d 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -52,11 +52,6 @@ fn check_join_set_is_valid( right: &HashSet, on: &JoinOn, ) -> Result<()> { - if on.is_empty() { - return Err(DataFusionError::Plan( - "The 'on' clause of a join cannot be empty".to_string(), - )); - } let on_left = &on.iter().map(|on| on.0.to_string()).collect::>(); let left_missing = on_left.difference(left).collect::>(); diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index bef9bcc62dff..9022077559ac 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -17,6 +17,7 @@ //! Execution plan for reading in-memory batches of data +use core::fmt; use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; @@ -31,7 +32,6 @@ use async_trait::async_trait; use futures::Stream; /// Execution plan for reading in-memory batches of data -#[derive(Debug)] pub struct MemoryExec { /// The partitions to query partitions: Vec>, @@ -41,6 +41,14 @@ pub struct MemoryExec { projection: Option>, } +impl fmt::Debug for MemoryExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "partitions: [...]")?; + write!(f, "schema: {:?}", self.schema)?; + write!(f, "projection: {:?}", self.projection) + } +} + #[async_trait] impl ExecutionPlan for MemoryExec { /// Return a reference to Any that can be used for downcasting diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 5036dcb921bb..3eb59e911675 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -335,6 +335,7 @@ pub mod aggregates; pub mod array_expressions; pub mod coalesce_batches; pub mod common; +pub mod cross_join; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; pub mod csv; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index f9279ae48f0c..ae6ad5075d87 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use super::{ - aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, + aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary, + functions, hash_join::PartitionMode, udaf, union::UnionExec, }; use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; @@ -328,6 +328,11 @@ impl DefaultPhysicalPlanner { )?)) } } + LogicalPlan::CrossJoin { left, right, .. } => { + let left = self.create_initial_plan(left, ctx_state)?; + let right = self.create_initial_plan(right, ctx_state)?; + Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) + } LogicalPlan::EmptyRelation { produce_one_row, schema, diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index f3cba232a23a..a40d0becdcb4 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -355,12 +355,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { JoinOperator::Inner(constraint) => { self.parse_join(left, &right, constraint, JoinType::Inner) } + JoinOperator::CrossJoin => self.parse_cross_join(left, &right), other => Err(DataFusionError::NotImplemented(format!( "Unsupported JOIN operator {:?}", other ))), } } + fn parse_cross_join( + &self, + left: &LogicalPlan, + right: &LogicalPlan, + ) -> Result { + LogicalPlanBuilder::from(&left).cross_join(&right)?.build() + } fn parse_join( &self, @@ -489,9 +497,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } if join_keys.is_empty() { - return Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )); + left = + LogicalPlanBuilder::from(&left).cross_join(right)?.build()?; } else { let left_keys: Vec<_> = join_keys.iter().map(|(l, _)| *l).collect(); @@ -517,9 +524,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if plans.len() == 1 { Ok(plans[0].clone()) } else { - Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )) + let mut left = plans[0].clone(); + for right in plans.iter().skip(1) { + left = + LogicalPlanBuilder::from(&left).cross_join(right)?.build()?; + } + Ok(left) } } }; diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index f4d4e65f3a4e..70baffc700ba 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1289,15 +1289,57 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { } #[tokio::test] -async fn cartesian_join() -> Result<()> { - let ctx = create_join_context("t1_id", "t2_id")?; +async fn cross_join() { + let mut ctx = create_join_context("t1_id", "t2_id").unwrap(); + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; - let maybe_plan = ctx.create_logical_plan(&sql); + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + assert_eq!( - "This feature is not implemented: Cartesian joins are not supported", - &format!("{}", maybe_plan.err().unwrap()) + actual, + [ + ["11", "a", "z"], + ["11", "a", "y"], + ["11", "a", "x"], + ["11", "a", "w"], + ["22", "b", "z"], + ["22", "b", "y"], + ["22", "b", "x"], + ["22", "b", "w"], + ["33", "c", "z"], + ["33", "c", "y"], + ["33", "c", "x"], + ["33", "c", "w"], + ["44", "d", "z"], + ["44", "d", "y"], + ["44", "d", "x"], + ["44", "d", "w"] + ] ); - Ok(()) + + // Two partitions (from UNION) on the left + let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT t1_id, t1_name FROM t1) t1 CROSS JOIN t2"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4 * 2, actual.len()); + + // Two partitions (from UNION) on the right + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT t2_name FROM t2 UNION ALL SELECT t2_name FROM t2)"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4 * 2, actual.len()); } fn create_join_context(