-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move repartition_file_scans
out of enable_round_robin
check in EnforceDistribution
rule
#8731
Changes from all commits
b7395a3
ebec14e
5ea68e6
d702b0f
a4ab9c2
aeb3f42
210e8df
f95e29e
17d2b87
54f7bf0
74a687f
aba4dc0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1198,32 +1198,33 @@ fn ensure_distribution( | |
) | ||
.map( | ||
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| { | ||
// Don't need to apply when the returned row count is not greater than 1: | ||
// Don't need to apply when the returned row count is not greater than batch size | ||
let num_rows = child.plan.statistics()?.num_rows; | ||
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) { | ||
num_rows | ||
.get_value() | ||
.map(|value| value > &batch_size) | ||
.unwrap_or(true) | ||
.unwrap() // safe to unwrap since is_exact() is true | ||
} else { | ||
true | ||
}; | ||
|
||
// When `repartition_file_scans` is set, attempt to increase | ||
// parallelism at the source. | ||
if repartition_file_scans && repartition_beneficial_stats { | ||
if let Some(new_child) = | ||
child.plan.repartitioned(target_partitions, config)? | ||
{ | ||
child.plan = new_child; | ||
} | ||
} | ||
|
||
if enable_round_robin | ||
// Operator benefits from partitioning (e.g. filter): | ||
&& (would_benefit && repartition_beneficial_stats) | ||
// Unless partitioning doesn't increase the partition count, it is not beneficial: | ||
&& child.plan.output_partitioning().partition_count() < target_partitions | ||
{ | ||
// When `repartition_file_scans` is set, attempt to increase | ||
// parallelism at the source. | ||
if repartition_file_scans { | ||
if let Some(new_child) = | ||
child.plan.repartitioned(target_partitions, config)? | ||
{ | ||
child.plan = new_child; | ||
} | ||
Comment on lines
-1218
to
-1225
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
// Increase parallelism by adding round-robin repartitioning | ||
// on top of the operator. Note that we only do this if the | ||
// partition count is not already equal to the desired partition | ||
|
@@ -1362,17 +1363,10 @@ impl DistributionContext { | |
|
||
fn update_children(mut self) -> Result<Self> { | ||
for child_context in self.children_nodes.iter_mut() { | ||
child_context.distribution_connection = match child_context.plan.as_any() { | ||
plan_any if plan_any.is::<RepartitionExec>() => matches!( | ||
plan_any | ||
.downcast_ref::<RepartitionExec>() | ||
.unwrap() | ||
.partitioning(), | ||
Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _) | ||
), | ||
Comment on lines
-1365
to
-1372
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary matching. For all supported partitioning in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And changed to use utility functions like |
||
plan_any | ||
if plan_any.is::<SortPreservingMergeExec>() | ||
|| plan_any.is::<CoalescePartitionsExec>() => | ||
child_context.distribution_connection = match &child_context.plan { | ||
plan if is_repartition(plan) | ||
|| is_coalesce_partitions(plan) | ||
|| is_sort_preserving_merge(plan) => | ||
{ | ||
true | ||
} | ||
|
@@ -3871,14 +3865,14 @@ pub(crate) mod tests { | |
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", | ||
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", | ||
// Plan already has two partitions | ||
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]", | ||
"ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]", | ||
]; | ||
let expected_csv = [ | ||
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", | ||
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", | ||
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", | ||
// Plan already has two partitions | ||
"CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false", | ||
"CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false", | ||
]; | ||
|
||
assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192 | |
--FilterExec: column1@0 != 42 | ||
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 | ||
|
||
# disable round robin repartitioning | ||
statement ok | ||
set datafusion.optimizer.enable_round_robin_repartition = false; | ||
|
||
## Expect to see the scan read the file as "4" groups with even sizes (offsets) again | ||
query TT | ||
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42; | ||
---- | ||
Comment on lines
+66
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I verified that this test case fails on current |
||
logical_plan | ||
Filter: parquet_table.column1 != Int32(42) | ||
--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] | ||
physical_plan | ||
CoalesceBatchesExec: target_batch_size=8192 | ||
--FilterExec: column1@0 != 42 | ||
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 | ||
|
||
# enable round robin repartitioning again | ||
statement ok | ||
set datafusion.optimizer.enable_round_robin_repartition = true; | ||
|
||
# create a second parquet file | ||
statement ok | ||
COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet' | ||
|
@@ -147,7 +167,7 @@ WITH HEADER ROW | |
LOCATION 'test_files/scratch/repartition_scan/csv_table/'; | ||
|
||
query I | ||
select * from csv_table; | ||
select * from csv_table ORDER BY column1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes the test result stable, otherwise every time I rerun it locally, it gets different results... |
||
---- | ||
1 | ||
2 | ||
|
@@ -190,7 +210,7 @@ STORED AS json | |
LOCATION 'test_files/scratch/repartition_scan/json_table/'; | ||
|
||
query I | ||
select * from "json_table"; | ||
select * from "json_table" ORDER BY column1; | ||
---- | ||
1 | ||
2 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3794,7 +3794,7 @@ select a, | |
1 1 | ||
2 1 | ||
|
||
# support scalar value in ORDER BY | ||
# support scalar value in ORDER BY | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simply ran all sqllogictests locally. Seems some space chars left there. |
||
query I | ||
select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x | ||
---- | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on what value is compared with (
batch_size
), I think this should bebatch_size
instead of1
.