Change required input ordering physical plan API to allow any NULLS FIRST / LAST and ASC / DESC #5772
LGTM
pub(crate) fn calc_requirements(
    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
    orderby_sort_exprs: &[PhysicalSortExpr],
) -> Option<Vec<PhysicalSortRequirement>> {
    let mut sort_reqs = vec![];
    for partition_by in partition_by_exprs {
        sort_reqs.push(PhysicalSortRequirement {
            expr: partition_by.clone(),
            options: None,
        });
    }
    for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
        let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr));
        if !contains {
            sort_reqs.push(PhysicalSortRequirement {
                expr: expr.clone(),
                options: Some(*options),
            });
        }
    }
    // Convert empty result to None. Otherwise wrap result inside Some()
    (!sort_reqs.is_empty()).then_some(sort_reqs)
}
If there is overlap between partition by keys and sort keys, I think we should respect the sort key's requirements.
In the case of PARTITION BY a, ORDER BY a, we do not add a separate requirement for column a. The reason is that every partition consists of rows with the same value of a, so ORDER BY doesn't really define a direction (all a values within a partition are the same). The operation above produces the correct result whether its input is ordered by a ASC or a DESC.
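To make the overlap case concrete, here is a minimal sketch of the same logic with simplified stand-in types. Expressions are plain column names rather than Arc<dyn PhysicalExpr>, and the structs below are illustrative assumptions, not the actual DataFusion definitions.

```rust
// Simplified stand-ins for the DataFusion types (assumption: expressions
// are represented as plain column names instead of Arc<dyn PhysicalExpr>).
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct PhysicalSortExpr {
    expr: String,
    options: SortOptions,
}

#[derive(Debug, Clone, PartialEq)]
struct PhysicalSortRequirement {
    expr: String,
    // None: the input must be sorted on `expr`, but any direction or
    // null placement is acceptable.
    options: Option<SortOptions>,
}

fn calc_requirements(
    partition_by_exprs: &[String],
    orderby_sort_exprs: &[PhysicalSortExpr],
) -> Option<Vec<PhysicalSortRequirement>> {
    // PARTITION BY columns come first and carry no direction.
    let mut sort_reqs: Vec<PhysicalSortRequirement> = partition_by_exprs
        .iter()
        .map(|e| PhysicalSortRequirement { expr: e.clone(), options: None })
        .collect();
    // ORDER BY columns keep their direction, unless the column is already
    // required as a partition key.
    for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
        if !sort_reqs.iter().any(|r| &r.expr == expr) {
            sort_reqs.push(PhysicalSortRequirement {
                expr: expr.clone(),
                options: Some(*options),
            });
        }
    }
    // Convert an empty result to None; otherwise wrap it in Some.
    (!sort_reqs.is_empty()).then_some(sort_reqs)
}
```

With PARTITION BY a, ORDER BY a DESC this yields a single requirement on a with options: None, while PARTITION BY a, ORDER BY b DESC yields two requirements, the second carrying the DESC options.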
But if the original SQL is PARTITION BY a, ORDER BY a DESC, should we respect the direction? I'm not sure about this.
I think for cases like ROWNUM() over, the direction matters.
Since ORDER BY is local to each partition, I think we are fine here. There is some related discussion here: https://stackoverflow.com/questions/50364818/using-the-same-column-in-partition-by-and-order-by-with-dense-rank
I am still taking a note to remind us that we may want to revisit this if we find out information indicating otherwise.
I think computing rownum() over PARTITION BY a, ORDER BY a DESC will result in arbitrary assignments of row numbers (as @ozankabak and @mustafasrepo say above, the value of a within each partition is the same).
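A small sketch of why the row numbers are arbitrary in this case, assuming rows are (a, payload) pairs and using a hypothetical row_numbers helper (not DataFusion code). Because every key within a partition is equal, the stable sort on a DESC leaves the rows in arrival order, so the numbering depends only on how the input happened to arrive.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: emulates ROW_NUMBER() OVER (PARTITION BY a ORDER BY a DESC)
/// on (a, payload) rows and returns (a, payload, row_number) triples.
fn row_numbers<'a>(rows: &[(i32, &'a str)]) -> Vec<(i32, &'a str, usize)> {
    // Group rows by the partition key, preserving arrival order per partition.
    let mut partitions: BTreeMap<i32, Vec<(i32, &'a str)>> = BTreeMap::new();
    for &(a, payload) in rows {
        partitions.entry(a).or_default().push((a, payload));
    }
    let mut out = Vec::new();
    for (_, mut part) in partitions {
        // ORDER BY a DESC within the partition. All keys are equal, so this
        // stable sort does not move any row.
        part.sort_by(|x, y| y.0.cmp(&x.0));
        for (i, (a, payload)) in part.into_iter().enumerate() {
            out.push((a, payload, i + 1));
        }
    }
    out
}
```

Feeding the same rows in a different arrival order changes which row gets number 1 within a partition, regardless of whether the sort is ASC or DESC.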
})
}
}
👍
Thank you @mingmwang and @mustafasrepo
FWIW I will try and update IOx to use this new API shortly after the PR is merged. I found it challenging last time (see related discussion https://github.com/apache/arrow-datafusion/pull/5661/files#r1148410281) as the signature Vec<Option<Vec<PhysicalSortRequirement>>> imposes a significant cognitive load. However, given that this PR moves things forward and we don't have a competing alternate proposal, I think we should merge the API as is.
for partition_by in partition_by_exprs {
    sort_reqs.push(PhysicalSortRequirement {
        expr: partition_by.clone(),
        options: None,
Is it correct that being able to express options: None for partitioning operations is the key rationale (the thing we couldn't do before) for this PR?
Right. I vaguely remember us discussing internally that there are also some other use cases where some sort of ordering is required but the options don't matter, but I can't recall now
Another example where the sort direction is not that important is SortMergeJoin. Its additional requirement, however, is that the orderings of all its inputs must be aligned: they should be all ASC or all DESC. I think the current PhysicalSortRequirement API cannot represent this alignment constraint explicitly.
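To illustrate the gap: a per-input requirement with options: None accepts either direction on each side independently, so the alignment would have to be checked across inputs. The helper below is a hypothetical sketch with a simplified SortOptions stand-in; it is not part of this PR or of DataFusion.

```rust
// Simplified stand-in for the SortOptions type (assumption).
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Hypothetical cross-input check: the i-th join key must be sorted with
/// the same options on the left and on the right input.
fn join_inputs_aligned(left: &[SortOptions], right: &[SortOptions]) -> bool {
    left.len() == right.len() && left.iter().zip(right).all(|(l, r)| l == r)
}
```

Each side could satisfy its own direction-agnostic requirement (left sorted ASC, right sorted DESC) and a check of this kind would still reject the pair, which is the constraint the per-input API has no place to express.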
The other example is SortBasedAggregator, although we currently do not support SortBasedAggregator in DataFusion.
/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
/// provided [`PhysicalSortExpr`]s.
pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalenceProperties>(
Is this meant to be pub? It seems like callers should always use ordering_satisfy_requirement, right?
It is not absolutely necessary for both ordering_satisfy_requirement and ordering_satisfy_requirement_concrete to be public. However, their APIs are different, and we may want to use both of them. In fact, sort_enforcement uses ordering_satisfy_requirement_concrete.
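For readers unfamiliar with what such a check involves, here is a rough sketch under simplifying assumptions: expressions are plain column names, equivalence properties are ignored entirely, and the function name satisfies_requirement_sketch is hypothetical. It should not be read as the actual implementation of either function discussed above.

```rust
// Simplified stand-ins for the DataFusion types (assumptions).
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

struct PhysicalSortExpr {
    expr: String,
    options: SortOptions,
}

struct PhysicalSortRequirement {
    expr: String,
    options: Option<SortOptions>,
}

/// Hypothetical prefix check: every requirement must be met by the provided
/// sort expression at the same position.
fn satisfies_requirement_sketch(
    provided: &[PhysicalSortExpr],
    required: &[PhysicalSortRequirement],
) -> bool {
    if required.len() > provided.len() {
        return false;
    }
    required.iter().zip(provided.iter()).all(|(req, given)| {
        // options == None accepts any direction and null placement;
        // Some(opts) demands an exact match.
        req.expr == given.expr && req.options.map_or(true, |opts| opts == given.options)
    })
}
```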
Thanks @alamb -- I will create a separate follow-on that leverages type aliases to simplify things after this and its sister PR merge.
I'll plan to merge this tomorrow unless anyone else would like time to comment.
Which issue does this PR close?
N.A
Note
The changes in this PR are taken from #5290 by @mingmwang.
Rationale for this change
For some executors it is enough that the input is ordered; the direction of the ordering isn't important (such as for the PARTITION BY columns in WindowAggExec). The current required_input_ordering API, Vec<Option<&[PhysicalSortExpr]>>, doesn't support this subtlety, because PhysicalSortExpr is a struct that encapsulates expr: Arc<dyn PhysicalExpr> and options: SortOptions.
What changes are included in this PR?
In this PR we change required_input_ordering from
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>>
to
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>
where PhysicalSortRequirement is a struct that encapsulates expr: Arc<dyn PhysicalExpr> and options: Option<SortOptions>. If the options field is None, the executor expects its input to be ordered but the direction does not matter. Some utility functions for converting between these structs are also added.
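As a rough illustration of how the two structs relate, the sketch below uses simplified stand-in types (column names instead of Arc<dyn PhysicalExpr>). The From impl and the into_sort_expr method are hypothetical names chosen for this example; they are not necessarily the utility functions added in this PR.

```rust
// Simplified stand-ins for the DataFusion types (assumptions).
#[derive(Debug, Clone, Copy, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct PhysicalSortExpr {
    expr: String,
    options: SortOptions,
}

#[derive(Debug, Clone, PartialEq)]
struct PhysicalSortRequirement {
    expr: String,
    options: Option<SortOptions>,
}

// A concrete sort expression is the strictest form of a requirement.
impl From<PhysicalSortExpr> for PhysicalSortRequirement {
    fn from(e: PhysicalSortExpr) -> Self {
        PhysicalSortRequirement { expr: e.expr, options: Some(e.options) }
    }
}

impl PhysicalSortRequirement {
    /// Hypothetical helper: going the other way needs a concrete direction,
    /// so a direction-agnostic requirement falls back to `fallback`.
    fn into_sort_expr(self, fallback: SortOptions) -> PhysicalSortExpr {
        PhysicalSortExpr {
            expr: self.expr,
            options: self.options.unwrap_or(fallback),
        }
    }
}
```

The asymmetry is the point of the design: expression to requirement is lossless, while requirement to expression has to pick a direction whenever options is None.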
Are these changes tested?
Existing tests should work.
Are there any user-facing changes?
API change: the return type of required_input_ordering changes as described above.