
Avoid generating duplicate sort keys from window expressions; fix bug when deciding window expression ordering #4643

Merged: 9 commits into apache:master, Dec 19, 2022

Conversation

@mingmwang (Contributor) commented Dec 15, 2022

Which issue does this PR close?

Closes: #4635
Closes: #4641

Rationale for this change

The current implementation might generate duplicate sort keys from window expressions, and the window expression ordering is not consistent with PostgreSQL's.

This PR addresses those issues.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions bot added labels: core (Core DataFusion crate), logical-expr (Logical plan and expressions), sql (SQL Planner) on Dec 15, 2022
@mingmwang (Contributor Author)

@yahoNanJing @Ted-Jiang @alamb @jackwener
Please take a look.

@github-actions bot added the physical-expr (Physical Expressions) label on Dec 15, 2022
} => Ok(Expr::Sort {
expr: expr.clone(),
asc: true,
nulls_first: false,
@Ted-Jiang (Member) commented Dec 16, 2022

I think that with normalized_order_by_keys here, the order_by_keys are used to sort rows within each hashed partition, where we only care that equal values are adjacent. So there is no need to care about the sort direction? Am I right 🤔

@mingmwang (Contributor Author)

No, the normalized_order_by_keys here are for dedup purposes. The generated sort keys for window expressions are partition_by_keys + sort_keys, and there might be overlap between the two. The original implementation leveraged the contains() method to do the dedup, but that is not enough.
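The dedup idea discussed here can be sketched as follows. This is a hypothetical simplification, not DataFusion's actual types or `generate_sort_key` signature: the combined key is partition_by_keys followed by order_by_keys, and an order-by key is skipped when its normalized form (the expression alone, ignoring ASC/DESC and NULLS FIRST/LAST) already appears.

```rust
// Hypothetical sketch of the dedup idea; `SortKey` is a stand-in for
// DataFusion's real sort expression type.
#[derive(Clone, Debug, PartialEq)]
struct SortKey {
    expr: String, // stand-in for a real logical/physical expression
    asc: bool,
    nulls_first: bool,
}

fn generate_sort_key(partition_by: &[SortKey], order_by: &[SortKey]) -> Vec<SortKey> {
    let mut result: Vec<SortKey> = partition_by.to_vec();
    for key in order_by {
        // Normalized comparison: a plain contains() would miss keys that
        // differ only in sort options, so compare the expression alone.
        if !result.iter().any(|k| k.expr == key.expr) {
            result.push(key.clone());
        }
    }
    result
}
```

With this, `PARTITION BY (c1, c2) ORDER BY c2` yields the two keys `c1, c2` rather than `c1, c2, c2`.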

@Ted-Jiang (Member)

Thanks for pinging me ❤️, I will review this carefully today.

},
];
let result = generate_sort_key(partition_by, order_by)?;
assert_eq!(expected, result);
Member:

I checked that with this commit it will change:
[age ASC NULLS FIRST, name ASC NULLS FIRST, created_at ASC NULLS FIRST, age ASC NULLS LAST, name ASC NULLS LAST]
to:
[age DESC NULLS FIRST, name DESC NULLS FIRST, created_at ASC NULLS LAST]
👍

Contributor Author:

Yes, this PR does additional dedup to avoid generating duplicate sort keys.

let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;

let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100";
@Ted-Jiang (Member) commented Dec 16, 2022

I think that without this commit, this test runs well too. Does the Enforcement physical rule eliminate the ORDER BY c2? 🤔 @mingmwang

Contributor Author:

Yes.

Member:

"ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
Member:

Here I found this also eliminates the RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2)

Member:

This PR only deals with sorting, so what eliminated it? 🤔

Contributor Author:

Yes, without this PR the original physical plan added additional RepartitionExecs. This was because the additional Sort meant the two WindowAggExecs could not be collapsed.

@Ted-Jiang (Member) commented Dec 16, 2022

Thanks for explaining. Printing the original plan for anyone who needs it:

[
    "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
    "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
    "    SortExec: [c1@1 ASC,c2@2 ASC NULLS LAST]",
    "      CoalesceBatchesExec: target_batch_size=4096",
    "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
    "          WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
    "            SortExec: [c1@0 ASC,c2@1 ASC,c2@1 ASC NULLS LAST]",
    "              CoalesceBatchesExec: target_batch_size=4096",
    "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2)",
    "                  RepartitionExec: partitioning=RoundRobinBatch(2)",
]

After this PR, let mut groups = group_window_expr_by_sort_keys(&window_exprs)?; generates one group, which is why the WindowAggExecs are collapsed.
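The grouping step can be sketched like this. The function name and types here are illustrative, not DataFusion's actual `group_window_expr_by_sort_keys`: window expressions whose deduped sort keys are equal land in one group, so the planner emits a single WindowAggExec (with a single Sort) for the whole group.

```rust
use std::collections::BTreeMap;

// Hypothetical sketch: group window expressions (identified by a label)
// by their generated sort key. Equal keys -> one group -> one
// WindowAggExec in the final plan.
fn group_window_exprs(exprs: &[(&str, &[&str])]) -> BTreeMap<Vec<String>, Vec<String>> {
    let mut groups: BTreeMap<Vec<String>, Vec<String>> = BTreeMap::new();
    for (name, key) in exprs {
        let key: Vec<String> = key.iter().map(|s| s.to_string()).collect();
        groups.entry(key).or_default().push(name.to_string());
    }
    groups
}
```

In the plan above, `PARTITION BY [c1, c2] ORDER BY c2` and `PARTITION BY [c1] ORDER BY c2` both dedup to the key `[c1, c2]`, so a single group results.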

@Ted-Jiang (Member) left a comment:

@mingmwang This improvement makes sense to me. 👍 Only some questions about the optimizer, and the clippy errors need fixing.

@mingmwang (Contributor Author)

Thanks. Those clippy errors are not caused by this PR. I'm waiting for someone else to fix them on master, and then I will rebase.

@alamb (Contributor) commented Dec 16, 2022

Thanks. Those clippy errors are not caused by this PR. I'm waiting for someone else to fix them on master, and then I will rebase.

I believe @jackwener has fixed them in #4652

@alamb (Contributor) commented Dec 16, 2022

cc @metesynnada and @retikulum as I believe you have interest in and are working on window expressions as well

@jackwener (Member) left a comment:

LGTM, nice job; it considers many details carefully. 👍

Comment on lines +902 to +915
// FIXME sort on LargeUtf8 String has bug.
// let sql =
// "SELECT d3, row_number() OVER (partition by d3) as rn1 FROM test";
// let actual = execute_to_batches(&ctx, sql).await;
// let expected = vec![
// "+-------+-----+",
// "| d3 | rn1 |",
// "+-------+-----+",
// "| | 1 |",
// "| One | 1 |",
// "| Three | 1 |",
// "+-------+-----+",
// ];
// assert_batches_eq!(expected, &actual);
Member:

👍

Member:

BTW, we could add some tests in sqllogictest.

Contributor:

❤️ -- if anyone wants to help we are working on porting tests as part of #4495

Comment on lines +253 to +257
// To align with the behavior of PostgreSQL, we want the sort_keys sorted using the same rule as PostgreSQL: first
// we compare the sort keys themselves, and if one window's sort keys are a prefix of another's,
// put the window with more sort keys first, so more deeply sorted plans get nested further down as children.
// The sort_by() implementation here is a stable sort.
// Note that by this rule, if there's an empty OVER, it'll be at the top level.
Member:

👍

@alamb alamb merged commit 891a800 into apache:master Dec 19, 2022
@alamb (Contributor) commented Dec 19, 2022

Thanks @mingmwang @jackwener and @Ted-Jiang

@ursabot commented Dec 19, 2022

Benchmark runs are scheduled for baseline = dba34fc and contender = 891a800. 891a800 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
