diff --git a/dask_planner/Cargo.lock b/dask_planner/Cargo.lock index 8bdfb6108..232c71a75 100644 --- a/dask_planner/Cargo.lock +++ b/dask_planner/Cargo.lock @@ -2,17 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "ahash" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" -dependencies = [ - "getrandom", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.1" @@ -58,64 +47,95 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e24e2bcd431a4aa0ff003fdd2dc21c78cfb42f31459c89d2312c2746fe17a5ac" +checksum = "aed9849f86164fad5cb66ce4732782b15f1bc97f8febab04e782c20cce9d4b6c" dependencies = [ - "ahash 0.8.1", + "ahash", "arrow-array", "arrow-buffer", + "arrow-cast", + "arrow-csv", "arrow-data", + "arrow-ipc", + "arrow-json", "arrow-schema", "arrow-select", - "bitflags", "chrono", "comfy-table", - "csv", - "flatbuffers", "half", - "hashbrown", - "indexmap", - "lazy_static", - "lexical-core", + "hashbrown 0.13.1", "multiversion", "num", "regex", "regex-syntax", - "serde_json", ] [[package]] name = "arrow-array" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9044300874385f19e77cbf90911e239bd23630d8f23bb0f948f9067998a13b7" +checksum = "6b8504cf0a6797e908eecf221a865e7d339892720587f87c8b90262863015b08" dependencies = [ - "ahash 0.8.1", + "ahash", "arrow-buffer", "arrow-data", "arrow-schema", "chrono", "half", - "hashbrown", + "hashbrown 0.13.1", "num", ] [[package]] name = "arrow-buffer" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78476cbe9e3f808dcecab86afe42d573863c63e149c62e6e379ed2522743e626" +checksum = "d6de64a27cea684b24784647d9608314bc80f7c4d55acb44a425e05fab39d916" dependencies = [ "half", "num", ] +[[package]] +name = "arrow-cast" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bec4a54502eefe05923c385c90a005d69474fa06ca7aa2a2b123c9f9532f6178" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-csv" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7902bbf8127eac48554fe902775303377047ad49a9fd473c2b8cb399d092080" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "lazy_static", + "lexical-core", + "regex", +] + [[package]] name = "arrow-data" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d916feee158c485dad4f701cba31bc9a90a8db87d9df8e2aa8adc0c20a2bbb9" +checksum = "7e4882efe617002449d5c6b5de9ddb632339074b36df8a96ea7147072f1faa8a" dependencies = [ "arrow-buffer", "arrow-schema", @@ -123,17 +143,49 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-ipc" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa0703a6de2785828561b03a4d7793ecd333233e1b166316b4bfc7cfce55a4a7" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "28.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd23fc8c6d251f96cd63b96fece56bbb9710ce5874a627cb786e2600673595a" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "indexmap", + "num", + "serde_json", +] + [[package]] name = "arrow-schema" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f9406eb7834ca6bd8350d1baa515d18b9fcec487eddacfb62f5e19511f7bd37" +checksum = "da9f143882a80be168538a60e298546314f50f11f2a288c8d73e11108da39d26" [[package]] name = "arrow-select" -version = "26.0.0" +version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6593a01586751c74498495d2f5a01fcd438102b52965c11dd98abf4ebcacef37" +checksum = "520406331d4ad60075359524947ebd804e479816439af82bcb17f8d280d9b38c" dependencies = [ "arrow-array", "arrow-buffer", @@ -229,9 +281,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.22" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ "iana-time-zone", "num-integer", @@ -407,23 +459,22 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f1ffcbc1f040c9ab99f41db1c743d95aff267bb2e7286aaa010738b7402251" +checksum = "7b17262b899f79afdf502846d1138a8b48441afe24dc6e07c922105289248137" dependencies = [ "arrow", "chrono", - "ordered-float", "sqlparser", ] [[package]] name = "datafusion-expr" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1883d9590d303ef38fa295567e7fdb9f8f5f511fcc167412d232844678cd295c" +checksum = "533d2226b4636a1306d1f6f4ac02e436947c5d6e8bfc85f6d8f91a425c10a407" dependencies = [ - "ahash 0.8.1", + "ahash", "arrow", "datafusion-common", "log", @@ -432,9 +483,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2127d46d566ab3463d70da9675fc07b9d634be8d17e80d0e1ce79600709fe651" +checksum = "ce7ba274267b6baf1714a67727249aa56d648c8814b0f4c43387fbe6d147e619" dependencies = [ "arrow", "async-trait", @@ -442,17 +493,17 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown", + "hashbrown 0.13.1", "log", ] [[package]] name = "datafusion-physical-expr" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d108b6fe8eeb317ecad1d74619e8758de49cccc8c771b56c97962fd52eaae23" +checksum = "f35cb53e6c2f9c40accdf45aef2be7fde030ea3051b1145a059d96109e65b0bf" dependencies = [ - "ahash 0.8.1", + "ahash", "arrow", "arrow-buffer", "arrow-schema", @@ -463,12 +514,11 @@ dependencies = [ "datafusion-expr", "datafusion-row", "half", - "hashbrown", + "hashbrown 0.13.1", "itertools", "lazy_static", "md-5", "num-traits", - "ordered-float", "paste", "rand", "regex", @@ -479,9 +529,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43537b6377d506e4788bf21e9ed943340e076b48ca4d077e6ea4405ca5e54a1c" +checksum = "27c77b1229ae5cf6a6e0e2ba43ed4e98131dbf1cc4a97fad17c94230b32e0812" dependencies = [ "arrow", "datafusion-common", @@ -491,11 +541,11 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "14.0.0" +version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "244d08d4710e1088d9c0949c9b5b8d68d9cf2cde7203134a4cc389e870fe2354" +checksum = "569423fa8a50db39717080949e3b4f8763582b87baf393cc3fcf27cc21467ba7" dependencies = [ - "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", "sqlparser", @@ -600,8 +650,14 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hashbrown" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ - "ahash 0.7.6", + "ahash", ] [[package]] @@ -665,7 +721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -998,15 +1054,6 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" -[[package]] -name = "ordered-float" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d84eb1409416d254e4a9c8fa56cc24701755025b458f0fcd8e59e1f5f40c23bf" -dependencies = [ - "num-traits", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -1268,9 +1315,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "sqlparser" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86be66ea0b2b22749cfa157d16e2e84bf793e626a3375f4d378dc289fa03affb" +checksum = "aba319938d4bfe250a769ac88278b629701024fe16f34257f9563bc628081970" dependencies = [ "log", ] diff --git a/dask_planner/Cargo.toml b/dask_planner/Cargo.toml index 5cdda867d..f7dcceb51 100644 --- a/dask_planner/Cargo.toml +++ b/dask_planner/Cargo.toml @@ -9,12 +9,12 @@ edition = "2021" rust-version = "1.62" [dependencies] -arrow = { version = "26.0.0", features = ["prettyprint"] } +arrow = { version = "28.0.0", features = ["prettyprint"] } async-trait = "0.1.60" -datafusion-common = "14.0.0" -datafusion-expr = "14.0.0" -datafusion-optimizer = "14.0.0" -datafusion-sql = "14.0.0" +datafusion-common = "15.0.0" +datafusion-expr = "15.0.0" +datafusion-optimizer = "15.0.0" +datafusion-sql = "15.0.0" env_logger = "0.10" log = "^0.4" mimalloc = { version = "*", default-features = false } diff --git a/dask_planner/src/expression.rs b/dask_planner/src/expression.rs index 1e03baba5..497c5141d 100644 --- a/dask_planner/src/expression.rs +++ b/dask_planner/src/expression.rs @@ -484,7 +484,10 @@ impl PyExpr { ScalarValue::LargeBinary(_value) => "LargeBinary", ScalarValue::Date32(_value) => "Date32", ScalarValue::Date64(_value) => "Date64", - ScalarValue::Time64(_value) => "Time64", + ScalarValue::Time32Second(_value) => "Time32", + ScalarValue::Time32Millisecond(_value) => "Time32", + ScalarValue::Time64Microsecond(_value) => "Time64", + ScalarValue::Time64Nanosecond(_value) => "Time64", ScalarValue::Null => "Null", ScalarValue::TimestampSecond(..) => "TimestampSecond", ScalarValue::TimestampMillisecond(..) => "TimestampMillisecond", @@ -594,7 +597,7 @@ impl PyExpr { } #[pyo3(name = "getDecimal128Value")] - pub fn decimal_128_value(&mut self) -> PyResult<(Option, u8, u8)> { + pub fn decimal_128_value(&mut self) -> PyResult<(Option, u8, i8)> { match self.get_scalar_value()? { ScalarValue::Decimal128(value, precision, scale) => Ok((*value, *precision, *scale)), other => Err(unexpected_literal_value(other)), @@ -653,7 +656,7 @@ impl PyExpr { #[pyo3(name = "getTime64Value")] pub fn time_64_value(&self) -> PyResult> { - extract_scalar_value!(self, Time64) + extract_scalar_value!(self, Time64Nanosecond) } #[pyo3(name = "getTimestampValue")] diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index fd581044c..73b967c12 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -12,7 +12,7 @@ pub mod types; use std::{collections::HashMap, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; -use datafusion_common::{DFSchema, DataFusionError}; +use datafusion_common::{DFSchema, DataFusionError, ScalarValue}; use datafusion_expr::{ logical_plan::Extension, AccumulatorFunctionImplementation, @@ -372,6 +372,10 @@ impl ContextProvider for DaskSQLContext { fn get_variable_type(&self, _: &[String]) -> Option { unimplemented!("RUST: get_variable_type is not yet implemented for DaskSQLContext") } + + fn get_config_option(&self, _option: &str) -> Option { + None + } } #[pymethods] diff --git a/dask_planner/src/sql/logical.rs b/dask_planner/src/sql/logical.rs index 55bccb905..d4e6614ff 100644 --- a/dask_planner/src/sql/logical.rs +++ b/dask_planner/src/sql/logical.rs @@ -31,6 +31,7 @@ pub mod show_models; pub mod show_schema; pub mod show_tables; pub mod sort; +pub mod subquery_alias; pub mod table_scan; pub mod use_schema; pub mod window; @@ -137,6 +138,11 @@ impl PyLogicalPlan { to_py_plan(self.current_node.as_ref()) } + /// LogicalPlan::SubqueryAlias as PySubqueryAlias + pub fn subquery_alias(&self) -> PyResult { + to_py_plan(self.current_node.as_ref()) + } + /// LogicalPlan::Window as PyWindow pub fn window(&self) -> PyResult { to_py_plan(self.current_node.as_ref()) diff --git a/dask_planner/src/sql/logical/subquery_alias.rs b/dask_planner/src/sql/logical/subquery_alias.rs new file mode 100644 index 000000000..12ff5b3c1 --- /dev/null +++ b/dask_planner/src/sql/logical/subquery_alias.rs @@ -0,0 +1,30 @@ +use datafusion_expr::{logical_plan::SubqueryAlias, LogicalPlan}; +use pyo3::prelude::*; + +use crate::sql::exceptions::py_type_err; + +#[pyclass(name = "SubqueryAlias", module = "dask_planner", subclass)] +#[derive(Clone)] +pub struct PySubqueryAlias { + subquery_alias: SubqueryAlias, +} + +#[pymethods] +impl PySubqueryAlias { + /// Returns a Vec of the sort expressions + #[pyo3(name = "getAlias")] + pub fn alias(&self) -> PyResult { + Ok(self.subquery_alias.alias.clone()) + } +} + +impl TryFrom for PySubqueryAlias { + type Error = PyErr; + + fn try_from(logical_plan: LogicalPlan) -> Result { + match logical_plan { + LogicalPlan::SubqueryAlias(subquery_alias) => Ok(PySubqueryAlias { subquery_alias }), + _ => Err(py_type_err("unexpected plan")), + } + } +} diff --git a/dask_planner/src/sql/optimizer.rs b/dask_planner/src/sql/optimizer.rs index 2f2843763..8afc46b64 100644 --- a/dask_planner/src/sql/optimizer.rs +++ b/dask_planner/src/sql/optimizer.rs @@ -6,17 +6,17 @@ use datafusion_optimizer::{ common_subexpr_eliminate::CommonSubexprEliminate, decorrelate_where_exists::DecorrelateWhereExists, decorrelate_where_in::DecorrelateWhereIn, + eliminate_cross_join::EliminateCrossJoin, // TODO: need to handle EmptyRelation for GPU cases // eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit, + eliminate_outer_join::EliminateOuterJoin, filter_null_join_keys::FilterNullJoinKeys, - filter_push_down::FilterPushDown, inline_table_scan::InlineTableScan, limit_push_down::LimitPushDown, optimizer::{Optimizer, OptimizerRule}, projection_push_down::ProjectionPushDown, - reduce_cross_join::ReduceCrossJoin, - reduce_outer_join::ReduceOuterJoin, + push_down_filter::PushDownFilter, rewrite_disjunctive_predicate::RewriteDisjunctivePredicate, scalar_subquery_to_join::ScalarSubqueryToJoin, simplify_expressions::SimplifyExpressions, @@ -56,13 +56,13 @@ impl DaskSqlOptimizer { Arc::new(SimplifyExpressions::new()), // TODO: need to handle EmptyRelation for GPU cases // Arc::new(EliminateFilter::new()), - Arc::new(ReduceCrossJoin::new()), + Arc::new(EliminateCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), Arc::new(RewriteDisjunctivePredicate::new()), Arc::new(FilterNullJoinKeys::default()), - Arc::new(ReduceOuterJoin::new()), - Arc::new(FilterPushDown::new()), + Arc::new(EliminateOuterJoin::new()), + Arc::new(PushDownFilter::new()), Arc::new(LimitPushDown::new()), // Dask-SQL specific optimizations Arc::new(EliminateAggDistinct::new()), @@ -102,7 +102,7 @@ mod tests { use std::{any::Any, collections::HashMap, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::{DataFusionError, Result}; + use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource}; use datafusion_sql::{ planner::{ContextProvider, SqlToRel}, @@ -122,14 +122,15 @@ mod tests { AND (cast('2002-05-08' as date) + interval '5 days')\ )"; let plan = test_sql(sql)?; - let expected = - "Projection: test.col_int32\n Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\ - \n CrossJoin:\ - \n TableScan: test projection=[col_int32]\ - \n Projection: AVG(test.col_int32) AS __value, alias=__sq_1\ - \n Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\ - \n Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\ - \n TableScan: test projection=[col_int32, col_utf8]"; + let expected = r#"Projection: test.col_int32 + Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value + CrossJoin: + TableScan: test projection=[col_int32] + SubqueryAlias: __sq_1 + Projection: AVG(test.col_int32) AS __value + Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]] + Filter: test.col_utf8 >= Utf8("2002-05-08") AND test.col_utf8 <= Utf8("2002-05-13") + TableScan: test projection=[col_int32, col_utf8]"#; assert_eq!(expected, format!("{:?}", plan)); Ok(()) } @@ -189,6 +190,10 @@ mod tests { fn get_variable_type(&self, _variable_names: &[String]) -> Option { None } + + fn get_config_option(&self, _option: &str) -> Option { + None + } } struct MyTableSource { diff --git a/dask_planner/src/sql/optimizer/eliminate_agg_distinct.rs b/dask_planner/src/sql/optimizer/eliminate_agg_distinct.rs index 411e0a25a..cd0539b73 100644 --- a/dask_planner/src/sql/optimizer/eliminate_agg_distinct.rs +++ b/dask_planner/src/sql/optimizer/eliminate_agg_distinct.rs @@ -282,7 +282,6 @@ fn create_plan( LogicalPlan::Projection(Projection::try_new( projected_cols, Arc::new(second_aggregate), - None, )?) }; @@ -349,7 +348,6 @@ fn create_plan( LogicalPlan::Projection(Projection::try_new( projected_cols, Arc::new(second_aggregate), - None, )?) }; diff --git a/dask_sql/physical/rel/logical/subquery_alias.py b/dask_sql/physical/rel/logical/subquery_alias.py index 0bf96949f..2473167d7 100644 --- a/dask_sql/physical/rel/logical/subquery_alias.py +++ b/dask_sql/physical/rel/logical/subquery_alias.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from dask_sql.datacontainer import DataContainer from dask_sql.physical.rel.base import BaseRelPlugin if TYPE_CHECKING: @@ -16,4 +17,20 @@ class SubqueryAlias(BaseRelPlugin): def convert(self, rel: "LogicalPlan", context: "dask_sql.Context"): (dc,) = self.assert_inputs(rel, 1, context) - return dc + + cc = dc.column_container + + alias = rel.subquery_alias().getAlias() + + return DataContainer( + dc.df, + cc.rename( + { + col: renamed_col + for col, renamed_col in zip( + cc.columns, + (f"{alias}.{col.split('.')[-1]}" for col in cc.columns), + ) + } + ), + ) diff --git a/tests/unit/test_queries.py b/tests/unit/test_queries.py index becb378a5..b006e21c2 100644 --- a/tests/unit/test_queries.py +++ b/tests/unit/test_queries.py @@ -29,7 +29,6 @@ 54, 57, 58, - 59, 62, 67, 69,