Skip to content

Commit

Permalink
[ballista] support date_part and date_turnc ser/de, pass tpch 7 (#840)
Browse files Browse the repository at this point in the history
  • Loading branch information
QP Hou authored Aug 9, 2021
1 parent 30cc674 commit 0125451
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 84 deletions.
17 changes: 9 additions & 8 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,19 @@ enum ScalarFunction {
TOTIMESTAMP = 24;
ARRAY = 25;
NULLIF = 26;
DATETRUNC = 27;
MD5 = 28;
SHA224 = 29;
SHA256 = 30;
SHA384 = 31;
SHA512 = 32;
LN = 33;
DATEPART = 27;
DATETRUNC = 28;
MD5 = 29;
SHA224 = 30;
SHA256 = 31;
SHA384 = 32;
SHA512 = 33;
LN = 34;
}

message ScalarFunctionNode {
ScalarFunction fun = 1;
repeated LogicalExprNode expr = 2;
repeated LogicalExprNode args = 2;
}

enum AggregateFunction {
Expand Down
99 changes: 40 additions & 59 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,77 +988,58 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
expr.fun
))
})?;
let args = &expr.args;

match scalar_function {
protobuf::ScalarFunction::Sqrt => {
Ok(sqrt((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Sin => Ok(sin((&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Cos => Ok(cos((&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Tan => Ok(tan((&expr.expr[0]).try_into()?)),
// protobuf::ScalarFunction::Asin => Ok(asin(&expr.expr[0]).try_into()?)),
// protobuf::ScalarFunction::Acos => Ok(acos(&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Atan => {
Ok(atan((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Exp => Ok(exp((&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Log2 => {
Ok(log2((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Ln => Ok(ln((&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Log10 => {
Ok(log10((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Floor => {
Ok(floor((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Ceil => {
Ok(ceil((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Round => {
Ok(round((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Trunc => {
Ok(trunc((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Abs => Ok(abs((&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Sqrt => Ok(sqrt((&args[0]).try_into()?)),
protobuf::ScalarFunction::Sin => Ok(sin((&args[0]).try_into()?)),
protobuf::ScalarFunction::Cos => Ok(cos((&args[0]).try_into()?)),
protobuf::ScalarFunction::Tan => Ok(tan((&args[0]).try_into()?)),
// protobuf::ScalarFunction::Asin => Ok(asin(&args[0]).try_into()?)),
// protobuf::ScalarFunction::Acos => Ok(acos(&args[0]).try_into()?)),
protobuf::ScalarFunction::Atan => Ok(atan((&args[0]).try_into()?)),
protobuf::ScalarFunction::Exp => Ok(exp((&args[0]).try_into()?)),
protobuf::ScalarFunction::Log2 => Ok(log2((&args[0]).try_into()?)),
protobuf::ScalarFunction::Ln => Ok(ln((&args[0]).try_into()?)),
protobuf::ScalarFunction::Log10 => Ok(log10((&args[0]).try_into()?)),
protobuf::ScalarFunction::Floor => Ok(floor((&args[0]).try_into()?)),
protobuf::ScalarFunction::Ceil => Ok(ceil((&args[0]).try_into()?)),
protobuf::ScalarFunction::Round => Ok(round((&args[0]).try_into()?)),
protobuf::ScalarFunction::Trunc => Ok(trunc((&args[0]).try_into()?)),
protobuf::ScalarFunction::Abs => Ok(abs((&args[0]).try_into()?)),
protobuf::ScalarFunction::Signum => {
Ok(signum((&expr.expr[0]).try_into()?))
Ok(signum((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Octetlength => {
Ok(length((&expr.expr[0]).try_into()?))
}
// // protobuf::ScalarFunction::Concat => Ok(concat((&expr.expr[0]).try_into()?)),
protobuf::ScalarFunction::Lower => {
Ok(lower((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Upper => {
Ok(upper((&expr.expr[0]).try_into()?))
}
protobuf::ScalarFunction::Trim => {
Ok(trim((&expr.expr[0]).try_into()?))
Ok(length((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Ltrim => {
Ok(ltrim((&expr.expr[0]).try_into()?))
// // protobuf::ScalarFunction::Concat => Ok(concat((&args[0]).try_into()?)),
protobuf::ScalarFunction::Lower => Ok(lower((&args[0]).try_into()?)),
protobuf::ScalarFunction::Upper => Ok(upper((&args[0]).try_into()?)),
protobuf::ScalarFunction::Trim => Ok(trim((&args[0]).try_into()?)),
protobuf::ScalarFunction::Ltrim => Ok(ltrim((&args[0]).try_into()?)),
protobuf::ScalarFunction::Rtrim => Ok(rtrim((&args[0]).try_into()?)),
// protobuf::ScalarFunction::Totimestamp => Ok(to_timestamp((&args[0]).try_into()?)),
// protobuf::ScalarFunction::Array => Ok(array((&args[0]).try_into()?)),
// // protobuf::ScalarFunction::Nullif => Ok(nulli((&args[0]).try_into()?)),
protobuf::ScalarFunction::Datepart => {
Ok(date_part((&args[0]).try_into()?, (&args[1]).try_into()?))
}
protobuf::ScalarFunction::Rtrim => {
Ok(rtrim((&expr.expr[0]).try_into()?))
protobuf::ScalarFunction::Datetrunc => {
Ok(date_trunc((&args[0]).try_into()?, (&args[1]).try_into()?))
}
// protobuf::ScalarFunction::Totimestamp => Ok(to_timestamp((&expr.expr[0]).try_into()?)),
// protobuf::ScalarFunction::Array => Ok(array((&expr.expr[0]).try_into()?)),
// // protobuf::ScalarFunction::Nullif => Ok(nulli((&expr.expr[0]).try_into()?)),
// protobuf::ScalarFunction::Datetrunc => Ok(date_trunc((&expr.expr[0]).try_into()?)),
// protobuf::ScalarFunction::Md5 => Ok(md5((&expr.expr[0]).try_into()?)),
// protobuf::ScalarFunction::Md5 => Ok(md5((&args[0]).try_into()?)),
protobuf::ScalarFunction::Sha224 => {
Ok(sha224((&expr.expr[0]).try_into()?))
Ok(sha224((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Sha256 => {
Ok(sha256((&expr.expr[0]).try_into()?))
Ok(sha256((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Sha384 => {
Ok(sha384((&expr.expr[0]).try_into()?))
Ok(sha384((&args[0]).try_into()?))
}
protobuf::ScalarFunction::Sha512 => {
Ok(sha512((&expr.expr[0]).try_into()?))
Ok(sha512((&args[0]).try_into()?))
}
_ => Err(proto_error(
"Protobuf deserialization error: Unsupported scalar function",
Expand Down Expand Up @@ -1119,10 +1100,10 @@ impl TryInto<Field> for &protobuf::Field {
}
}

use datafusion::physical_plan::datetime_expressions::{date_trunc, to_timestamp};
use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
array, length, lower, ltrim, md5, rtrim, sha224, sha256, sha384, sha512, trim, upper,
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
sha384, sha512, trim, upper,
};
use std::convert::TryFrom;

Expand Down
5 changes: 3 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::ScalarVariable(_) => unimplemented!(),
Expr::ScalarFunction { ref fun, ref args } => {
let fun: protobuf::ScalarFunction = fun.try_into()?;
let expr: Vec<protobuf::LogicalExprNode> = args
let args: Vec<protobuf::LogicalExprNode> = args
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<protobuf::LogicalExprNode>, BallistaError>>()?;
Expand All @@ -1074,7 +1074,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
protobuf::logical_expr_node::ExprType::ScalarFunction(
protobuf::ScalarFunctionNode {
fun: fun.into(),
expr,
args,
},
),
),
Expand Down Expand Up @@ -1374,6 +1374,7 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
}
BuiltinScalarFunction::Array => Ok(protobuf::ScalarFunction::Array),
BuiltinScalarFunction::NullIf => Ok(protobuf::ScalarFunction::Nullif),
BuiltinScalarFunction::DatePart => Ok(protobuf::ScalarFunction::Datepart),
BuiltinScalarFunction::DateTrunc => Ok(protobuf::ScalarFunction::Datetrunc),
BuiltinScalarFunction::MD5 => Ok(protobuf::ScalarFunction::Md5),
BuiltinScalarFunction::SHA224 => Ok(protobuf::ScalarFunction::Sha224),
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Totimestamp => BuiltinScalarFunction::ToTimestamp,
ScalarFunction::Array => BuiltinScalarFunction::Array,
ScalarFunction::Nullif => BuiltinScalarFunction::NullIf,
ScalarFunction::Datepart => BuiltinScalarFunction::DatePart,
ScalarFunction::Datetrunc => BuiltinScalarFunction::DateTrunc,
ScalarFunction::Md5 => BuiltinScalarFunction::MD5,
ScalarFunction::Sha224 => BuiltinScalarFunction::SHA224,
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set -e
# This bash script is meant to be run inside the docker-compose environment. Check the README for instructions

cd /
for query in 1 3 5 6 10 12
for query in 1 3 5 6 7 10 12
do
/tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
done
1 change: 1 addition & 0 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ mod tests {
test_round_trip!(q3, 3);
test_round_trip!(q5, 5);
test_round_trip!(q6, 6);
test_round_trip!(q7, 7);
test_round_trip!(q10, 10);
test_round_trip!(q12, 12);
}
Expand Down
19 changes: 18 additions & 1 deletion datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,20 @@ macro_rules! unary_scalar_expr {
};
}

// generate methods for creating the supported unary expressions
/// Create an convenience function representing a /binaryunary scalar function
macro_rules! binary_scalar_expr {
($ENUM:ident, $FUNC:ident) => {
#[doc = "this scalar function is not documented yet"]
pub fn $FUNC(arg1: Expr, arg2: Expr) -> Expr {
Expr::ScalarFunction {
fun: functions::BuiltinScalarFunction::$ENUM,
args: vec![arg1, arg2],
}
}
};
}

// generate methods for creating the supported unary/binary expressions

// math functions
unary_scalar_expr!(Sqrt, sqrt);
Expand Down Expand Up @@ -1478,6 +1491,10 @@ unary_scalar_expr!(Translate, translate);
unary_scalar_expr!(Trim, trim);
unary_scalar_expr!(Upper, upper);

// date functions
binary_scalar_expr!(DatePart, date_part);
binary_scalar_expr!(DateTrunc, date_trunc);

/// returns an array of fixed size with each argument on it.
pub fn array(args: Vec<Expr>) -> Expr {
Expr::ScalarFunction {
Expand Down
15 changes: 8 additions & 7 deletions datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ pub use display::display_schema;
pub use expr::{
abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length, btrim, case,
ceil, character_length, chr, col, columnize_expr, combine_filters, concat, concat_ws,
cos, count, count_distinct, create_udaf, create_udf, exp, exprlist_to_fields, floor,
in_list, initcap, left, length, lit, ln, log10, log2, lower, lpad, ltrim, max, md5,
min, normalize_col, normalize_cols, now, octet_length, or, random, regexp_match,
regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim,
sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols,
upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, exp,
exprlist_to_fields, floor, in_list, initcap, left, length, lit, ln, log10, log2,
lower, lpad, ltrim, max, md5, min, normalize_col, normalize_cols, now, octet_length,
or, random, regexp_match, regexp_replace, repeat, replace, replace_col, reverse,
right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part,
sqrt, starts_with, strpos, substr, sum, tan, to_hex, translate, trim, trunc,
unnormalize_col, unnormalize_cols, upper, when, Column, Expr, ExprRewriter,
ExpressionVisitor, Literal, Recursion,
};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ impl FromStr for BuiltinScalarFunction {
"concat" => BuiltinScalarFunction::Concat,
"concat_ws" => BuiltinScalarFunction::ConcatWithSeparator,
"chr" => BuiltinScalarFunction::Chr,
"date_part" => BuiltinScalarFunction::DatePart,
"date_trunc" => BuiltinScalarFunction::DateTrunc,
"date_part" | "datepart" => BuiltinScalarFunction::DatePart,
"date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
"initcap" => BuiltinScalarFunction::InitCap,
"left" => BuiltinScalarFunction::Left,
"length" => BuiltinScalarFunction::CharacterLength,
Expand Down
9 changes: 5 additions & 4 deletions datafusion/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ pub use crate::dataframe::DataFrame;
pub use crate::execution::context::{ExecutionConfig, ExecutionContext};
pub use crate::logical_plan::{
array, ascii, avg, bit_length, btrim, character_length, chr, col, concat, concat_ws,
count, create_udf, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5,
min, now, octet_length, random, regexp_replace, repeat, replace, reverse, right,
rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr,
sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
count, create_udf, date_part, date_trunc, in_list, initcap, left, length, lit, lower,
lpad, ltrim, max, md5, min, now, octet_length, random, regexp_replace, repeat,
replace, reverse, right, rpad, rtrim, sha224, sha256, sha384, sha512, split_part,
starts_with, strpos, substr, sum, to_hex, translate, trim, upper, Column, JoinType,
Partitioning,
};
pub use crate::physical_plan::csv::CsvReadOptions;

0 comments on commit 0125451

Please sign in to comment.