Skip to content

Commit

Permalink
minor: disable tpcds-q41 because decorrelating disjunction subqueries is not supported.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Feb 25, 2023
1 parent 85ed386 commit cef194c
Show file tree
Hide file tree
Showing 24 changed files with 512 additions and 518 deletions.
25 changes: 12 additions & 13 deletions benchmarks/expected-plans/q18.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity)
Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]]
LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name]
TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
TableScan: lineitem projection=[l_orderkey, l_quantity]
SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_orderkey AS l_orderkey
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
TableScan: lineitem projection=[l_orderkey, l_quantity]
Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]]
LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name]
TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
TableScan: lineitem projection=[l_orderkey, l_quantity]
SubqueryAlias: __correlated_sq_1
Projection: lineitem.l_orderkey AS l_orderkey
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
TableScan: lineitem projection=[l_orderkey, l_quantity]
5 changes: 2 additions & 3 deletions benchmarks/expected-plans/q21.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
TableScan: orders projection=[o_orderkey, o_orderstatus]
Filter: nation.n_name = Utf8("SAUDI ARABIA")
TableScan: nation projection=[n_nationkey, n_name]
Projection: l2.l_orderkey, l2.l_suppkey
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
Projection: l3.l_orderkey, l3.l_suppkey
SubqueryAlias: l3
Filter: lineitem.l_receiptdate > lineitem.l_commitdate
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/expected-plans/q22.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ Sort: custsale.cntrycode ASC NULLS LAST
LeftAnti Join: customer.c_custkey = orders.o_custkey
Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
TableScan: customer projection=[c_custkey, c_phone, c_acctbal]
Projection: orders.o_custkey
TableScan: orders projection=[o_custkey]
TableScan: orders projection=[o_custkey]
SubqueryAlias: __scalar_sq_1
Projection: AVG(customer.c_acctbal) AS __value
Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
Expand Down
14 changes: 6 additions & 8 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
println!("{}", formatted);
assert!(formatted.contains("ParquetExec: limit=Some(10)"));
Ok(())
}
Expand Down Expand Up @@ -500,8 +501,7 @@ mod tests {
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2, abc.column3\
\n TableScan: abc projection=[column1, column2, column3]";
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let dataframe = session_ctx
Expand All @@ -512,9 +512,8 @@ mod tests {
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2, abc.column3\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2, column3]";
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);

let dataframe = session_ctx
Expand All @@ -525,9 +524,8 @@ mod tests {
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2]";
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2]";
assert_eq!(expected, actual);

Ok(())
Expand Down
16 changes: 2 additions & 14 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ mod tests {
use crate::config::ConfigOptions;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use crate::prelude::SessionContext;
use crate::test::create_vec_batches;
Expand All @@ -308,12 +307,7 @@ mod tests {

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
.input()
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.unwrap();
let coalesce = plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap();
assert_eq!(1234, coalesce.target_batch_size);
Ok(())
}
Expand All @@ -325,13 +319,7 @@ mod tests {

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
// projection should directly wrap filter with no coalesce step
let _filter = projection
.input()
.as_any()
.downcast_ref::<FilterExec>()
.unwrap();
let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap();
Ok(())
}

Expand Down
34 changes: 14 additions & 20 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::from_slice::FromSlice;
use datafusion::logical_expr::{
col, Expr, LogicalPlan, LogicalPlanBuilder, Projection, TableScan, UNNAMED_TABLE,
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
Expand Down Expand Up @@ -214,24 +214,18 @@ async fn custom_source_dataframe() -> Result<()> {

let optimized_plan = state.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
}) => {
assert_eq!(source.schema().fields().len(), 2);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be TableScan"),
},
_ => panic!("expect optimized_plan to be projection"),
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
}) => {
assert_eq!(source.schema().fields().len(), 2);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be TableScan"),
}

let expected = format!(
"Projection: {UNNAMED_TABLE}.c2\
\n TableScan: {UNNAMED_TABLE} projection=[c2]"
);
let expected = format!("TableScan: {UNNAMED_TABLE} projection=[c2]");
assert_eq!(format!("{optimized_plan:?}"), expected);

