diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs index 1c123c1242fe3..d6cbe46d2a8f3 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs @@ -137,8 +137,11 @@ impl RewriteRules for MemberRules { "?old_members", "?filters", "?orders", - "?limit", - "?offset", + // If CubeScan already have limit and offset it would be incorrect to push aggregation into it + // Aggregate(CubeScan(limit, offset)) would run aggregation over limited rows + // CubeScan(aggregation, limit, offset) would return limited groups + "CubeScanLimit:None", + "CubeScanOffset:None", "?split", "?can_pushdown_join", "CubeScanWrapped:false", @@ -164,8 +167,8 @@ impl RewriteRules for MemberRules { ), "?filters", "?orders", - "?limit", - "?offset", + "CubeScanLimit:None", + "CubeScanOffset:None", "?split", "?new_pushdown_join", "CubeScanWrapped:false", diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/cube_scan_wrapper.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/cube_scan_wrapper.rs index 29ae2e2328df2..1e7cccd145534 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/cube_scan_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/cube_scan_wrapper.rs @@ -3,8 +3,9 @@ use crate::{ cube_scan, cube_scan_wrapper, rewrite, rewriter::{CubeEGraph, CubeRewrite}, rules::wrapper::WrapperRules, - transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube, CubeScanUngrouped, - LogicalPlanLanguage, WrapperPullupReplacerAliasToCube, WrapperPullupReplacerPushToCube, + transforming_rewrite, wrapper_pullup_replacer, CubeScanAliasToCube, CubeScanLimit, + CubeScanOffset, CubeScanUngrouped, LogicalPlanLanguage, WrapperPullupReplacerAliasToCube, + WrapperPullupReplacerPushToCube, }, var, var_iter, }; @@ -51,6 +52,8 @@ impl WrapperRules { self.transform_wrap_cube_scan( "?members", "?alias_to_cube", + "?limit", + "?offset", "?ungrouped", "?alias_to_cube_out", "?push_to_cube_out", @@ -77,16 +80,28 @@ impl WrapperRules { &self, members_var: &'static str, alias_to_cube_var: &'static str, + limit_var: &'static str, + offset_var: &'static str, ungrouped_cube_var: &'static str, alias_to_cube_var_out: &'static str, push_to_cube_out_var: &'static str, ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { let members_var = var!(members_var); let alias_to_cube_var = var!(alias_to_cube_var); + let limit_var = var!(limit_var); + let offset_var = var!(offset_var); let ungrouped_cube_var = var!(ungrouped_cube_var); let alias_to_cube_var_out = var!(alias_to_cube_var_out); let push_to_cube_out_var = var!(push_to_cube_out_var); move |egraph, subst| { + let mut has_no_limit_or_offset = true; + for limit in var_iter!(egraph[subst[limit_var]], CubeScanLimit).cloned() { + has_no_limit_or_offset &= limit.is_none(); + } + for offset in var_iter!(egraph[subst[offset_var]], CubeScanOffset).cloned() { + has_no_limit_or_offset &= offset.is_none(); + } + if let Some(_) = egraph[subst[members_var]].data.member_name_to_expr { for alias_to_cube in var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube).cloned() @@ -94,10 +109,15 @@ impl WrapperRules { for ungrouped in var_iter!(egraph[subst[ungrouped_cube_var]], CubeScanUngrouped).cloned() { + // When CubeScan already has limit or offset, it's unsafe to allow to push + // anything on top to Cube. + // Especially aggregation: aggregate does not commute with limit, + // so it would be incorrect to join them to single CubeScan + let push_to_cube_out = ungrouped && has_no_limit_or_offset; subst.insert( push_to_cube_out_var, egraph.add(LogicalPlanLanguage::WrapperPullupReplacerPushToCube( - WrapperPullupReplacerPushToCube(ungrouped), + WrapperPullupReplacerPushToCube(push_to_cube_out), )), ); subst.insert( diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index b899760160c2d..e33fc656d1176 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1,3 +1,4 @@ +use cubeclient::models::V1LoadRequestQuery; use datafusion::physical_plan::displayable; use pretty_assertions::assert_eq; use serde_json::json; @@ -1080,3 +1081,127 @@ async fn test_wrapper_filter_flatten() { } ); } + +/// Regular aggregation over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan +#[tokio::test] +async fn wrapper_agg_over_limit() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" + SELECT + customer_gender + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + LIMIT 5 + ) scan + GROUP BY + 1 + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec![]), + segments: Some(vec![]), + order: Some(vec![]), + limit: Some(5), + ungrouped: Some(true), + ..Default::default() + } + ); + + assert!(logical_plan + .find_cube_scan_wrapper() + .wrapped_sql + .unwrap() + .sql + .contains("\"limit\": 5")); + assert!(query_plan + .as_logical_plan() + .find_cube_scan_wrapper() + .wrapped_sql + .unwrap() + .sql + .contains("\"ungrouped\": true")); +} + +/// Aggregation(dimension) over CubeScan(limit=n, ungrouped=true) is NOT pushed to CubeScan +#[tokio::test] +async fn wrapper_agg_dimension_over_limit() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" + SELECT + MAX(customer_gender) + FROM ( + SELECT + customer_gender + FROM + KibanaSampleDataEcommerce + LIMIT 5 + ) scan + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let logical_plan = query_plan.as_logical_plan(); + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![]), + dimensions: Some(vec![]), + segments: Some(vec![]), + order: Some(vec![]), + limit: Some(5), + ungrouped: Some(true), + ..Default::default() + } + ); + + assert!(logical_plan + .find_cube_scan_wrapper() + .wrapped_sql + .unwrap() + .sql + .contains("\"limit\": 5")); + assert!(query_plan + .as_logical_plan() + .find_cube_scan_wrapper() + .wrapped_sql + .unwrap() + .sql + .contains("\"ungrouped\": true")); +}