Skip to content

Commit

Permalink
rebase all execution and preceding recursive cte work
Browse files Browse the repository at this point in the history
add config flag for recursive ctes

update docs from script

update slt test for doc change

restore testing pin

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

fix sql integration test

wip: add sql test for logical plan

wip: format test assertion

wip: remove unnecessary with qualifier method

some docs

more docs

Add comments to `RecursiveQuery`

Update datafusion-cli Cargo.lock

Fix clippy

better errors and comments

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

fix sql integration test

wip: add sql test for logical plan

wip: format test assertion

wip: remove unnecessary with qualifier method

some docs

more docs

impl execution support

add sql -> logical plan support

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan

partway through porting over isidentical's work

Continuing implementation with fixes and improvements

Lint fixes

ensure that repartitions are not added immediately after RecursiveExec
in the physical-plan

add trivial sqllogictest

more recursive tests

remove test that asserts recursive cte should fail

additional cte test

wip: remove tokio from physical plan dev deps

format cargo tomls

fix issue where CTE could not be referenced more than 1 time

wip: fixes after rebase but tpcds_physical_q54 keeps overflowing its stack

Impl NamedRelation as CteWorkTable

* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

* upd

* assign work table state

* upd

* upd

fix min repro but still broken on larger test case

set config in sql logic tests

clean up cte slt tests

fixes

fix option

add group by test case and more test case files

wip

add window function recursive cte example

simplify stream impl for recursive query stream

add explain to trivial test case

move setting of recursive ctes to slt file and add test to ensure multiple record batches are produced each iteration

remove tokio dep and remove mut

lint, comments and remove tokio stream

update submodule pin to feat branch that contains csvs

update submodule pin to feat branch that contains csvs
  • Loading branch information
matthewgapp committed Jan 21, 2024
1 parent ae0f401 commit b11ca8b
Show file tree
Hide file tree
Showing 9 changed files with 1,173 additions and 22 deletions.
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::not_impl_err;
use datafusion_physical_plan::work_table::WorkTableExec;

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};

use datafusion_common::DataFusionError;

use crate::datasource::{TableProvider, TableType};
use crate::execution::context::SessionState;

Expand Down Expand Up @@ -84,7 +82,11 @@ impl TableProvider for CteWorkTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("scan not implemented for CteWorkTable yet")
// TODO: pushdown filters and limits
Ok(Arc::new(WorkTableExec::new(
self.name.clone(),
self.table_schema.clone(),
)))
}

fn supports_filter_pushdown(
Expand Down
33 changes: 20 additions & 13 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,14 @@ fn try_swapping_with_csv(
projection: &ProjectionExec,
csv: &CsvExec,
) -> Option<Arc<dyn ExecutionPlan>> {
// info!("csv exec: {}", csv);
// If there is any non-column or alias-carrier expression, Projection should not be removed.
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
all_alias_free_columns(projection.expr()).then(|| {
let mut file_scan = csv.base_config().clone();
let new_projections =
new_projections_for_columns(projection, &file_scan.projection);
file_scan.projection = Some(new_projections);
file_scan.projection = new_projections;

Arc::new(CsvExec::new(
file_scan,
Expand Down Expand Up @@ -194,7 +195,7 @@ fn try_swapping_with_memory(
MemoryExec::try_new(
memory.partitions(),
memory.original_schema(),
Some(new_projections),
new_projections,
)
.map(|e| Arc::new(e) as _)
})
Expand Down Expand Up @@ -238,7 +239,7 @@ fn try_swapping_with_streaming_table(
StreamingTableExec::try_new(
streaming_table.partition_schema().clone(),
streaming_table.partitions().clone(),
Some(&new_projections),
new_projections.as_ref(),
lex_orderings,
streaming_table.is_infinite(),
)
Expand Down Expand Up @@ -834,16 +835,22 @@ fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
fn new_projections_for_columns(
projection: &ProjectionExec,
source: &Option<Vec<usize>>,
) -> Vec<usize> {
projection
.expr()
.iter()
.filter_map(|(expr, _)| {
expr.as_any()
.downcast_ref::<Column>()
.and_then(|expr| source.as_ref().map(|proj| proj[expr.index()]))
})
.collect()
) -> Option<Vec<usize>> {
if source.is_none() {
return None;
}

Some(
projection
.expr()
.iter()
.filter_map(|(expr, _)| {
expr.as_any()
.downcast_ref::<Column>()
.and_then(|expr| source.as_ref().map(|proj| proj[expr.index()]))
})
.collect(),
)
}

/// The function operates in two modes:
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::recursive_query::RecursiveQueryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
Expand Down Expand Up @@ -896,7 +897,7 @@ impl DefaultPhysicalPlanner {
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}
LogicalPlan::Union(Union { inputs, .. }) => {
LogicalPlan::Union(Union { inputs, schema: _ }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

Ok(Arc::new(UnionExec::new(physical_plans)))
Expand Down Expand Up @@ -1290,8 +1291,10 @@ impl DefaultPhysicalPlanner {
Ok(plan)
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => {
not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet")
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct,.. }) => {
let static_term = self.create_initial_plan(static_term, session_state).await?;
let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;
Ok(Arc::new(RecursiveQueryExec::try_new(name.clone(), static_term, recursive_term, *is_distinct)?))
}
};
exec_plan
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod metrics;
mod ordering;
pub mod placeholder_row;
pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod sorts;
pub mod stream;
Expand All @@ -71,6 +72,7 @@ pub mod union;
pub mod unnest;
pub mod values;
pub mod windows;
pub mod work_table;

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
Expand Down
Loading

0 comments on commit b11ca8b

Please sign in to comment.