-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move source repartitioning into ExecutionPlan::repartition
#7936
Changes from 2 commits
c8e184a
86f3015
bd9d261
bdc8834
bee0ba5
ef30b3d
9dab596
34a0d29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,9 +26,6 @@ use std::fmt::Formatter; | |
use std::sync::Arc; | ||
|
||
use crate::config::ConfigOptions; | ||
use crate::datasource::physical_plan::CsvExec; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now enforce distribution is not dependent on the specific operators 🎉 |
||
#[cfg(feature = "parquet")] | ||
use crate::datasource::physical_plan::ParquetExec; | ||
use crate::error::Result; | ||
use crate::physical_optimizer::utils::{ | ||
add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, | ||
|
@@ -1188,7 +1185,6 @@ fn ensure_distribution( | |
// When `false`, round robin repartition will not be added to increase parallelism | ||
let enable_round_robin = config.optimizer.enable_round_robin_repartition; | ||
let repartition_file_scans = config.optimizer.repartition_file_scans; | ||
let repartition_file_min_size = config.optimizer.repartition_file_min_size; | ||
let batch_size = config.execution.batch_size; | ||
let is_unbounded = unbounded_output(&dist_context.plan); | ||
// Use order preserving variants either of the conditions true | ||
|
@@ -1265,25 +1261,13 @@ fn ensure_distribution( | |
// Unless partitioning doesn't increase the partition count, it is not beneficial: | ||
&& child.output_partitioning().partition_count() < target_partitions | ||
{ | ||
// When `repartition_file_scans` is set, leverage source operators | ||
// (`ParquetExec`, `CsvExec` etc.) to increase parallelism at the source. | ||
// When `repartition_file_scans` is set, attempt to increase | ||
// parallelism at the source. | ||
if repartition_file_scans { | ||
#[cfg(feature = "parquet")] | ||
if let Some(parquet_exec) = | ||
child.as_any().downcast_ref::<ParquetExec>() | ||
if let Some(new_child) = | ||
child.repartitioned(target_partitions, config)? | ||
{ | ||
child = Arc::new(parquet_exec.get_repartitioned( | ||
target_partitions, | ||
repartition_file_min_size, | ||
)); | ||
} | ||
if let Some(csv_exec) = child.as_any().downcast_ref::<CsvExec>() { | ||
if let Some(csv_exec) = csv_exec.get_repartitioned( | ||
target_partitions, | ||
repartition_file_min_size, | ||
) { | ||
child = Arc::new(csv_exec); | ||
} | ||
child = new_child; | ||
} | ||
} | ||
// Increase parallelism by adding round-robin repartitioning | ||
|
@@ -1642,9 +1626,9 @@ mod tests { | |
use crate::datasource::file_format::file_compression_type::FileCompressionType; | ||
use crate::datasource::listing::PartitionedFile; | ||
use crate::datasource::object_store::ObjectStoreUrl; | ||
use crate::datasource::physical_plan::FileScanConfig; | ||
#[cfg(feature = "parquet")] | ||
use crate::datasource::physical_plan::ParquetExec; | ||
use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; | ||
use crate::physical_optimizer::enforce_sorting::EnforceSorting; | ||
use crate::physical_optimizer::output_requirements::OutputRequirements; | ||
use crate::physical_plan::aggregates::{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,6 +76,7 @@ pub use crate::metrics::Metric; | |
pub use crate::topk::TopK; | ||
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; | ||
|
||
use datafusion_common::config::ConfigOptions; | ||
pub use datafusion_common::hash_utils; | ||
pub use datafusion_common::utils::project_schema; | ||
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; | ||
|
@@ -209,15 +210,34 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { | |
children: Vec<Arc<dyn ExecutionPlan>>, | ||
) -> Result<Arc<dyn ExecutionPlan>>; | ||
|
||
/// creates an iterator | ||
/// If supported, changes the partitioning of this `ExecutionPlan` to | ||
/// produce `target_partitions` partitions. | ||
/// | ||
/// If the `ExecutionPlan` does not support changing its partitioning, | ||
/// returns `Ok(None)` (the default). | ||
/// | ||
/// The DataFusion optimizer attempts to use as many threads as possible by | ||
/// repartitioning its inputs to match the target number of threads | ||
/// available (`target_partitions`). Some data sources, such as the built in | ||
/// CSV and Parquet readers, are able to read from their input files in | ||
/// parallel, regardless of how the source data is split amongst files. | ||
fn repartitioned( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The PR hoists this common code to the |
||
&self, | ||
_target_partitions: usize, | ||
_config: &ConfigOptions, | ||
) -> Result<Option<Arc<dyn ExecutionPlan>>> { | ||
Ok(None) | ||
} | ||
|
||
/// Begin execution of `partition`, returning a stream of [`RecordBatch`]es. | ||
fn execute( | ||
&self, | ||
partition: usize, | ||
context: Arc<TaskContext>, | ||
) -> Result<SendableRecordBatchStream>; | ||
|
||
/// Return a snapshot of the set of [`Metric`]s for this | ||
/// [`ExecutionPlan`]. | ||
/// [`ExecutionPlan`]. If no `Metric`s are available, return None. | ||
/// | ||
/// While the values of the metrics in the returned | ||
/// [`MetricsSet`]s may change as execution progresses, the | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just was moved into the
impl ExecutionPlan
and the signature is changed