diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index d1a3a3aa068..2fb2a2b0417 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -365,6 +365,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.TimeAdd| |Adds interval to timestamp|true|None| spark.rapids.sql.expression.ToDegrees|`degrees`|Converts radians to degrees|true|None| spark.rapids.sql.expression.ToRadians|`radians`|Converts degrees to radians|true|None| +spark.rapids.sql.expression.ToUTCTimestamp|`to_utc_timestamp`|Render the input timestamp in UTC|true|None| spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|true|None| spark.rapids.sql.expression.TransformKeys|`transform_keys`|Transform keys in a map using a transform function|true|None| spark.rapids.sql.expression.TransformValues|`transform_values`|Transform values in a map using a transform function|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index a25c2cd073e..5af0f356627 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -6638,7 +6638,7 @@ are limited. -PS
Only timezones equivalent to UTC are supported
+PS
Only non-DST(Daylight Savings Time) timezones are supported
@@ -15441,6 +15441,74 @@ are limited. +ToUTCTimestamp +`to_utc_timestamp` +Render the input timestamp in UTC +None +project +timestamp + + + + + + + + +PS
UTC is only supported TZ for TIMESTAMP
+ + + + + + + + + + + +timezone + + + + + + + + + +PS
Only non-DST(Daylight Savings Time) timezones are supported
+ + + + + + + + + + +result + + + + + + + + +PS
UTC is only supported TZ for TIMESTAMP
+ + + + + + + + + + + ToUnixTimestamp `to_unix_timestamp` Returns the UNIX timestamp of the given time @@ -15645,6 +15713,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + UnaryMinus `negative` Negate a numeric value @@ -15735,32 +15829,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - UnaryPositive `positive` A numeric value with a + in front of it @@ -16018,6 +16086,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Upper `upper`, `ucase` String uppercase operator @@ -16112,32 +16206,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - WindowExpression Calculates a return value for every input row of a table based on a group (or "window") of rows diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 8eab7f1e231..99651750f3e 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -286,41 +286,52 @@ def test_unsupported_fallback_to_unix_timestamp(data_gen): spark, [("a", data_gen), ("b", string_gen)], length=10).selectExpr( "to_unix_timestamp(a, b)"), "ToUnixTimestamp") + +supported_timezones = ["Asia/Shanghai", "UTC", "UTC+0", "UTC-0", "GMT", "GMT+0", "GMT-0", "EST", "MST", "VST"] +unsupported_timezones = ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"] -@pytest.mark.parametrize('time_zone', ["Asia/Shanghai", "UTC", "UTC+0", "UTC-0", "GMT", "GMT+0", "GMT-0"], ids=idfn) -@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) -@tz_sensitive_test +@pytest.mark.parametrize('time_zone', supported_timezones, ids=idfn) @allow_non_gpu(*non_utc_allow) -def test_from_utc_timestamp(data_gen, time_zone): +def test_from_utc_timestamp(time_zone): assert_gpu_and_cpu_are_equal_collect( - lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone))) + lambda spark: unary_op_df(spark, timestamp_gen).select(f.from_utc_timestamp(f.col('a'), time_zone))) @allow_non_gpu('ProjectExec') -@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn) -@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) -@tz_sensitive_test -def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone): +@pytest.mark.parametrize('time_zone', unsupported_timezones, ids=idfn) +def test_from_utc_timestamp_unsupported_timezone_fallback(time_zone): assert_gpu_fallback_collect( - lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), + lambda spark: unary_op_df(spark, timestamp_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), 'FromUTCTimestamp') -@pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn) -@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) -@tz_sensitive_test +@allow_non_gpu('ProjectExec') +def test_unsupported_fallback_from_utc_timestamp(): + time_zone_gen = StringGen(pattern="UTC") + assert_gpu_fallback_collect( + lambda spark: gen_df(spark, [("a", timestamp_gen), ("tzone", time_zone_gen)]).selectExpr( + "from_utc_timestamp(a, tzone)"), + 'FromUTCTimestamp') + @allow_non_gpu(*non_utc_allow) -def test_from_utc_timestamp_supported_timezones(data_gen, time_zone): +@pytest.mark.parametrize('time_zone', supported_timezones, ids=idfn) +def test_to_utc_timestamp(time_zone): assert_gpu_and_cpu_are_equal_collect( - lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone))) + lambda spark: unary_op_df(spark, timestamp_gen).select(f.to_utc_timestamp(f.col('a'), time_zone))) @allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('time_zone', unsupported_timezones, ids=idfn) @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) -def test_unsupported_fallback_from_utc_timestamp(data_gen): - time_zone_gen = StringGen(pattern="UTC") +def test_to_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone): assert_gpu_fallback_collect( - lambda spark: gen_df(spark, [("a", data_gen), ("tzone", time_zone_gen)]).selectExpr( - "from_utc_timestamp(a, tzone)"), - 'FromUTCTimestamp') + lambda spark: unary_op_df(spark, data_gen).select(f.to_utc_timestamp(f.col('a'), time_zone)), + 'ToUTCTimestamp') +@allow_non_gpu('ProjectExec') +def test_unsupported_fallback_to_utc_timestamp(): + time_zone_gen = StringGen(pattern="UTC") + assert_gpu_fallback_collect( + lambda spark: gen_df(spark, [("a", timestamp_gen), ("tzone", time_zone_gen)]).selectExpr( + "to_utc_timestamp(a, tzone)"), + 'ToUTCTimestamp') @allow_non_gpu('ProjectExec') @pytest.mark.parametrize('data_gen', [long_gen], ids=idfn) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 2161cfb8ec1..4f4b82126fb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1798,10 +1798,21 @@ object GpuOverrides extends Logging { ExprChecks.binaryProject(TypeSig.TIMESTAMP, TypeSig.TIMESTAMP, ("timestamp", TypeSig.TIMESTAMP, TypeSig.TIMESTAMP), ("timezone", TypeSig.lit(TypeEnum.STRING) - .withPsNote(TypeEnum.STRING, "Only timezones equivalent to UTC are supported"), + .withPsNote(TypeEnum.STRING, + "Only non-DST(Daylight Savings Time) timezones are supported"), TypeSig.lit(TypeEnum.STRING))), (a, conf, p, r) => new FromUTCTimestampExprMeta(a, conf, p, r) ), + expr[ToUTCTimestamp]( + "Render the input timestamp in UTC", + ExprChecks.binaryProject(TypeSig.TIMESTAMP, TypeSig.TIMESTAMP, + ("timestamp", TypeSig.TIMESTAMP, TypeSig.TIMESTAMP), + ("timezone", TypeSig.lit(TypeEnum.STRING) + .withPsNote(TypeEnum.STRING, + "Only non-DST(Daylight Savings Time) timezones are supported"), + TypeSig.lit(TypeEnum.STRING))), + (a, conf, p, r) => new ToUTCTimestampExprMeta(a, conf, p, r) + ), expr[Pmod]( "Pmod", // Decimal support disabled https://github.com/NVIDIA/spark-rapids/issues/7553 diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 09d4a977084..4818f25343f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.jni.GpuTimeZoneDB import com.nvidia.spark.rapids.shims.ShimBinaryExpression -import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, FromUnixTime, FromUTCTimestamp, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, FromUnixTime, FromUTCTimestamp, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression, ToUTCTimestamp} import org.apache.spark.sql.catalyst.util.DateTimeConstants import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -1077,15 +1077,14 @@ case class GpuFromUnixTime( override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess } - -class FromUTCTimestampExprMeta( - expr: FromUTCTimestamp, +abstract class ConvertUTCTimestampExprMetaBase[INPUT <: BinaryExpression]( + expr: INPUT, override val conf: RapidsConf, override val parent: Option[RapidsMeta[_, _, _]], rule: DataFromReplacementRule) - extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) { + extends BinaryExprMeta[INPUT](expr, conf, parent, rule) { - private[this] var timezoneId: ZoneId = null + protected[this] var timezoneId: ZoneId = null override def tagExprForGpu(): Unit = { extractStringLit(expr.right) match { @@ -1100,6 +1099,14 @@ class FromUTCTimestampExprMeta( } } } +} + +class FromUTCTimestampExprMeta( + expr: FromUTCTimestamp, + override val conf: RapidsConf, + override val parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends ConvertUTCTimestampExprMetaBase[FromUTCTimestamp](expr, conf, parent, rule) { override def convertToGpu(timestamp: Expression, timezone: Expression): GpuExpression = GpuFromUTCTimestamp(timestamp, timezone, timezoneId) @@ -1137,6 +1144,49 @@ case class GpuFromUTCTimestamp( } } +class ToUTCTimestampExprMeta( + expr: ToUTCTimestamp, + override val conf: RapidsConf, + override val parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends ConvertUTCTimestampExprMetaBase[ToUTCTimestamp](expr, conf, parent, rule) { + + override def convertToGpu(timestamp: Expression, timezone: Expression): GpuExpression = + GpuToUTCTimestamp(timestamp, timezone, timezoneId) +} + +case class GpuToUTCTimestamp( + timestamp: Expression, timezone: Expression, zoneId: ZoneId) + extends GpuBinaryExpressionArgsAnyScalar + with ImplicitCastInputTypes + with NullIntolerant { + + override def left: Expression = timestamp + override def right: Expression = timezone + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) + override def dataType: DataType = TimestampType + + override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { + if (rhs.getBase.isValid) { + if (GpuOverrides.isUTCTimezone(zoneId)) { + // For UTC timezone, just a no-op bypassing GPU computation. + lhs.getBase.incRefCount() + } else { + GpuTimeZoneDB.fromTimestampToUtcTimestamp(lhs.getBase, zoneId) + } + } else { + // All-null output column. + GpuColumnVector.columnVectorFromNull(lhs.getRowCount.toInt, dataType) + } + } + + override def doColumnar(numRows: Int, lhs: GpuScalar, rhs: GpuScalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { lhsCol => + doColumnar(lhsCol, rhs) + } + } +} + trait GpuDateMathBase extends GpuBinaryExpression with ExpectsInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(DateType, TypeCollection(IntegerType, ShortType, ByteType)) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala index e61ab512d1f..288bb38653f 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -132,7 +132,7 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl println(s"test,type,zone,used MS") for (zoneStr <- zones) { // run 6 rounds, but ignore the first round. - for (i <- 1 to 6) { + val elapses = (1 to 6).map { i => // run on Cpu val startOnCpu = System.nanoTime() withCpuSparkSession( @@ -153,8 +153,16 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl val elapseOnGpuMS = (endOnGpu - startOnGpu) / 1000000L if (i != 1) { println(s"$testName,Gpu,$zoneStr,$elapseOnGpuMS") + (elapseOnCpuMS, elapseOnGpuMS) + } else { + (0L, 0L) // skip the first round } } + val meanCpu = elapses.map(_._1).sum / 5.0 + val meanGpu = elapses.map(_._2).sum / 5.0 + val speedup = meanCpu.toDouble / meanGpu.toDouble + println(f"$testName, $zoneStr: mean cpu time: $meanCpu%.2f ms, " + + f"mean gpu time: $meanGpu%.2f ms, speedup: $speedup%.2f x") } } @@ -173,4 +181,20 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl runAndRecordTime("from_utc_timestamp", perfTest) } + + test("test to_utc_timestamp") { + assume(enablePerfTest) + + // cache time zone DB in advance + GpuTimeZoneDB.cacheDatabase() + Thread.sleep(5L) + + def perfTest(spark: SparkSession, zone: String): DataFrame = { + spark.read.parquet(path).select(functions.count( + functions.to_utc_timestamp(functions.col("c_ts"), zone) + )) + } + + runAndRecordTime("to_utc_timestamp", perfTest) + } } diff --git a/tools/generated_files/operatorsScore.csv b/tools/generated_files/operatorsScore.csv index 4d7963bc5f1..10ceb96336d 100644 --- a/tools/generated_files/operatorsScore.csv +++ b/tools/generated_files/operatorsScore.csv @@ -249,6 +249,7 @@ Tanh,4 TimeAdd,4 ToDegrees,4 ToRadians,4 +ToUTCTimestamp,4 ToUnixTimestamp,4 TransformKeys,4 TransformValues,4 diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index c81c913de78..fc32f187566 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -576,6 +576,9 @@ ToDegrees,S,`degrees`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,N ToDegrees,S,`degrees`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToRadians,S,`radians`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA ToRadians,S,`radians`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +ToUTCTimestamp,S,`to_utc_timestamp`,None,project,timestamp,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA +ToUTCTimestamp,S,`to_utc_timestamp`,None,project,timezone,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA +ToUTCTimestamp,S,`to_utc_timestamp`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA ToUnixTimestamp,S,`to_unix_timestamp`,None,project,timeExp,NA,NA,NA,NA,NA,NA,NA,S,PS,S,NA,NA,NA,NA,NA,NA,NA,NA ToUnixTimestamp,S,`to_unix_timestamp`,None,project,format,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA ToUnixTimestamp,S,`to_unix_timestamp`,None,project,result,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA