Skip to content

Commit

Permalink
Recursive CTEs: Stage 3 - add execution support (#8840)
Browse files Browse the repository at this point in the history
* rebase all execution and preceding recursive cte work

add config flag for recursive ctes

update docs from script

update slt test for doc change

restore testing pin

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

fix sql integration test

wip: add sql test for logical plan

wip: format test assertion

wip: remove uncessary with qualifier method

some docs

more docs

Add comments to `RecursiveQuery`

Update datfusion-cli Cargo.lock

Fix clippy

better errors and comments

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

fix sql integration test

wip: add sql test for logical plan

wip: format test assertion

wip: remove uncessary with qualifier method

some docs

more docs

impl execution support

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

partway through porting over isidentical's work

Continuing implementation with fixes and improvements

Lint fixes

ensure that repartitions are not added immediately after RecursiveExec
in the physical-plan

add trivial sqllogictest

more recursive tests

remove test that asserts recursive cte should fail

additional cte test

wip: remove tokio from physical plan dev deps

format cargo tomls

fix issue where CTE could not be referenced more than 1 time

wip: fixes after rebase but tpcds_physical_q54 keeps overflowing its stack

Impl NamedRelation as CteWorkTable

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

* upd

* assign work table state

* upd

* upd

fix min repro but still broken on larger test case

set config in sql logic tests

clean up cte slt tests

fixes

fix option

add group by test case and more test case files

wip

add window function recursive cte example

simplify stream impl for recrusive query stream

add explain to trivial test case

move setting of recursive ctes to slt file and add test to ensure multiple record batches are produced each iteration

remove tokio dep and remove mut

lint, comments and remove tokio stream

update submodule pin to feat branch that contains csvs

update submodule pin to feat branch that contains csvs

* error if recursive ctes are nested

* error if recursive cte is referenced multiple times within the recursive term

* wip

* fix rebase

* move testing files into main repo

* update testing pin to main pin

* tweaks
  • Loading branch information
matthewgapp authored Jan 27, 2024
1 parent a7a74fa commit a6cdd0d
Show file tree
Hide file tree
Showing 14 changed files with 1,330 additions and 9 deletions.
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::not_impl_err;
use datafusion_physical_plan::work_table::WorkTableExec;

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};

use datafusion_common::DataFusionError;

use crate::datasource::{TableProvider, TableType};
use crate::execution::context::SessionState;

Expand Down Expand Up @@ -84,7 +82,11 @@ impl TableProvider for CteWorkTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("scan not implemented for CteWorkTable yet")
// TODO: pushdown filters and limits
Ok(Arc::new(WorkTableExec::new(
self.name.clone(),
self.table_schema.clone(),
)))
}

fn supports_filter_pushdown(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ fn try_swapping_with_streaming_table(
StreamingTableExec::try_new(
streaming_table.partition_schema().clone(),
streaming_table.partitions().clone(),
Some(&new_projections),
Some(new_projections.as_ref()),
lex_orderings,
streaming_table.is_infinite(),
)
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::recursive_query::RecursiveQueryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
Expand Down Expand Up @@ -894,7 +895,7 @@ impl DefaultPhysicalPlanner {
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}
LogicalPlan::Union(Union { inputs, .. }) => {
LogicalPlan::Union(Union { inputs, schema: _ }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

Ok(Arc::new(UnionExec::new(physical_plans)))
Expand Down Expand Up @@ -1288,8 +1289,10 @@ impl DefaultPhysicalPlanner {
Ok(plan)
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => {
not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet")
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct,.. }) => {
let static_term = self.create_initial_plan(static_term, session_state).await?;
let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;
Ok(Arc::new(RecursiveQueryExec::try_new(name.clone(), static_term, recursive_term, *is_distinct)?))
}
};
exec_plan
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/tests/data/recursive_cte/balance.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
time,name,account_balance
1,John,100
1,Tim,200
2,John,300
2,Tim,400
4 changes: 4 additions & 0 deletions datafusion/core/tests/data/recursive_cte/growth.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
name,account_growth
John,3
Tim,20
Eliza,150
101 changes: 101 additions & 0 deletions datafusion/core/tests/data/recursive_cte/prices.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
Index,product,price,prices_row_num
1,Holden,334.8,1
2,Mercedes-Benz,623.22,2
3,Aston Martin,363.48,3
4,GMC,615.67,4
5,Lincoln,521.13,5
6,Mitsubishi,143.05,6
7,Infiniti,861.82,7
8,Ford,330.57,8
9,GMC,136.87,9
10,Toyota,106.29,10
11,Pontiac,686.95,11
12,Ford,197.48,12
13,Honda,774.42,13
14,Dodge,854.26,14
15,Bentley,628.82,15
16,Chevrolet,756.82,16
17,Volkswagen,438.51,17
18,Mazda,156.15,18
19,Hyundai,322.43,19
20,Oldsmobile,979.95,20
21,Geo,359.59,21
22,Ford,960.75,22
23,Subaru,106.75,23
24,Pontiac,13.4,24
25,Mercedes-Benz,858.46,25
26,Subaru,55.72,26
27,BMW,316.69,27
28,Chevrolet,290.32,28
29,Mercury,296.8,29
30,Dodge,410.78,30
31,Oldsmobile,18.07,31
32,Subaru,442.22,32
33,Dodge,93.29,33
34,Honda,282.9,34
35,Chevrolet,750.87,35
36,Lexus,249.82,36
37,Ford,732.33,37
38,Toyota,680.78,38
39,Nissan,657.01,39
40,Mazda,200.76,40
41,Nissan,251.44,41
42,Buick,714.44,42
43,Ford,436.2,43
44,Volvo,865.53,44
45,Saab,471.52,45
46,Mercedes-Benz,51.13,46
47,Chrysler,943.52,47
48,Lamborghini,181.6,48
49,Hyundai,634.89,49
50,Ford,757.58,50
51,Porsche,294.64,51
52,Ford,261.34,52
53,Chrysler,822.01,53
54,Audi,430.68,54
55,Mitsubishi,69.12,55
56,Mazda,723.16,56
57,Mazda,711.46,57
58,Land Rover,435.15,58
59,Buick,189.58,59
60,GMC,651.92,60
61,Mazda,491.37,61
62,BMW,346.18,62
63,Ford,456.25,63
64,Ford,10.65,64
65,Mazda,985.39,65
66,Mercedes-Benz,955.79,66
67,Honda,550.95,67
68,Mitsubishi,127.6,68
69,Mercedes-Benz,840.65,69
70,Infiniti,647.45,70
71,Bentley,827.26,71
72,Lincoln,822.22,72
73,Plymouth,970.55,73
74,Ford,595.05,74
75,Maybach,808.46,75
76,Chevrolet,341.48,76
77,Jaguar,759.03,77
78,Land Rover,625.01,78
79,Lincoln,289.13,79
80,Suzuki,285.24,80
81,GMC,253.4,81
82,Oldsmobile,174.76,82
83,Lincoln,434.17,83
84,Dodge,887.38,84
85,Mercedes-Benz,308.65,85
86,GMC,182.71,86
87,Ford,619.62,87
88,Lexus,228.63,88
89,Hyundai,901.06,89
90,Chevrolet,615.65,90
91,GMC,460.19,91
92,Mercedes-Benz,729.28,92
93,Dodge,414.69,93
94,Maserati,300.83,94
95,Suzuki,503.64,95
96,Audi,275.05,96
97,Ford,303.25,97
98,Lotus,101.01,98
99,Lincoln,721.05,99
100,Kia,833.31,100
10 changes: 10 additions & 0 deletions datafusion/core/tests/data/recursive_cte/sales.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
region_id,salesperson_id,sale_amount
101,1,1000
102,2,500
101,2,700
103,3,800
102,4,300
101,4,400
102,5,600
103,6,500
101,7,900
8 changes: 8 additions & 0 deletions datafusion/core/tests/data/recursive_cte/salespersons.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
salesperson_id,manager_id
1,
2,1
3,1
4,2
5,2
6,3
7,3
5 changes: 5 additions & 0 deletions datafusion/core/tests/data/recursive_cte/time.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
time,other
1,foo
2,bar
4,baz
5,qux
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod metrics;
mod ordering;
pub mod placeholder_row;
pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod sorts;
pub mod stream;
Expand All @@ -71,6 +72,7 @@ pub mod union;
pub mod unnest;
pub mod values;
pub mod windows;
pub mod work_table;

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
Expand Down
Loading

0 comments on commit a6cdd0d

Please sign in to comment.