let physical_plan = state.create_physical_plan(&optimized_plan).await?;
Expand All @@ -242,7 +236,7 @@ async fn custom_source_dataframe() -> Result<()> {
let batches = collect(physical_plan, state.task_ctx()).await?;
let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(2, batches[0].num_columns());
assert_eq!(origin_rec_batch.num_rows(), batches[0].num_rows());

Ok(())
Expand Down Expand Up @@ -270,8 +264,8 @@ async fn optimizers_catch_all_statistics() {
let expected = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("COUNT(UInt8(1))", DataType::Int64, false),
Field::new("MIN(test.c1)", DataType::Int32, false),
Field::new("MAX(test.c1)", DataType::Int32, false),
Field::new("MIN(c1)", DataType::Int32, false),
Field::new("MAX(c1)", DataType::Int32, false),
])),
vec![
Arc::new(Int64Array::from_slice([4])),
Expand Down
22 changes: 10 additions & 12 deletions datafusion/core/tests/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ async fn select_with_alias_overwrite() -> Result<()> {
)?;

let ctx = SessionContext::new();
ctx.register_batch("t", batch).unwrap();
ctx.register_batch("t", batch)?;

let df = ctx
.table("t")
Expand Down Expand Up @@ -502,12 +502,11 @@ async fn right_semi_with_alias_filter() -> Result<()> {
.select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
let optimized_plan = df.clone().into_optimized_plan()?;
let expected = vec![
"Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
" RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
"RightSemi Join: t1.a = t2.a [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
];

let formatted = optimized_plan.display_indent_schema().to_string();
Expand Down Expand Up @@ -547,11 +546,10 @@ async fn right_anti_filter_push_down() -> Result<()> {
.select(vec![col("t2.a"), col("t2.b"), col("t2.c")])?;
let optimized_plan = df.clone().into_optimized_plan()?;
let expected = vec![
"Projection: t2.a, t2.b, t2.c [a:UInt32, b:Utf8, c:Int32]",
" RightAnti Join: t1.a = t2.a Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
"RightAnti Join: t1.a = t2.a Filter: t2.c > Int32(1) [a:UInt32, b:Utf8, c:Int32]",
" Filter: t1.c > Int32(1) [a:UInt32, c:Int32]",
" TableScan: t1 projection=[a, c] [a:UInt32, c:Int32]",
" TableScan: t2 projection=[a, b, c] [a:UInt32, b:Utf8, c:Int32]",
];

let formatted = optimized_plan.display_indent_schema().to_string();
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ async fn aggregate_timestamps_count() -> Result<()> {
.await;

let expected = vec![
"+----------------+-----------------+-----------------+---------------+",
"| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |",
"+----------------+-----------------+-----------------+---------------+",
"| 3 | 3 | 3 | 3 |",
"+----------------+-----------------+-----------------+---------------+",
"+--------------+---------------+---------------+-------------+",
"| COUNT(nanos) | COUNT(micros) | COUNT(millis) | COUNT(secs) |",
"+--------------+---------------+---------------+-------------+",
"| 3 | 3 | 3 | 3 |",
"+--------------+---------------+---------------+-------------+",
];
assert_batches_sorted_eq!(expected, &results);

Expand Down Expand Up @@ -185,11 +185,11 @@ async fn aggregate_times_count() -> Result<()> {
.await;

let expected = vec![
"+----------------+-----------------+-----------------+---------------+",
"| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |",
"+----------------+-----------------+-----------------+---------------+",
"| 4 | 4 | 4 | 4 |",
"+----------------+-----------------+-----------------+---------------+",
"+--------------+---------------+---------------+-------------+",
"| COUNT(nanos) | COUNT(micros) | COUNT(millis) | COUNT(secs) |",
"+--------------+---------------+---------------+-------------+",
"| 4 | 4 | 4 | 4 |",
"+--------------+---------------+---------------+-------------+",
];
assert_batches_sorted_eq!(expected, &results);

Expand Down
16 changes: 7 additions & 9 deletions datafusion/core/tests/sql/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,16 @@ async fn avro_explain() {
let expected = vec![
vec![
"logical_plan",
"Projection: COUNT(UInt8(1))\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: alltypes_plain projection=[id]",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: alltypes_plain projection=[id]",
],
vec![
"physical_plan",
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
\n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
\n",
],
];
Expand Down
12 changes: 5 additions & 7 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,9 @@ async fn explain_logical_plan_only() {
let expected = vec![
vec![
"logical_plan",
"Projection: COUNT(UInt8(1))\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n SubqueryAlias: t\
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
\n SubqueryAlias: t\
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))",
]];
assert_eq!(expected, actual);
}
Expand All @@ -776,9 +775,8 @@ async fn explain_physical_plan_only() {

let expected = vec![vec![
"physical_plan",
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
\n ProjectionExec: expr=[2 as COUNT(UInt8(1))]\
\n EmptyExec: produce_one_row=true\
"ProjectionExec: expr=[2 as COUNT(UInt8(1))]\
\n EmptyExec: produce_one_row=true\
\n",
]];
assert_eq!(expected, actual);
Expand Down
Loading

0 comments on commit cef194c

Please sign in to comment.