Skip to content

Commit

Permalink
[CBO] JoinSelection Rule, select HashJoin Partition Mode based on the…
Browse files Browse the repository at this point in the history
… Join Type and available statistics, option for SortMergeJoin (#4219)

* [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the available statistics

* Fix HashJoin CollectLeft bug, refine UT to cover 'enable'/'disable' repartition_joins

* add comments

* ignore 0 stats

* Resolve review comments, add intg UT for SMJ

* fix conflicts

* tiny fix to doc

* refine swap_join_filter()

* update configs.md

* fix configs.md
  • Loading branch information
mingmwang authored Nov 24, 2022
1 parent 22fdbcf commit 561be4f
Show file tree
Hide file tree
Showing 17 changed files with 2,100 additions and 1,225 deletions.
27 changes: 27 additions & 0 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
/// Type of `TableProvider` to use when loading `default` schema
pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";

/// Configuration option "datafusion.optimizer.top_down_join_key_reordering"
pub const OPT_TOP_DOWN_JOIN_KEY_REORDERING: &str =
"datafusion.optimizer.top_down_join_key_reordering";

/// Configuration option "datafusion.optimizer.prefer_hash_join"
pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";

/// Configuration option "atafusion.optimizer.hash_join_single_partition_threshold"
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
"datafusion.optimizer.hash_join_single_partition_threshold";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
Expand Down Expand Up @@ -266,6 +277,22 @@ impl BuiltInConfigs {
"Type of `TableProvider` to use when loading `default` schema. Defaults to None",
None,
),
ConfigDefinition::new_bool(
OPT_TOP_DOWN_JOIN_KEY_REORDERING,
"When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true",
true,
),
ConfigDefinition::new_bool(
OPT_PREFER_HASH_JOIN,
"When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently\
than SortMergeJoin but consumes more memory. Defaults to true",
true,
),
ConfigDefinition::new_u64(
OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD,
"The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
1024 * 1024,
),
]
}
}
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::{
logical_expr::{PlanType, ToStringifiedPlan},
optimizer::optimizer::Optimizer,
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
aggregate_statistics::AggregateStatistics, join_selection::JoinSelection,
optimizer::PhysicalOptimizerRule,
},
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -1166,8 +1166,6 @@ pub struct SessionConfig {
pub parquet_pruning: bool,
/// Should DataFusion collect statistics after listing files
pub collect_statistics: bool,
/// Should DataFusion optimizer run a top down process to reorder the join keys
pub top_down_join_key_reordering: bool,
/// Configuration options
pub config_options: Arc<RwLock<ConfigOptions>>,
/// Opaque extensions.
Expand All @@ -1187,7 +1185,6 @@ impl Default for SessionConfig {
repartition_windows: true,
parquet_pruning: true,
collect_statistics: false,
top_down_join_key_reordering: true,
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
Expand Down Expand Up @@ -1508,7 +1505,7 @@ impl SessionState {

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
Arc::new(JoinSelection::new()),
];
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
if config
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
//!
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
Expand Down Expand Up @@ -72,7 +73,11 @@ impl PhysicalOptimizerRule for BasicEnforcement {
config: &SessionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.target_partitions;
let top_down_join_key_reordering = config.top_down_join_key_reordering;
let top_down_join_key_reordering = config
.config_options()
.read()
.get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
.unwrap_or_default();
let new_plan = if top_down_join_key_reordering {
// Run a top-down process to adjust input key ordering recursively
let plan_requirements = PlanWithKeyRequirements::new(plan);
Expand Down Expand Up @@ -209,6 +214,12 @@ fn adjust_input_keys_ordering(
request_key_ordering: vec![None, new_right_request],
}))
}
PartitionMode::Auto => {
// Can not satisfy, clear the current requirements and generate new empty requirements
Ok(Some(PlanWithKeyRequirements::new(
requirements.plan.clone(),
)))
}
}
} else if let Some(CrossJoinExec { left, .. }) =
plan_any.downcast_ref::<CrossJoinExec>()
Expand Down
Loading

0 comments on commit 561be4f

Please sign in to comment.