From ed54a7967c20ac03438a5de0cbf4d602062c3a8a Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Tue, 28 Mar 2023 20:54:19 +0200 Subject: [PATCH 1/8] add assert on hash children partition count --- .../core/src/physical_plan/joins/hash_join.rs | 141 +++++++++--------- .../physical_plan/joins/sort_merge_join.rs | 11 +- datafusion/optimizer/src/push_down_limit.rs | 21 ++- 3 files changed, 97 insertions(+), 76 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 39acffa203ac..36eb1573a005 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -18,8 +18,12 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use ahash::RandomState; +use std::{any::Any, usize, vec}; +use std::fmt; +use std::sync::Arc; +use std::task::Poll; +use ahash::RandomState; use arrow::{ array::{ ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, @@ -34,66 +38,56 @@ use arrow::{ }, util::bit_util, }; -use smallvec::{smallvec, SmallVec}; -use std::sync::Arc; -use std::{any::Any, usize, vec}; - -use futures::{ready, Stream, StreamExt, TryStreamExt}; - -use arrow::array::Array; -use arrow::datatypes::{ArrowNativeType, DataType}; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; - use arrow::array::{ Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; - -use datafusion_common::cast::{as_dictionary_array, as_string_array}; - +use arrow::array::Array; +use arrow::datatypes::{ArrowNativeType, DataType}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use futures::{ready, Stream, StreamExt, TryStreamExt}; use hashbrown::raw::RawTable; +use smallvec::{smallvec, SmallVec}; -use crate::physical_plan::{ - coalesce_batches::concat_batches, - coalesce_partitions::CoalescePartitionsExec, - expressions::Column, - expressions::PhysicalSortExpr, - hash_utils::create_hashes, - joins::utils::{ - adjust_right_output_partitioning, build_join_schema, check_join_is_valid, - combine_join_equivalence_properties, estimate_join_statistics, - partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, - JoinFilter, JoinOn, - }, - metrics::{ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, - PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, -}; - -use crate::error::{DataFusionError, Result}; -use crate::logical_expr::JoinType; +use datafusion_common::cast::{as_dictionary_array, as_string_array}; use crate::arrow::array::BooleanBufferBuilder; use crate::arrow::datatypes::TimeUnit; +use crate::error::{DataFusionError, Result}; use crate::execution::{ context::TaskContext, memory_pool::{ MemoryConsumer, SharedMemoryReservation, SharedOptionalMemoryReservation, TryGrow, }, }; - -use super::{ - utils::{OnceAsync, OnceFut}, - PartitionMode, +use crate::logical_expr::JoinType; +use crate::physical_plan::{ + coalesce_batches::concat_batches, + coalesce_partitions::CoalescePartitionsExec, + DisplayFormatType, + Distribution, + EquivalenceProperties, + ExecutionPlan, + expressions::Column, + expressions::PhysicalSortExpr, hash_utils::create_hashes, joins::utils::{ + adjust_right_output_partitioning, build_join_schema, BuildProbeJoinMetrics, + check_join_is_valid, ColumnIndex, + combine_join_equivalence_properties, estimate_join_statistics, JoinFilter, + JoinOn, partitioned_join_output_partitioning, + }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, Partitioning, + PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::physical_plan::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, - get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide, + get_final_indices_from_bit_map, JoinSide, need_produce_result_in_final, +}; + +use super::{ + PartitionMode, + utils::{OnceAsync, OnceFut}, }; -use std::fmt; -use std::task::Poll; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. // @@ -281,7 +275,7 @@ impl ExecutionPlan for HashJoinExec { // JoinType::Full, JoinType::RightAnti types. let breaking = left || (right - && matches!( + && matches!( self.join_type, JoinType::Left | JoinType::Full @@ -377,6 +371,11 @@ impl ExecutionPlan for HashJoinExec { ) -> Result { let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); + if self.mode == PartitionMode::Partitioned && self.left.output_partitioning() != self.right.output_partitioning() { + return Err(DataFusionError::Plan(format!( + "Invalid HashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", + ))); + } let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); @@ -409,15 +408,17 @@ impl ExecutionPlan for HashJoinExec { Arc::new(self.reservation.clone()), ) }), - PartitionMode::Partitioned => OnceFut::new(collect_left_input( - Some(partition), - self.random_state.clone(), - self.left.clone(), - on_left.clone(), - context.clone(), - join_metrics.clone(), - Arc::new(reservation.clone()), - )), + PartitionMode::Partitioned => { + OnceFut::new(collect_left_input( + Some(partition), + self.random_state.clone(), + self.left.clone(), + on_left.clone(), + context.clone(), + join_metrics.clone(), + Arc::new(reservation.clone()), + )) + } PartitionMode::Auto => { return Err(DataFusionError::Plan(format!( "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()", @@ -1287,10 +1288,14 @@ impl Stream for HashJoinStream { mod tests { use std::sync::Arc; - use super::*; - use crate::execution::context::SessionConfig; - use crate::physical_expr::expressions::BinaryExpr; - use crate::prelude::SessionContext; + use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; + use arrow::datatypes::{DataType, Field, Schema}; + use smallvec::smallvec; + + use datafusion_common::ScalarValue; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::Literal; + use crate::{ assert_batches_sorted_eq, common::assert_contains, @@ -1303,16 +1308,14 @@ mod tests { memory::MemoryExec, repartition::RepartitionExec, }, - test::exec::MockExec, test::{build_table_i32, columns}, + test::exec::MockExec, }; - use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_expr::Operator; + use crate::execution::context::SessionConfig; + use crate::physical_expr::expressions::BinaryExpr; + use crate::prelude::SessionContext; - use datafusion_common::ScalarValue; - use datafusion_physical_expr::expressions::Literal; - use smallvec::smallvec; + use super::*; fn build_table( a: (&str, &Vec), @@ -1459,7 +1462,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); @@ -1504,7 +1507,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); @@ -1928,7 +1931,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); let expected = vec![ @@ -1972,7 +1975,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); let expected = vec![ @@ -3005,7 +3008,7 @@ mod tests { &join_type, false, ) - .unwrap(); + .unwrap(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -3090,7 +3093,7 @@ mod tests { left_batch.schema(), None, ) - .unwrap(), + .unwrap(), ); let right_batch = build_table_i32( ("a2", &vec![10, 11]), @@ -3103,7 +3106,7 @@ mod tests { right_batch.schema(), None, ) - .unwrap(), + .unwrap(), ); let on = vec![( Column::new_with_schema("b1", &left_batch.schema())?, diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 94f5c9e5ef60..83d11a6a65ee 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -300,6 +300,12 @@ impl ExecutionPlan for SortMergeJoinExec { partition: usize, context: Arc, ) -> Result { + if self.left.output_partitioning() != self.right.output_partitioning() { + return Err(DataFusionError::Plan(format!( + "Invalid SortMergeExec, partition count mismatch between children executors, consider using RepartitionExec", + ))); + } + let (streamed, buffered, on_streamed, on_buffered) = match self.join_type { JoinType::Inner | JoinType::Left @@ -476,6 +482,7 @@ struct StreamedBatch { // Index of currently scanned batch from buffered data pub buffered_batch_idx: Option, } + impl StreamedBatch { fn new(batch: RecordBatch, on_column: &[Column]) -> Self { let join_arrays = join_arrays(&batch, on_column); @@ -539,6 +546,7 @@ struct BufferedBatch { /// Size estimation used for reserving / releasing memory pub size_estimation: usize, } + impl BufferedBatch { fn new(batch: RecordBatch, range: Range, on_column: &[Column]) -> Self { let join_arrays = join_arrays(&batch, on_column); @@ -1153,6 +1161,7 @@ struct BufferedData { /// current scanning offset used in join_partial() pub scanning_offset: usize, } + impl BufferedData { pub fn head_batch(&self) -> &BufferedBatch { self.batches.front().unwrap() @@ -1740,7 +1749,7 @@ mod tests { vec![ SortOptions { descending: true, - nulls_first: false + nulls_first: false, }; 2 ], diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 6703a1d787a7..bec17fcfab45 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -17,14 +17,16 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) -use crate::optimizer::ApplyOrder; -use crate::{OptimizerConfig, OptimizerRule}; +use std::sync::Arc; + use datafusion_common::Result; use datafusion_expr::{ - logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union}, CrossJoin, + logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union}, }; -use std::sync::Arc; + +use crate::{OptimizerConfig, OptimizerRule}; +use crate::optimizer::ApplyOrder; /// Optimization rule that tries to push down LIMIT. #[derive(Default)] @@ -104,18 +106,22 @@ impl OptimizerRule for PushDownLimit { }; } + let fetch = match limit.fetch { Some(fetch) => fetch, None => return Ok(None), }; + println!("{fetch:?}"); let skip = limit.skip; let child_plan = &*limit.input; + println!("{child_plan:?}"); let plan = match child_plan { LogicalPlan::TableScan(scan) => { let limit = if fetch != 0 { fetch + skip } else { 0 }; let new_fetch = scan.fetch.map(|x| min(x, limit)).or(Some(limit)); if new_fetch == scan.fetch { + println!("here"); None } else { let new_input = LogicalPlan::TableScan(TableScan { @@ -126,6 +132,7 @@ impl OptimizerRule for PushDownLimit { fetch: scan.fetch.map(|x| min(x, limit)).or(Some(limit)), projected_schema: scan.projected_schema.clone(), }); + println!("new input"); Some(plan.with_new_inputs(&[new_input])?) } } @@ -275,14 +282,16 @@ fn push_down_join(join: &Join, limit: usize) -> Option { mod test { use std::vec; - use super::*; - use crate::test::*; use datafusion_expr::{ col, exists, logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan}, max, }; + use crate::test::*; + + use super::*; + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected) } From dd699d3e421fffaf68c86912016b367e94a13c4b Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Wed, 29 Mar 2023 20:20:00 +0200 Subject: [PATCH 2/8] fix fmt --- .../core/src/physical_plan/joins/hash_join.rs | 105 +++++++++--------- datafusion/optimizer/src/push_down_limit.rs | 9 +- 2 files changed, 54 insertions(+), 60 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 36eb1573a005..152720429ac4 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -18,12 +18,16 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. -use std::{any::Any, usize, vec}; -use std::fmt; -use std::sync::Arc; -use std::task::Poll; - use ahash::RandomState; +use arrow::array::Array; +use arrow::array::{ + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, + UInt8Array, +}; +use arrow::datatypes::{ArrowNativeType, DataType}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use arrow::{ array::{ ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, @@ -38,18 +42,13 @@ use arrow::{ }, util::bit_util, }; -use arrow::array::{ - Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, - StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, - UInt8Array, -}; -use arrow::array::Array; -use arrow::datatypes::{ArrowNativeType, DataType}; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; use futures::{ready, Stream, StreamExt, TryStreamExt}; use hashbrown::raw::RawTable; use smallvec::{smallvec, SmallVec}; +use std::fmt; +use std::sync::Arc; +use std::task::Poll; +use std::{any::Any, usize, vec}; use datafusion_common::cast::{as_dictionary_array, as_string_array}; @@ -63,30 +62,30 @@ use crate::execution::{ }, }; use crate::logical_expr::JoinType; +use crate::physical_plan::joins::utils::{ + adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, + get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide, +}; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, - DisplayFormatType, - Distribution, - EquivalenceProperties, - ExecutionPlan, expressions::Column, - expressions::PhysicalSortExpr, hash_utils::create_hashes, joins::utils::{ - adjust_right_output_partitioning, build_join_schema, BuildProbeJoinMetrics, - check_join_is_valid, ColumnIndex, - combine_join_equivalence_properties, estimate_join_statistics, JoinFilter, - JoinOn, partitioned_join_output_partitioning, - }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, Partitioning, + expressions::PhysicalSortExpr, + hash_utils::create_hashes, + joins::utils::{ + adjust_right_output_partitioning, build_join_schema, check_join_is_valid, + combine_join_equivalence_properties, estimate_join_statistics, + partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, + JoinFilter, JoinOn, + }, + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::physical_plan::joins::utils::{ - adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, - get_final_indices_from_bit_map, JoinSide, need_produce_result_in_final, -}; use super::{ - PartitionMode, utils::{OnceAsync, OnceFut}, + PartitionMode, }; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. @@ -275,7 +274,7 @@ impl ExecutionPlan for HashJoinExec { // JoinType::Full, JoinType::RightAnti types. let breaking = left || (right - && matches!( + && matches!( self.join_type, JoinType::Left | JoinType::Full @@ -371,7 +370,9 @@ impl ExecutionPlan for HashJoinExec { ) -> Result { let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); - if self.mode == PartitionMode::Partitioned && self.left.output_partitioning() != self.right.output_partitioning() { + if self.mode == PartitionMode::Partitioned + && self.left.output_partitioning() != self.right.output_partitioning() + { return Err(DataFusionError::Plan(format!( "Invalid HashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", ))); @@ -408,17 +409,15 @@ impl ExecutionPlan for HashJoinExec { Arc::new(self.reservation.clone()), ) }), - PartitionMode::Partitioned => { - OnceFut::new(collect_left_input( - Some(partition), - self.random_state.clone(), - self.left.clone(), - on_left.clone(), - context.clone(), - join_metrics.clone(), - Arc::new(reservation.clone()), - )) - } + PartitionMode::Partitioned => OnceFut::new(collect_left_input( + Some(partition), + self.random_state.clone(), + self.left.clone(), + on_left.clone(), + context.clone(), + join_metrics.clone(), + Arc::new(reservation.clone()), + )), PartitionMode::Auto => { return Err(DataFusionError::Plan(format!( "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()", @@ -1296,6 +1295,9 @@ mod tests { use datafusion_expr::Operator; use datafusion_physical_expr::expressions::Literal; + use crate::execution::context::SessionConfig; + use crate::physical_expr::expressions::BinaryExpr; + use crate::prelude::SessionContext; use crate::{ assert_batches_sorted_eq, common::assert_contains, @@ -1308,12 +1310,9 @@ mod tests { memory::MemoryExec, repartition::RepartitionExec, }, - test::{build_table_i32, columns}, test::exec::MockExec, + test::{build_table_i32, columns}, }; - use crate::execution::context::SessionConfig; - use crate::physical_expr::expressions::BinaryExpr; - use crate::prelude::SessionContext; use super::*; @@ -1462,7 +1461,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); @@ -1507,7 +1506,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); @@ -1931,7 +1930,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); let expected = vec![ @@ -1975,7 +1974,7 @@ mod tests { false, task_ctx, ) - .await?; + .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); let expected = vec![ @@ -3008,7 +3007,7 @@ mod tests { &join_type, false, ) - .unwrap(); + .unwrap(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -3093,7 +3092,7 @@ mod tests { left_batch.schema(), None, ) - .unwrap(), + .unwrap(), ); let right_batch = build_table_i32( ("a2", &vec![10, 11]), @@ -3106,7 +3105,7 @@ mod tests { right_batch.schema(), None, ) - .unwrap(), + .unwrap(), ); let on = vec![( Column::new_with_schema("b1", &left_batch.schema())?, diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index bec17fcfab45..25accc966ed1 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -21,12 +21,12 @@ use std::sync::Arc; use datafusion_common::Result; use datafusion_expr::{ - CrossJoin, logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union}, + CrossJoin, }; -use crate::{OptimizerConfig, OptimizerRule}; use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; /// Optimization rule that tries to push down LIMIT. #[derive(Default)] @@ -106,22 +106,18 @@ impl OptimizerRule for PushDownLimit { }; } - let fetch = match limit.fetch { Some(fetch) => fetch, None => return Ok(None), }; - println!("{fetch:?}"); let skip = limit.skip; let child_plan = &*limit.input; - println!("{child_plan:?}"); let plan = match child_plan { LogicalPlan::TableScan(scan) => { let limit = if fetch != 0 { fetch + skip } else { 0 }; let new_fetch = scan.fetch.map(|x| min(x, limit)).or(Some(limit)); if new_fetch == scan.fetch { - println!("here"); None } else { let new_input = LogicalPlan::TableScan(TableScan { @@ -132,7 +128,6 @@ impl OptimizerRule for PushDownLimit { fetch: scan.fetch.map(|x| min(x, limit)).or(Some(limit)), projected_schema: scan.projected_schema.clone(), }); - println!("new input"); Some(plan.with_new_inputs(&[new_input])?) } } From dc00458269dc78d6b8e89654bf1be472dc43e99e Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Wed, 29 Mar 2023 20:22:33 +0200 Subject: [PATCH 3/8] revert fmt --- datafusion/optimizer/src/push_down_limit.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 25accc966ed1..6703a1d787a7 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -17,16 +17,14 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) -use std::sync::Arc; - +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union}, CrossJoin, }; - -use crate::optimizer::ApplyOrder; -use crate::{OptimizerConfig, OptimizerRule}; +use std::sync::Arc; /// Optimization rule that tries to push down LIMIT. #[derive(Default)] @@ -277,16 +275,14 @@ fn push_down_join(join: &Join, limit: usize) -> Option { mod test { use std::vec; + use super::*; + use crate::test::*; use datafusion_expr::{ col, exists, logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan}, max, }; - use crate::test::*; - - use super::*; - fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected) } From c3415de978c576b65ffa0fa71b5692b1a767c00d Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Fri, 31 Mar 2023 21:40:21 +0200 Subject: [PATCH 4/8] validate for symmetricHashJoinExec --- .../core/src/physical_plan/joins/symmetric_hash_join.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index dafd0bfd4940..0c1b3b64b4db 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -475,6 +475,11 @@ impl ExecutionPlan for SymmetricHashJoinExec { partition: usize, context: Arc, ) -> Result { + if self.left.output_partitioning() != self.right.output_partitioning() { + return Err(DataFusionError::Plan(format!( + "Invalid SymmetricHashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", + ))); + } // If `filter_state` and `filter` are both present, then calculate sorted filter expressions // for both sides, and build an expression graph if one is not already built. let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = From bae83355f87b6e7d02cf5daa2a1110e4a8556724 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Tue, 4 Apr 2023 19:42:32 +0200 Subject: [PATCH 5/8] change error type --- datafusion/core/src/physical_plan/joins/hash_join.rs | 2 +- datafusion/core/src/physical_plan/joins/sort_merge_join.rs | 2 +- datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 152720429ac4..e813f0561ee6 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -373,7 +373,7 @@ impl ExecutionPlan for HashJoinExec { if self.mode == PartitionMode::Partitioned && self.left.output_partitioning() != self.right.output_partitioning() { - return Err(DataFusionError::Plan(format!( + return Err(DataFusionError::Internal(format!( "Invalid HashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", ))); } diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 83d11a6a65ee..cbdc1c2638c6 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -301,7 +301,7 @@ impl ExecutionPlan for SortMergeJoinExec { context: Arc, ) -> Result { if self.left.output_partitioning() != self.right.output_partitioning() { - return Err(DataFusionError::Plan(format!( + return Err(DataFusionError::Internal(format!( "Invalid SortMergeExec, partition count mismatch between children executors, consider using RepartitionExec", ))); } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 0c1b3b64b4db..b922b84a0108 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -476,7 +476,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { context: Arc, ) -> Result { if self.left.output_partitioning() != self.right.output_partitioning() { - return Err(DataFusionError::Plan(format!( + return Err(DataFusionError::Internal(format!( "Invalid SymmetricHashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", ))); } From e31c3bab0eb45e7753ef72c0b5829238b59c3864 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Wed, 5 Apr 2023 22:29:37 +0200 Subject: [PATCH 6/8] compare based on partition count --- datafusion/core/src/physical_plan/joins/hash_join.rs | 3 ++- datafusion/core/src/physical_plan/joins/sort_merge_join.rs | 6 ++++-- .../core/src/physical_plan/joins/symmetric_hash_join.rs | 4 +++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index e813f0561ee6..adf48651b03f 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -371,7 +371,8 @@ impl ExecutionPlan for HashJoinExec { let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); if self.mode == PartitionMode::Partitioned - && self.left.output_partitioning() != self.right.output_partitioning() + && self.left.output_partitioning().partition_count() + != self.right.output_partitioning().partition_count() { return Err(DataFusionError::Internal(format!( "Invalid HashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index cbdc1c2638c6..cd73d9713d32 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -300,9 +300,11 @@ impl ExecutionPlan for SortMergeJoinExec { partition: usize, context: Arc, ) -> Result { - if self.left.output_partitioning() != self.right.output_partitioning() { + if self.left.output_partitioning().partition_count() + != self.right.output_partitioning().partition_count() + { return Err(DataFusionError::Internal(format!( - "Invalid SortMergeExec, partition count mismatch between children executors, consider using RepartitionExec", + "Invalid SortMergeJoinExec, partition count mismatch between children executors, consider using RepartitionExec", ))); } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index b922b84a0108..b56b7161fbfb 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -475,7 +475,9 @@ impl ExecutionPlan for SymmetricHashJoinExec { partition: usize, context: Arc, ) -> Result { - if self.left.output_partitioning() != self.right.output_partitioning() { + if self.left.output_partitioning().partition_count() + != self.right.output_partitioning().partition_count() + { return Err(DataFusionError::Internal(format!( "Invalid SymmetricHashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", ))); From a29658b1140d4917c78f4d397ac77da543deee37 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Apr 2023 14:45:14 -0400 Subject: [PATCH 7/8] fix clippy --- datafusion/core/src/physical_plan/joins/hash_join.rs | 10 +++++----- .../core/src/physical_plan/joins/sort_merge_join.rs | 9 +++++---- .../src/physical_plan/joins/symmetric_hash_join.rs | 9 +++++---- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index adf48651b03f..805c209f1c82 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -370,12 +370,12 @@ impl ExecutionPlan for HashJoinExec { ) -> Result { let on_left = self.on.iter().map(|on| on.0.clone()).collect::>(); let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); - if self.mode == PartitionMode::Partitioned - && self.left.output_partitioning().partition_count() - != self.right.output_partitioning().partition_count() - { + let left_partitions = self.left.output_partitioning().partition_count(); + let right_partitions = self.right.output_partitioning().partition_count(); + if left_partitions != right_partitions { return Err(DataFusionError::Internal(format!( - "Invalid HashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", + "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\ + consider using RepartitionExec", ))); } diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index cd73d9713d32..ace46e38b5ab 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -300,11 +300,12 @@ impl ExecutionPlan for SortMergeJoinExec { partition: usize, context: Arc, ) -> Result { - if self.left.output_partitioning().partition_count() - != self.right.output_partitioning().partition_count() - { + let left_partitions = self.left.output_partitioning().partition_count(); + let right_partitions = self.right.output_partitioning().partition_count(); + if left_partitions != right_partitions { return Err(DataFusionError::Internal(format!( - "Invalid SortMergeJoinExec, partition count mismatch between children executors, consider using RepartitionExec", + "Invalid SortMergeJoinExec, partition count mismatch {left_partitions}!={right_partitions},\ + consider using RepartitionExec", ))); } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index b56b7161fbfb..d56931da9469 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -475,11 +475,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { partition: usize, context: Arc, ) -> Result { - if self.left.output_partitioning().partition_count() - != self.right.output_partitioning().partition_count() - { + let left_partitions = self.left.output_partitioning().partition_count(); + let right_partitions = self.right.output_partitioning().partition_count(); + if left_partitions != right_partitions { return Err(DataFusionError::Internal(format!( - "Invalid SymmetricHashJoinExec, partition count mismatch between children executors, consider using RepartitionExec", + "Invalid SymmetricHashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\ + consider using RepartitionExec", ))); } // If `filter_state` and `filter` are both present, then calculate sorted filter expressions From 5373f99285dcf3221a1128d856fff998a418691a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Apr 2023 15:04:51 -0400 Subject: [PATCH 8/8] =?UTF-8?q?fix=20bug=20=F0=9F=A4=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datafusion/core/src/physical_plan/joins/hash_join.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 805c209f1c82..46f2d92903f1 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -372,7 +372,8 @@ impl ExecutionPlan for HashJoinExec { let on_right = self.on.iter().map(|on| on.1.clone()).collect::>(); let left_partitions = self.left.output_partitioning().partition_count(); let right_partitions = self.right.output_partitioning().partition_count(); - if left_partitions != right_partitions { + if self.mode == PartitionMode::Partitioned && left_partitions != right_partitions + { return Err(DataFusionError::Internal(format!( "Invalid HashJoinExec, partition count mismatch {left_partitions}!={right_partitions},\ consider using RepartitionExec",