Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window Linear Mode use smaller buffers #9597

Merged

Conversation

mustafasrepo
Copy link
Contributor

@mustafasrepo mustafasrepo commented Mar 13, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

Background: In Linear mode of the BoundedWindowAggExec none of the partition by expressions are ordered. In these cases, we can generate result for a partition as long as a new row with same partition is received. Otherwise, result cannot be generated for the partition. As an example consider the table,

sn hash
0 2
1 2
2 2
3 2
4 1
5 1
6 1
7 1
8 0
9 0

Assume following query is executed on this table

SELECT *, COUNT(*) OVER(PARTITION BY hash ORDER BY sn RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) as count
FROM test;

datafusion will generate following plan

"ProjectionExec: expr=[sn@0 as sn,  hash@1 as hash, COUNT([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"hash\", index: 1 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as count]",
"  BoundedWindowAggExec: wdw=[COUNT([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"duplicated_hash\", index: 3 }, options: SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name: \"COUNT([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"duplicated_hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \\\"hash\\\", index: 1 }, options: SortOptions { descending: false, nulls_first: true } }]]\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(1)) }], mode=[Linear]",
"    SourceExec: partition_sizes=1, projection=[sn, hash], output_ordering=[sn@0 ASC NULLS LAST]",

where BoundedWindowAggExec is in Linear mode. Partition with hash=2 receives following section from the input table

sn hash
0 2
1 2
2 2
3 2

query above can generate following result for the section above

sn hash count
0 2 2
1 2 2
2 2
3 2

Since in query we have range between unbounded preceding and 1 following for sn=2 we cannot generate result until sn=4 is received (where it is guaranteed end range for the sn=3). Same thing applies to other partitions with different hash values. However, from the input data, it can be seen that for the partition: hash=2 possible future rows cannot have sn=3,sn=4, etc. (where most recent data the input is sn=9). If we can use this information, we can generate early results for different partitions. Also this enables us to use less memory. With this information we can generate the following
result

sn hash
0 2
1 2
2 2
3 2
4 1
5 1
6 1
7 1
8
9

instead of current behaviour with result

sn hash
0 2
1 2
2
3
4 1
5 1
6
7
8
9

This enables to use less memory when cardinality is high for partition by expressions, and window frame query is either RANGE or GROUPS query (For ROWS queries we need the new row that belong to same partition anyway.).

What changes are included in this PR?

Are these changes tested?

Yes.

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Physical Expressions core Core DataFusion crate labels Mar 13, 2024
@mustafasrepo mustafasrepo marked this pull request as draft March 13, 2024 14:13
@mustafasrepo mustafasrepo marked this pull request as ready for review March 13, 2024 14:34
@@ -566,6 +567,50 @@ fn get_random_window_frame(rng: &mut StdRng, is_linear: bool) -> WindowFrame {
// should work only with WindowAggExec
window_frame
}
};
window_frame.start_bound =
Copy link
Contributor Author

@mustafasrepo mustafasrepo Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes introduced to be able to test BoundedWindowAggExec with current row bound.

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed this carefully and LGTM

@ozankabak
Copy link
Contributor

There seems to be an issue with how we print plans making CI fail. I will let @mustafasrepo fix it and then merge

@ozankabak ozankabak merged commit 7d3747c into apache:main Mar 17, 2024
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants