diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 66a5671f3f39..bb572b7d5cec 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -84,8 +84,25 @@ impl DataFrame { /// Create a physical plan pub async fn create_physical_plan(&self) -> Result> { - let state = self.session_state.read().clone(); - state.create_physical_plan(&self.plan).await + // this function is copied from SessionContext function of the + // same name + let state_cloned = { + let mut state = self.session_state.write(); + state.execution_props.start_execution(); + + // We need to clone `state` to release the lock that is not `Send`. We could + // make the lock `Send` by using `tokio::sync::Mutex`, but that would require to + // propagate async even to the `LogicalPlan` building methods. + // Cloning `state` here is fine as we then pass it as immutable `&state`, which + // means that we avoid write consistency issues as the cloned version will not + // be written to. As for eventual modifications that would be applied to the + // original state after it has been cloned, they will not be picked up by the + // clone but that is okay, as it is equivalent to postponing the state update + // by keeping the lock until the end of the function scope. + state.clone() + }; + + state_cloned.create_physical_plan(&self.plan).await } /// Filter the DataFrame by column. 
Returns a new DataFrame only containing the diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index d7ec155d17fd..c93d83f2789a 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -80,12 +80,15 @@ impl TableProvider for ViewTable { async fn scan( &self, - ctx: &SessionState, + state: &SessionState, _projection: &Option>, _filters: &[Expr], _limit: Option, ) -> Result> { - ctx.create_physical_plan(&self.logical_plan).await + // clone state and start_execution so that now() works in views + let mut state_cloned = state.clone(); + state_cloned.execution_props.start_execution(); + state_cloned.create_physical_plan(&self.logical_plan).await } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 64403715cf10..9c9ed952625b 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -365,20 +365,18 @@ impl SessionContext { match (or_replace, view) { (true, Ok(_)) => { self.deregister_table(name.as_str())?; - let plan = self.optimize(&input)?; let table = - Arc::new(ViewTable::try_new(plan.clone(), definition)?); + Arc::new(ViewTable::try_new((*input).clone(), definition)?); self.register_table(name.as_str(), table)?; - Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) + Ok(Arc::new(DataFrame::new(self.state.clone(), &input))) } (_, Err(_)) => { - let plan = self.optimize(&input)?; let table = - Arc::new(ViewTable::try_new(plan.clone(), definition)?); + Arc::new(ViewTable::try_new((*input).clone(), definition)?); self.register_table(name.as_str(), table)?; - Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) + Ok(Arc::new(DataFrame::new(self.state.clone(), &input))) } (false, Ok(_)) => Err(DataFusionError::Execution(format!( "Table '{:?}' already exists", diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index 87a02ae0118c..a79e9f5e001a 
100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -50,10 +50,10 @@ pub use datafusion_expr::{ StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, }, - lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr, - nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat, - replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, - sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, + lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif, + octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace, + reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, sha512, + signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator, }; diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 5e5f38ffc5b5..4613be7cdcbc 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -120,12 +120,12 @@ pub struct ColumnStatistics { /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. /// -/// Each `ExecutionPlan` is Partition-aware and is responsible for +/// Each `ExecutionPlan` is partition-aware and is responsible for /// creating the actual `async` [`SendableRecordBatchStream`]s /// of [`RecordBatch`] that incrementally compute the operator's /// output from its input partition. /// -/// [`ExecutionPlan`] can be displayed in an simplified form using the +/// [`ExecutionPlan`] can be displayed in a simplified form using the /// return value from [`displayable`] in addition to the (normally /// quite verbose) `Debug` output. 
pub trait ExecutionPlan: Debug + Send + Sync { @@ -168,7 +168,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// The default implementation returns `true` /// /// WARNING: if you override this default and return `false`, your - /// operator can not rely on datafusion preserving the input order + /// operator can not rely on DataFusion preserving the input order /// as it will likely not. fn relies_on_input_order(&self) -> bool { true @@ -200,7 +200,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// parallelism may outweigh any benefits /// /// The default implementation returns `true` unless this operator - /// has signalled it requiers a single child input partition. + /// has signalled it requires a single child input partition. fn benefits_from_input_partitioning(&self) -> bool { // By default try to maximize parallelism with more CPUs if // possible diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs index 9f9004d6bc6e..57900bd2951c 100644 --- a/datafusion/core/tests/sql/timestamp.rs +++ b/datafusion/core/tests/sql/timestamp.rs @@ -436,31 +436,93 @@ async fn test_current_timestamp_expressions() -> Result<()> { } #[tokio::test] -async fn test_current_timestamp_expressions_non_optimized() -> Result<()> { - let t1 = chrono::Utc::now().timestamp(); +async fn test_now_in_same_stmt_using_sql_function() -> Result<()> { let ctx = SessionContext::new(); - let sql = "SELECT NOW(), NOW() as t2"; - let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let df1 = ctx.sql("select now(), now() as now2").await?; + let result = result_vec(&df1.collect().await?); + assert_eq!(result[0][0], result[0][1]); - let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); - let plan = ctx.create_physical_plan(&plan).await.expect(&msg); + Ok(()) +} - let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); - let task_ctx = ctx.task_ctx(); - let res = 
collect(plan, task_ctx).await.expect(&msg); - let actual = result_vec(&res); +#[tokio::test] +async fn test_now_across_statements() -> Result<()> { + let ctx = SessionContext::new(); - let res1 = actual[0][0].as_str(); - let res2 = actual[0][1].as_str(); - let t3 = chrono::Utc::now().timestamp(); - let t2_naive = - chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d %H:%M:%S%.6f").unwrap(); + let actual1 = execute(&ctx, "SELECT NOW()").await; + let res1 = actual1[0][0].as_str(); - let t2 = t2_naive.timestamp(); - assert!(t1 <= t2 && t2 <= t3); - assert_eq!(res2, res1); + let actual2 = execute(&ctx, "SELECT NOW()").await; + let res2 = actual2[0][0].as_str(); + + assert!(res1 < res2); + + Ok(()) +} + +#[tokio::test] +async fn test_now_across_statements_using_sql_function() -> Result<()> { + let ctx = SessionContext::new(); + + let df1 = ctx.sql("select now()").await?; + let rb1 = df1.collect().await?; + let result1 = result_vec(&rb1); + let res1 = result1[0][0].as_str(); + + let df2 = ctx.sql("select now()").await?; + let rb2 = df2.collect().await?; + let result2 = result_vec(&rb2); + let res2 = result2[0][0].as_str(); + + assert!(res1 < res2); + + Ok(()) +} + +#[tokio::test] +async fn test_now_dataframe_api() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx.sql("select 1").await?; // use this to get a DataFrame + let df = df.select(vec![now(), now().alias("now2")])?; + let result = result_vec(&df.collect().await?); + assert_eq!(result[0][0], result[0][1]); + + Ok(()) +} + +#[tokio::test] +async fn test_now_dataframe_api_across_statements() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx.sql("select 1").await?; // use this to get a DataFrame + let df = df.select(vec![now()])?; + let result = result_vec(&df.collect().await?); + + let df = ctx.sql("select 1").await?; + let df = df.select(vec![now()])?; + let result2 = result_vec(&df.collect().await?); + + assert_ne!(result[0][0], result2[0][0]); + + Ok(()) +} + +#[tokio::test] 
+async fn test_now_in_view() -> Result<()> { + let ctx = SessionContext::new(); + let _df = ctx + .sql("create or replace view test_now as select now()") + .await? + .collect() + .await?; + + let df = ctx.sql("select * from test_now").await?; + let result = result_vec(&df.collect().await?); + + let df1 = ctx.sql("select * from test_now").await?; + let result2 = result_vec(&df1.collect().await?); + + assert_ne!(result[0][0], result2[0][0]); Ok(()) } diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index d23a14ced17b..9b51273a0fc4 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -367,10 +367,8 @@ nary_scalar_expr!(Btrim, btrim); //there is a func concat_ws before, so use concat_ws_expr as name.c nary_scalar_expr!(ConcatWithSeparator, concat_ws_expr); nary_scalar_expr!(Concat, concat_expr); -nary_scalar_expr!(Now, now_expr); // date functions -unary_scalar_expr!(Now, now, "current time"); //TODO this is not a unary expression https://github.com/apache/arrow-datafusion/issues/3069 scalar_expr!(DatePart, date_part, part, date); scalar_expr!(DateTrunc, date_trunc, part, date); scalar_expr!(DateBin, date_bin, stride, source, origin); @@ -398,6 +396,15 @@ pub fn coalesce(args: Vec) -> Expr { } } +/// Returns the current timestamp in nanoseconds, using the same value for all instances of now() in +/// the same statement. +pub fn now() -> Expr { + Expr::ScalarFunction { + fun: BuiltinScalarFunction::Now, + args: vec![], + } +} + /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression. 
pub fn case(expr: Expr) -> CaseBuilder { CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None) @@ -564,7 +571,6 @@ mod test { test_unary_scalar_expr!(Atan, atan); test_unary_scalar_expr!(Floor, floor); test_unary_scalar_expr!(Ceil, ceil); - test_unary_scalar_expr!(Now, now); test_unary_scalar_expr!(Round, round); test_unary_scalar_expr!(Trunc, trunc); test_unary_scalar_expr!(Abs, abs); diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index e1e0fd82280f..bd33b8bc8baa 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -170,8 +170,8 @@ pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result { /// specified timestamp. /// /// The semantics of `now()` require it to return the same value -/// whenever it is called in a query. This this value is chosen during -/// planning time and bound into a closure that +/// wherever it appears within a single statement. This value is +/// chosen during planning time. 
pub fn make_now( now_ts: DateTime, ) -> impl Fn(&[ColumnarValue]) -> Result { diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 97b21c4cdcbb..ca22eb436ba2 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -36,7 +36,7 @@ use datafusion_expr::{ character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, date_bin, date_part, date_trunc, digest, exp, floor, from_unixtime, left, ln, log10, log2, logical_plan::{PlanType, StringifiedPlan}, - lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match, + lower, lpad, ltrim, md5, now, nullif, octet_length, power, random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan, to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, @@ -1117,12 +1117,7 @@ pub fn parse_expr( ScalarFunction::ToTimestampSeconds => { Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?)) } - ScalarFunction::Now => Ok(now_expr( - args.to_owned() - .iter() - .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), + ScalarFunction::Now => Ok(now()), ScalarFunction::Translate => Ok(translate( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 0791cdf3af83..11725f90d93d 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -246,7 +246,7 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut `extract(field FROM source)` - The `extract` function retrieves subfields such as year or hour from date/time values. - `source` must be a value expression of type timestamp, Data32, or Data64. 
`field` is an identifier that selects what field to extract from the source value. + `source` must be a value expression of type timestamp, Date32, or Date64. `field` is an identifier that selects what field to extract from the source value. The `extract` function returns values of type u32. - `year` :`extract(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 2020` - `month`:`extract(month FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 9` @@ -273,7 +273,8 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut ### `now` -current time +Returns the current time as `Timestamp(Nanoseconds, UTC)`. Returns the same value for the function +wherever it appears in the statement, using a value chosen at planning time. ## Other Functions