From da21e9522af8ca5bfb2d2da276cf3e2eceb9eff5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 07:50:18 +0200 Subject: [PATCH 01/14] Cross join implementation --- datafusion/README.md | 4 +- datafusion/src/logical_plan/builder.rs | 14 + datafusion/src/logical_plan/plan.rs | 27 +- datafusion/src/optimizer/constant_folding.rs | 3 +- .../src/optimizer/hash_build_probe_order.rs | 27 ++ .../src/optimizer/projection_push_down.rs | 1 + datafusion/src/optimizer/utils.rs | 5 + datafusion/src/physical_plan/cross_join.rs | 283 ++++++++++++++++++ datafusion/src/physical_plan/hash_utils.rs | 5 - datafusion/src/physical_plan/mod.rs | 1 + datafusion/src/physical_plan/planner.rs | 9 +- datafusion/src/sql/planner.rs | 26 +- datafusion/tests/sql.rs | 40 ++- 13 files changed, 423 insertions(+), 22 deletions(-) create mode 100644 datafusion/src/physical_plan/cross_join.rs diff --git a/datafusion/README.md b/datafusion/README.md index 9e6b7a2a78b5..ff0b26d7bf03 100644 --- a/datafusion/README.md +++ b/datafusion/README.md @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the - [ ] MINUS - [x] Joins - [x] INNER JOIN - - [ ] CROSS JOIN + - [x] LEFT JOIN + - [x] RIGHT JOIN + - [x] CROSS JOIN - [ ] OUTER JOIN - [ ] Window diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index fed82fd23b81..68798d77a566 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -270,6 +270,20 @@ impl LogicalPlanBuilder { })) } } + /// Apply a cross join + pub fn cross_join(&self, right: &LogicalPlan) -> Result { + let left_fields = self.plan.schema().fields().iter(); + let right_fields = right.schema().fields(); + let fields = left_fields.chain(right_fields).cloned().collect(); + + let schema = DFSchema::new(fields)?; + + Ok(Self::from(&LogicalPlan::CrossJoin { + left: Arc::new(self.plan.clone()), + right: Arc::new(right.clone()), + schema: DFSchemaRef::new(schema), + })) + } /// Repartition pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result { diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index d1b9b827a5a3..606ef1e22275 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -113,6 +113,15 @@ pub enum LogicalPlan { /// The output schema, containing fields from the left and right inputs schema: DFSchemaRef, }, + /// Apply Cross Join to two logical plans + CrossJoin { + /// Left input + left: Arc, + /// Right input + right: Arc, + /// The output schema, containing fields from the left and right inputs + schema: DFSchemaRef, + }, /// Repartition the plan based on a partitioning scheme. Repartition { /// The incoming logical plan @@ -203,6 +212,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { schema, .. } => &schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => &schema, + LogicalPlan::CrossJoin { schema, .. } => &schema, LogicalPlan::Repartition { input, .. } => input.schema(), LogicalPlan::Limit { input, .. } => input.schema(), LogicalPlan::CreateExternalTable { schema, .. } => &schema, @@ -229,6 +239,11 @@ impl LogicalPlan { right, schema, .. + } + | LogicalPlan::CrossJoin { + left, + right, + schema, } => { let mut schemas = left.all_schemas(); schemas.extend(right.all_schemas()); @@ -290,8 +305,9 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Limit { .. } | LogicalPlan::CreateExternalTable { .. } - | LogicalPlan::Explain { .. } => vec![], - LogicalPlan::Union { .. } => { + | LogicalPlan::CrossJoin { .. } + | LogicalPlan::Explain { .. } + | LogicalPlan::Union { .. } => { vec![] } } @@ -307,6 +323,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], + LogicalPlan::CrossJoin { left, right, .. } => vec![left, right], LogicalPlan::Limit { input, .. } => vec![input], LogicalPlan::Extension { node } => node.inputs(), LogicalPlan::Union { inputs, .. } => inputs.iter().collect(), @@ -396,7 +413,8 @@ impl LogicalPlan { LogicalPlan::Repartition { input, .. } => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, - LogicalPlan::Join { left, right, .. } => { + LogicalPlan::Join { left, right, .. } + | LogicalPlan::CrossJoin { left, right, .. } => { left.accept(visitor)? && right.accept(visitor)? } LogicalPlan::Union { inputs, .. } => { @@ -669,6 +687,9 @@ impl LogicalPlan { keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect(); write!(f, "Join: {}", join_expr.join(", ")) } + LogicalPlan::CrossJoin { .. } => { + write!(f, "CrossJoin:") + } LogicalPlan::Repartition { partitioning_scheme, .. diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 2fa03eb5c709..32db6ebd5722 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding { | LogicalPlan::Explain { .. } | LogicalPlan::Limit { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { .. } => { + | LogicalPlan::Join { .. } + | LogicalPlan::CrossJoin { .. } => { // apply the optimization to all inputs of the plan let inputs = plan.inputs(); let new_inputs = inputs diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index f44050f0b72e..086e2f03196b 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -67,6 +67,10 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option { // we cannot predict the cardinality of the join output None } + LogicalPlan::CrossJoin { left, right, .. } => { + // number of rows is equal to num_left * num_right + get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y)) + } LogicalPlan::Repartition { .. } => { // we cannot predict how rows will be repartitioned None @@ -138,6 +142,29 @@ impl OptimizerRule for HashBuildProbeOrder { }) } } + LogicalPlan::CrossJoin { + left, + right, + schema, + } => { + let left = self.optimize(left)?; + let right = self.optimize(right)?; + if should_swap_join_order(&left, &right) { + // Swap left and right + Ok(LogicalPlan::CrossJoin { + left: Arc::new(right), + right: Arc::new(left), + schema: schema.clone(), + }) + } else { + // Keep join as is + Ok(LogicalPlan::CrossJoin { + left: Arc::new(left), + right: Arc::new(right), + schema: schema.clone(), + }) + } + } // Rest: recurse into plan, apply optimization where possible LogicalPlan::Projection { .. } | LogicalPlan::Aggregate { .. } diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 6b1cdfe18ca7..7243fa52d9b3 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -270,6 +270,7 @@ fn optimize_plan( | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Union { .. } + | LogicalPlan::CrossJoin { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); // collect all required columns by this plan diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index fe1d02381917..0ec3fa7c02a1 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -208,6 +208,11 @@ pub fn from_plan( on: on.clone(), schema: schema.clone(), }), + LogicalPlan::CrossJoin { schema, .. } => Ok(LogicalPlan::CrossJoin { + left: Arc::new(inputs[0].clone()), + right: Arc::new(inputs[1].clone()), + schema: schema.clone(), + }), LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit { n: *n, input: Arc::new(inputs[0].clone()), diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs new file mode 100644 index 000000000000..d39e03f23cd2 --- /dev/null +++ b/datafusion/src/physical_plan/cross_join.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use futures::StreamExt; +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use futures::{Stream, TryStreamExt}; + +use futures::lock::Mutex; + +use super::{hash_utils::check_join_is_valid, merge::MergeExec}; +use crate::{ + error::{DataFusionError, Result}, + scalar::ScalarValue, +}; +use async_trait::async_trait; +use std::time::Instant; + +use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use crate::physical_plan::coalesce_batches::concat_batches; +use log::debug; + +/// Data of the left side +type JoinLeftData = Vec; + +/// executes partitions in parallel and combines them into a set of +/// partitions by combining all values from the left with all values on the right +#[derive(Debug)] +pub struct CrossJoinExec { + /// left (build) side which gets loaded in memory + left: Arc, + /// right (probe) side which are combined with left side + right: Arc, + /// The schema once the join is applied + schema: SchemaRef, + /// Build-side data + build_side: Arc>>, +} + +impl CrossJoinExec { + /// Tries to create a new [CrossJoinExec]. + /// # Error + /// This function errors when it is not possible to join the left and right sides on keys `on`. + pub fn try_new( + left: Arc, + right: Arc, + ) -> Result { + let left_schema = left.schema(); + let right_schema = right.schema(); + check_join_is_valid(&left_schema, &right_schema, &[])?; + + let left_schema = left.schema(); + let left_fields = left_schema.fields().iter(); + let right_schema = right.schema(); + + let right_fields = right_schema.fields().iter(); + + // left then right + let all_columns = left_fields.chain(right_fields).cloned().collect(); + + let schema = Arc::new(Schema::new(all_columns)); + + Ok(CrossJoinExec { + left, + right, + schema, + build_side: Arc::new(Mutex::new(None)), + }) + } + + /// left (build) side which gets loaded in memory + pub fn left(&self) -> &Arc { + &self.left + } + + /// right side which gets combined with left side + pub fn right(&self) -> &Arc { + &self.right + } +} + +#[async_trait] +impl ExecutionPlan for CrossJoinExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 2 => Ok(Arc::new(CrossJoinExec::try_new( + children[0].clone(), + children[1].clone(), + )?)), + _ => Err(DataFusionError::Internal( + "CrossJoinExec wrong number of children".to_string(), + )), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.right.output_partitioning() + } + + async fn execute(&self, partition: usize) -> Result { + // we only want to compute the build side once + let left_data = { + let mut build_side = self.build_side.lock().await; + + match build_side.as_ref() { + Some(stream) => stream.clone(), + None => { + let start = Instant::now(); + + // merge all left parts into a single stream + let merge = MergeExec::new(self.left.clone()); + let stream = merge.execute(0).await?; + + // Load all batches and count the rows + let (batches, num_rows) = stream + .try_fold((Vec::new(), 0usize), |mut acc, batch| async { + acc.1 += batch.num_rows(); + acc.0.push(batch); + Ok(acc) + }) + .await?; + + *build_side = Some(batches.clone()); + + debug!( + "Built build-side of cartesian join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + + batches + } + } + }; + + let stream = self.right.execute(partition).await?; + + Ok(Box::pin(CrossJoinStream { + schema: self.schema.clone(), + left_data, + right: stream, + num_input_batches: 0, + num_input_rows: 0, + num_output_batches: 0, + num_output_rows: 0, + join_time: 0, + })) + } +} + +/// A stream that issues [RecordBatch]es as they arrive from the right of the join. +struct CrossJoinStream { + /// Input schema + schema: Arc, + /// data from the left side + left_data: JoinLeftData, + /// right + right: SendableRecordBatchStream, + /// number of input batches + num_input_batches: usize, + /// number of input rows + num_input_rows: usize, + /// number of batches produced + num_output_batches: usize, + /// number of rows produced + num_output_rows: usize, + /// total time for joining probe-side batches to the build-side batches + join_time: usize, +} + +impl RecordBatchStream for CrossJoinStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} +fn build_batch( + batch: &RecordBatch, + left_data: &[RecordBatch], + schema: &Schema, +) -> ArrowResult { + let mut batches = Vec::new(); + let mut num_rows = 0; + for left in left_data.iter() { + for i in 0..left.num_rows() { + // for each value on the left, repeat the value of the right + let arrays = left + .columns() + .iter() + .map(|arr| { + let scalar = ScalarValue::try_from_array(arr, i)?; + Ok(scalar.to_array_of_size(batch.num_rows())) + }) + .collect::>>() + .map_err(|x| x.into_arrow_external_error())?; + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + arrays + .iter() + .chain(batch.columns().iter()) + .cloned() + .collect(), + )?; + + batches.push(batch); + num_rows += left.num_rows(); + } + } + concat_batches(&Arc::new(schema.clone()), &batches, num_rows) +} + +impl Stream for CrossJoinStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.right + .poll_next_unpin(cx) + .map(|maybe_batch| match maybe_batch { + Some(Ok(batch)) => { + let start = Instant::now(); + let result = build_batch(&batch, &self.left_data, &self.schema); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + Some(result) + } + other => { + debug!( + "Processed {} probe-side input batches containing {} rows and \ + produced {} output batches containing {} rows in {} ms", + self.num_input_batches, + self.num_input_rows, + self.num_output_batches, + self.num_output_rows, + self.join_time + ); + other + } + }) + } +} diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index b26ff9bb5fc2..a38cc092123d 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -52,11 +52,6 @@ fn check_join_set_is_valid( right: &HashSet, on: &JoinOn, ) -> Result<()> { - if on.is_empty() { - return Err(DataFusionError::Plan( - "The 'on' clause of a join cannot be empty".to_string(), - )); - } let on_left = &on.iter().map(|on| on.0.to_string()).collect::>(); let left_missing = on_left.difference(left).collect::>(); diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 5036dcb921bb..5b43bcb6c9bc 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -333,6 +333,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; +pub mod cross_join; pub mod coalesce_batches; pub mod common; #[cfg(feature = "crypto_expressions")] diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index f9279ae48f0c..ae6ad5075d87 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use super::{ - aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, + aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary, + functions, hash_join::PartitionMode, udaf, union::UnionExec, }; use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; @@ -328,6 +328,11 @@ impl DefaultPhysicalPlanner { )?)) } } + LogicalPlan::CrossJoin { left, right, .. } => { + let left = self.create_initial_plan(left, ctx_state)?; + let right = self.create_initial_plan(right, ctx_state)?; + Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) + } LogicalPlan::EmptyRelation { produce_one_row, schema, diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index f3cba232a23a..8a2f1cb75207 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -355,12 +355,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { JoinOperator::Inner(constraint) => { self.parse_join(left, &right, constraint, JoinType::Inner) } + JoinOperator::CrossJoin => self.parse_cross_join(left, &right), other => Err(DataFusionError::NotImplemented(format!( "Unsupported JOIN operator {:?}", other ))), } } + fn parse_cross_join( + &self, + left: &LogicalPlan, + right: &LogicalPlan, + ) -> Result { + LogicalPlanBuilder::from(&left) + .cross_join(&right)? + .build() + } fn parse_join( &self, @@ -489,9 +499,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } if join_keys.is_empty() { - return Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )); + left = LogicalPlanBuilder::from(&left) + .cross_join(right)? + .build()?; } else { let left_keys: Vec<_> = join_keys.iter().map(|(l, _)| *l).collect(); @@ -517,9 +527,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if plans.len() == 1 { Ok(plans[0].clone()) } else { - Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )) + let mut left = plans[0].clone(); + for right in plans.iter().skip(1) { + left = LogicalPlanBuilder::from(&left) + .cross_join(right)? + .build()?; + } + Ok(left) } } }; diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index f4d4e65f3a4e..10ba8cd2bfd3 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1290,13 +1290,45 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { #[tokio::test] async fn cartesian_join() -> Result<()> { - let ctx = create_join_context("t1_id", "t2_id")?; + let mut ctx = create_join_context("t1_id", "t2_id")?; + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; - let maybe_plan = ctx.create_logical_plan(&sql); + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + assert_eq!( - "This feature is not implemented: Cartesian joins are not supported", - &format!("{}", maybe_plan.err().unwrap()) + actual, + [ + ["11", "a", "z"], + ["11", "a", "y"], + ["11", "a", "x"], + ["11", "a", "w"], + ["22", "b", "z"], + ["22", "b", "y"], + ["22", "b", "x"], + ["22", "b", "w"], + ["33", "c", "z"], + ["33", "c", "y"], + ["33", "c", "x"], + ["33", "c", "w"], + ["44", "d", "z"], + ["44", "d", "y"], + ["44", "d", "x"], + ["44", "d", "w"] + ] ); + Ok(()) } From ba0176181ad52923b27be5fc984c98ad05041304 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 07:58:02 +0200 Subject: [PATCH 02/14] Add to ballista, debug line --- ballista/rust/core/src/serde/logical_plan/to_proto.rs | 1 + datafusion/src/physical_plan/cross_join.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a181f98b6eb6..222b76739feb 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -940,6 +940,7 @@ impl TryInto for &LogicalPlan { } LogicalPlan::Extension { .. } => unimplemented!(), LogicalPlan::Union { .. } => unimplemented!(), + LogicalPlan::CrossJoin { .. } => unimplemented!(), } } } diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index d39e03f23cd2..a2889124bcf4 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -158,7 +158,7 @@ impl ExecutionPlan for CrossJoinExec { *build_side = Some(batches.clone()); debug!( - "Built build-side of cartesian join containing {} rows in {} ms", + "Built build-side of cross join containing {} rows in {} ms", num_rows, start.elapsed().as_millis() ); From 4f0ab2cb1f4e921ab2f14272875f7de97a63e97c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 08:00:39 +0200 Subject: [PATCH 03/14] Add to tpch test, format --- benchmarks/src/bin/tpch.rs | 5 +++++ datafusion/src/physical_plan/mod.rs | 2 +- datafusion/src/sql/planner.rs | 14 +++++--------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 328a68dd6a6f..b203ceb3f741 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1374,6 +1374,11 @@ mod tests { run_query(6).await } + #[tokio::test] + async fn run_q9() -> Result<()> { + run_query(9).await + } + #[tokio::test] async fn run_q10() -> Result<()> { run_query(10).await diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 5b43bcb6c9bc..3eb59e911675 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -333,9 +333,9 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; -pub mod cross_join; pub mod coalesce_batches; pub mod common; +pub mod cross_join; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; pub mod csv; diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 8a2f1cb75207..a40d0becdcb4 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -367,9 +367,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { left: &LogicalPlan, right: &LogicalPlan, ) -> Result { - LogicalPlanBuilder::from(&left) - .cross_join(&right)? - .build() + LogicalPlanBuilder::from(&left).cross_join(&right)?.build() } fn parse_join( @@ -499,9 +497,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } if join_keys.is_empty() { - left = LogicalPlanBuilder::from(&left) - .cross_join(right)? - .build()?; + left = + LogicalPlanBuilder::from(&left).cross_join(right)?.build()?; } else { let left_keys: Vec<_> = join_keys.iter().map(|(l, _)| *l).collect(); @@ -529,9 +526,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } else { let mut left = plans[0].clone(); for right in plans.iter().skip(1) { - left = LogicalPlanBuilder::from(&left) - .cross_join(right)? - .build()?; + left = + LogicalPlanBuilder::from(&left).cross_join(right)?.build()?; } Ok(left) } From 54564857dfba2e8c9ae11b490170a6b7529bc6f7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 08:29:33 +0200 Subject: [PATCH 04/14] Simplify a bit --- datafusion/src/physical_plan/cross_join.rs | 56 +++++++++++----------- datafusion/tests/sql.rs | 2 +- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index a2889124bcf4..22d002bc4a58 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -41,7 +41,7 @@ use crate::physical_plan::coalesce_batches::concat_batches; use log::debug; /// Data of the left side -type JoinLeftData = Vec; +type JoinLeftData = RecordBatch; /// executes partitions in parallel and combines them into a set of /// partitions by combining all values from the left with all values on the right @@ -154,8 +154,9 @@ impl ExecutionPlan for CrossJoinExec { Ok(acc) }) .await?; - - *build_side = Some(batches.clone()); + let merged_batch = + concat_batches(&self.left.schema(), &batches, num_rows)?; + *build_side = Some(merged_batch.clone()); debug!( "Built build-side of cross join containing {} rows in {} ms", @@ -163,7 +164,7 @@ impl ExecutionPlan for CrossJoinExec { start.elapsed().as_millis() ); - batches + merged_batch } } }; @@ -210,36 +211,33 @@ impl RecordBatchStream for CrossJoinStream { } fn build_batch( batch: &RecordBatch, - left_data: &[RecordBatch], + left_data: &RecordBatch, schema: &Schema, ) -> ArrowResult { let mut batches = Vec::new(); let mut num_rows = 0; - for left in left_data.iter() { - for i in 0..left.num_rows() { - // for each value on the left, repeat the value of the right - let arrays = left - .columns() + for i in 0..left_data.num_rows() { + // for each value on the left, repeat the value of the right + let arrays = left_data + .columns() + .iter() + .map(|arr| { + let scalar = ScalarValue::try_from_array(arr, i)?; + Ok(scalar.to_array_of_size(batch.num_rows())) + }) + .collect::>>() + .map_err(|x| x.into_arrow_external_error())?; + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + arrays .iter() - .map(|arr| { - let scalar = ScalarValue::try_from_array(arr, i)?; - Ok(scalar.to_array_of_size(batch.num_rows())) - }) - .collect::>>() - .map_err(|x| x.into_arrow_external_error())?; - - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - arrays - .iter() - .chain(batch.columns().iter()) - .cloned() - .collect(), - )?; - - batches.push(batch); - num_rows += left.num_rows(); - } + .chain(batch.columns().iter()) + .cloned() + .collect(), + )?; + num_rows += batch.num_rows(); + batches.push(batch); } concat_batches(&Arc::new(schema.clone()), &batches, num_rows) } diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 10ba8cd2bfd3..58a80d1c4bd4 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1289,7 +1289,7 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { } #[tokio::test] -async fn cartesian_join() -> Result<()> { +async fn cross_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; From 2dc88dff508c868f6f6b00fab0726e4e8d51c74e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 08:57:12 +0200 Subject: [PATCH 05/14] Row-by-row processing for the left side to keep memory down --- datafusion/src/physical_plan/cross_join.rs | 84 ++++++++++++++-------- 1 file changed, 54 insertions(+), 30 deletions(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 22d002bc4a58..7ef7f1459045 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -18,16 +18,14 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use futures::StreamExt; -use std::{any::Any, sync::Arc}; +use futures::{lock::Mutex, StreamExt}; +use std::{any::Any, sync::Arc, task::Poll}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use futures::{Stream, TryStreamExt}; -use futures::lock::Mutex; - use super::{hash_utils::check_join_is_valid, merge::MergeExec}; use crate::{ error::{DataFusionError, Result}, @@ -175,6 +173,8 @@ impl ExecutionPlan for CrossJoinExec { schema: self.schema.clone(), left_data, right: stream, + right_batch: Arc::new(std::sync::Mutex::new(None)), + left_index: 0, num_input_batches: 0, num_input_rows: 0, num_output_batches: 0, @@ -192,6 +192,10 @@ struct CrossJoinStream { left_data: JoinLeftData, /// right right: SendableRecordBatchStream, + /// Current value on the left + left_index: usize, + /// Current batch being processed from the right side + right_batch: Arc>>, /// number of input batches num_input_batches: usize, /// number of input rows @@ -210,38 +214,33 @@ impl RecordBatchStream for CrossJoinStream { } } fn build_batch( + left_index: usize, batch: &RecordBatch, left_data: &RecordBatch, schema: &Schema, ) -> ArrowResult { - let mut batches = Vec::new(); - let mut num_rows = 0; - for i in 0..left_data.num_rows() { - // for each value on the left, repeat the value of the right - let arrays = left_data - .columns() + // Repeat value on the left n times + let arrays = left_data + .columns() + .iter() + .map(|arr| { + let scalar = ScalarValue::try_from_array(arr, left_index)?; + Ok(scalar.to_array_of_size(batch.num_rows())) + }) + .collect::>>() + .map_err(|x| x.into_arrow_external_error())?; + + RecordBatch::try_new( + Arc::new(schema.clone()), + arrays .iter() - .map(|arr| { - let scalar = ScalarValue::try_from_array(arr, i)?; - Ok(scalar.to_array_of_size(batch.num_rows())) - }) - .collect::>>() - .map_err(|x| x.into_arrow_external_error())?; - - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - arrays - .iter() - .chain(batch.columns().iter()) - .cloned() - .collect(), - )?; - num_rows += batch.num_rows(); - batches.push(batch); - } - concat_batches(&Arc::new(schema.clone()), &batches, num_rows) + .chain(batch.columns().iter()) + .cloned() + .collect(), + ) } +#[async_trait] impl Stream for CrossJoinStream { type Item = ArrowResult; @@ -249,12 +248,32 @@ impl Stream for CrossJoinStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + if self.left_index > 0 && self.left_index < self.left_data.num_rows() { + let start = Instant::now(); + let right_batch = self.right_batch.lock().unwrap().clone().unwrap(); + let result = + build_batch(self.left_index, &right_batch, &self.left_data, &self.schema); + self.num_input_rows += right_batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + self.left_index += 1; + return Poll::Ready(Some(result)); + } + self.left_index = 0; self.right .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { Some(Ok(batch)) => { let start = Instant::now(); - let result = build_batch(&batch, &self.left_data, &self.schema); + let result = build_batch( + self.left_index, + &batch, + &self.left_data, + &self.schema, + ); self.num_input_batches += 1; self.num_input_rows += batch.num_rows(); if let Ok(ref batch) = result { @@ -262,6 +281,11 @@ impl Stream for CrossJoinStream { self.num_output_batches += 1; self.num_output_rows += batch.num_rows(); } + self.left_index = 1; + + let mut right_batch = self.right_batch.lock().unwrap(); + *right_batch = Some(batch.clone()); + Some(result) } other => { From 129a00ba1db9e1fee2d4d8ddcf4bf1361e2c61cf Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 09:53:04 +0200 Subject: [PATCH 06/14] Fix --- datafusion/src/optimizer/filter_push_down.rs | 2 +- datafusion/src/physical_plan/cross_join.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index ec260a41dc57..5383ae0421ee 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -314,7 +314,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { .collect::>(); issue_filters(state, used_columns, plan) } - LogicalPlan::Join { left, right, .. } => { + LogicalPlan::Join { left, right, .. } | LogicalPlan::CrossJoin {left, right, ..} => { let (pushable_to_left, pushable_to_right, keep) = get_join_predicates(&state, &left.schema(), &right.schema()); diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 7ef7f1459045..162276862ec0 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -21,9 +21,11 @@ use futures::{lock::Mutex, StreamExt}; use std::{any::Any, sync::Arc, task::Poll}; +use crate::physical_plan::memory::MemoryStream; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; + use futures::{Stream, TryStreamExt}; use super::{hash_utils::check_join_is_valid, merge::MergeExec}; @@ -169,6 +171,14 @@ impl ExecutionPlan for CrossJoinExec { let stream = self.right.execute(partition).await?; + if left_data.num_rows() == 0 { + return Ok(Box::pin(MemoryStream::try_new( + vec![], + self.schema.clone(), + None, + )?)) + } + Ok(Box::pin(CrossJoinStream { schema: self.schema.clone(), left_data, From c07e8698a911ff7a7f25b1b9a7e2343601ddb280 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 09:56:13 +0200 Subject: [PATCH 07/14] Fmt --- datafusion/src/optimizer/filter_push_down.rs | 3 ++- datafusion/src/physical_plan/cross_join.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index 5383ae0421ee..4622e9fc62dc 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -314,7 +314,8 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { .collect::>(); issue_filters(state, used_columns, plan) } - LogicalPlan::Join { left, right, .. } | LogicalPlan::CrossJoin {left, right, ..} => { + LogicalPlan::Join { left, right, .. } + | LogicalPlan::CrossJoin { left, right, .. } => { let (pushable_to_left, pushable_to_right, keep) = get_join_predicates(&state, &left.schema(), &right.schema()); diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 162276862ec0..bb7f38789e51 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -176,7 +176,7 @@ impl ExecutionPlan for CrossJoinExec { vec![], self.schema.clone(), None, - )?)) + )?)); } Ok(Box::pin(CrossJoinStream { From f6b762f41b4e8e60a3c2ada2e287803435cac6d1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 10:15:04 +0200 Subject: [PATCH 08/14] Clippy --- datafusion/src/physical_plan/cross_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index bb7f38789e51..500e8e8c9dca 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -294,7 +294,7 @@ impl Stream for CrossJoinStream { self.left_index = 1; let mut right_batch = self.right_batch.lock().unwrap(); - *right_batch = Some(batch.clone()); + *right_batch = Some(batch); Some(result) } From a6cef8f005ba1fd06e488238d551b067087872c8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 18:13:05 +0200 Subject: [PATCH 09/14] Fix doc, don't include as much debug info in memoryexec debug --- datafusion/src/physical_plan/cross_join.rs | 4 ++-- datafusion/src/physical_plan/memory.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 500e8e8c9dca..27f7ee7f80e5 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Defines the join plan for executing partitions in parallel and then joining the results -//! into a set of partitions. +//! Defines the cross join plan for loading the left side of the cross join +//! and producing batches in parallel for the right partitions use futures::{lock::Mutex, StreamExt}; use std::{any::Any, sync::Arc, task::Poll}; diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index bef9bcc62dff..cbd323e18dc4 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -17,6 +17,7 @@ //! Execution plan for reading in-memory batches of data +use core::fmt; use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; @@ -31,7 +32,6 @@ use async_trait::async_trait; use futures::Stream; /// Execution plan for reading in-memory batches of data -#[derive(Debug)] pub struct MemoryExec { /// The partitions to query partitions: Vec>, @@ -41,6 +41,15 @@ pub struct MemoryExec { projection: Option>, } +impl fmt::Debug for MemoryExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "partitions: [...]")?; + write!(f, "schema: {:?}", self.schema)?; + write!(f, "projection: {:?}", self.projection) + + } +} + #[async_trait] impl ExecutionPlan for MemoryExec { /// Return a reference to Any that can be used for downcasting From 01eec41e385a094fb9a3e263cccd3368fec3b818 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 18:16:11 +0200 Subject: [PATCH 10/14] Use join --- datafusion/src/logical_plan/builder.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 68798d77a566..b6017b743ed7 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -272,11 +272,7 @@ impl LogicalPlanBuilder { } /// Apply a cross join pub fn cross_join(&self, right: &LogicalPlan) -> Result { - let left_fields = self.plan.schema().fields().iter(); - let right_fields = right.schema().fields(); - let fields = left_fields.chain(right_fields).cloned().collect(); - - let schema = DFSchema::new(fields)?; + let schema = self.plan.schema().join(right.schema())?; Ok(Self::from(&LogicalPlan::CrossJoin { left: Arc::new(self.plan.clone()), From 9a692d60de71a6e67bc41f0fce8dad05b1db4dc5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 20:48:15 +0200 Subject: [PATCH 11/14] Fix doc --- datafusion/src/physical_plan/cross_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 27f7ee7f80e5..8287f675892c 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -60,7 +60,7 @@ pub struct CrossJoinExec { impl CrossJoinExec { /// Tries to create a new [CrossJoinExec]. /// # Error - /// This function errors when it is not possible to join the left and right sides on keys `on`. + /// This function errors when left and right schema's can't be combined pub fn try_new( left: Arc, right: Arc, From 178a349f4dc476c4ad3b77742e2d462dd66e92cb Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 19 Apr 2021 21:19:26 +0200 Subject: [PATCH 12/14] Add test cases with partitions --- datafusion/src/physical_plan/memory.rs | 1 - datafusion/tests/sql.rs | 16 +++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index cbd323e18dc4..9022077559ac 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -46,7 +46,6 @@ impl fmt::Debug for MemoryExec { write!(f, "partitions: [...]")?; write!(f, "schema: {:?}", self.schema)?; write!(f, "projection: {:?}", self.projection) - } } diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 58a80d1c4bd4..70baffc700ba 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1289,8 +1289,8 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { } #[tokio::test] -async fn cross_join() -> Result<()> { - let mut ctx = create_join_context("t1_id", "t2_id")?; +async fn cross_join() { + let mut ctx = create_join_context("t1_id", "t2_id").unwrap(); let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; @@ -1329,7 +1329,17 @@ async fn cross_join() -> Result<()> { ] ); - Ok(()) + // Two partitions (from UNION) on the left + let sql = "SELECT * FROM (SELECT t1_id, t1_name FROM t1 UNION ALL SELECT t1_id, t1_name FROM t1) t1 CROSS JOIN t2"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4 * 2, actual.len()); + + // Two partitions (from UNION) on the right + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN (SELECT t2_name FROM t2 UNION ALL SELECT t2_name FROM t2)"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4 * 2, actual.len()); } fn create_join_context( From 844cb4b0fda1a0575ec2bda110de3f4e47a5b8d5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Apr 2021 20:06:33 +0200 Subject: [PATCH 13/14] Make clear that mutex is locked for very short amount of time --- datafusion/src/physical_plan/cross_join.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 8287f675892c..a55f907fd19a 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -260,7 +260,10 @@ impl Stream for CrossJoinStream { ) -> std::task::Poll> { if self.left_index > 0 && self.left_index < self.left_data.num_rows() { let start = Instant::now(); - let right_batch = self.right_batch.lock().unwrap().clone().unwrap(); + let right_batch = { + let right_batch = self.right_batch.lock(); + right_batch.clone().unwrap() + }; let result = build_batch(self.left_index, &right_batch, &self.left_data, &self.schema); self.num_input_rows += right_batch.num_rows(); From fd115e073c483f7d6e5b6518ca4eb0745971c577 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Tue, 20 Apr 2021 20:17:39 +0200 Subject: [PATCH 14/14] Unwrap the lock --- datafusion/src/physical_plan/cross_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index a55f907fd19a..4372352d6ecf 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -261,7 +261,7 @@ impl Stream for CrossJoinStream { if self.left_index > 0 && self.left_index < self.left_data.num_rows() { let start = Instant::now(); let right_batch = { - let right_batch = self.right_batch.lock(); + let right_batch = self.right_batch.lock().unwrap(); right_batch.clone().unwrap() }; let result =