Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cubesql): Allow aggregation pushdown only for unlimited CubeScan #8929

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ impl RewriteRules for MemberRules {
"?old_members",
"?filters",
"?orders",
"?limit",
"?offset",
// If CubeScan already have limit and offset it would be incorrect to push aggregation into it
// Aggregate(CubeScan(limit, offset)) would run aggregation over limited rows
// CubeScan(aggregation, limit, offset) would return limited groups
"CubeScanLimit:None",
"CubeScanOffset:None",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
Expand All @@ -164,8 +167,8 @@ impl RewriteRules for MemberRules {
),
"?filters",
"?orders",
"?limit",
"?offset",
"CubeScanLimit:None",
"CubeScanOffset:None",
"?split",
"?new_pushdown_join",
"CubeScanWrapped:false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use crate::{
cube_scan, cube_scan_wrapper, rewrite,
rewriter::{CubeEGraph, CubeRewrite},
rules::wrapper::WrapperRules,
transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube, CubeScanUngrouped,
LogicalPlanLanguage, WrapperPullupReplacerAliasToCube, WrapperPullupReplacerPushToCube,
transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube, CubeScanLimit,
CubeScanOffset, CubeScanUngrouped, LogicalPlanLanguage, WrapperPullupReplacerAliasToCube,
WrapperPullupReplacerPushToCube,
},
var, var_iter,
};
Expand Down Expand Up @@ -51,6 +52,8 @@ impl WrapperRules {
self.transform_wrap_cube_scan(
"?members",
"?alias_to_cube",
"?limit",
"?offset",
"?ungrouped",
"?alias_to_cube_out",
"?push_to_cube_out",
Expand All @@ -77,27 +80,44 @@ impl WrapperRules {
&self,
members_var: &'static str,
alias_to_cube_var: &'static str,
limit_var: &'static str,
offset_var: &'static str,
ungrouped_cube_var: &'static str,
alias_to_cube_var_out: &'static str,
push_to_cube_out_var: &'static str,
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
let members_var = var!(members_var);
let alias_to_cube_var = var!(alias_to_cube_var);
let limit_var = var!(limit_var);
let offset_var = var!(offset_var);
let ungrouped_cube_var = var!(ungrouped_cube_var);
let alias_to_cube_var_out = var!(alias_to_cube_var_out);
let push_to_cube_out_var = var!(push_to_cube_out_var);
move |egraph, subst| {
let mut has_no_limit_or_offset = true;
for limit in var_iter!(egraph[subst[limit_var]], CubeScanLimit).cloned() {
has_no_limit_or_offset &= limit.is_none();
}
for offset in var_iter!(egraph[subst[offset_var]], CubeScanOffset).cloned() {
has_no_limit_or_offset &= offset.is_none();
}

if let Some(_) = egraph[subst[members_var]].data.member_name_to_expr {
for alias_to_cube in
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube).cloned()
{
for ungrouped in
var_iter!(egraph[subst[ungrouped_cube_var]], CubeScanUngrouped).cloned()
{
// When CubeScan already has limit or offset, it's unsafe to allow to push
// anything on top to Cube.
// Especially aggregation: aggregate does not commute with limit,
// so it would be incorrect to join them to single CubeScan
let push_to_cube_out = ungrouped && has_no_limit_or_offset;
subst.insert(
push_to_cube_out_var,
egraph.add(LogicalPlanLanguage::WrapperPullupReplacerPushToCube(
WrapperPullupReplacerPushToCube(ungrouped),
WrapperPullupReplacerPushToCube(push_to_cube_out),
)),
);
subst.insert(
Expand Down
125 changes: 125 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cubeclient::models::V1LoadRequestQuery;
use datafusion::physical_plan::displayable;
use pretty_assertions::assert_eq;
use serde_json::json;
Expand Down Expand Up @@ -1080,3 +1081,127 @@ async fn test_wrapper_filter_flatten() {
}
);
}

/// Regular aggregation over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan
#[tokio::test]
async fn wrapper_agg_over_limit() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
SELECT
customer_gender
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
LIMIT 5
) scan
GROUP BY
1
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let physical_plan = query_plan.as_physical_plan().await.unwrap();
println!(
"Physical plan: {}",
displayable(physical_plan.as_ref()).indent()
);

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(5),
ungrouped: Some(true),
..Default::default()
}
);

assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("\"limit\": 5"));
assert!(query_plan
.as_logical_plan()
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("\"ungrouped\": true"));
}

/// Aggregation(dimension) over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan
#[tokio::test]
async fn wrapper_agg_dimension_over_limit() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let query_plan = convert_select_to_query_plan(
// language=PostgreSQL
r#"
SELECT
MAX(customer_gender)
FROM (
SELECT
customer_gender
FROM
KibanaSampleDataEcommerce
LIMIT 5
) scan
"#
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let physical_plan = query_plan.as_physical_plan().await.unwrap();
println!(
"Physical plan: {}",
displayable(physical_plan.as_ref()).indent()
);

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(5),
ungrouped: Some(true),
..Default::default()
}
);

assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("\"limit\": 5"));
assert!(query_plan
.as_logical_plan()
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("\"ungrouped\": true"));
}
Loading