From 1afead7c641b67baa747870bc11f509105ae5f46 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 21 Nov 2023 12:27:29 -0700 Subject: [PATCH 01/25] Specify nullable=False when generating filter values in dpp tests (#9818) * Specify nullable=False when generating filter values in dpp tests * signoff Signed-off-by: Andy Grove * add comment --------- Signed-off-by: Andy Grove --- integration_tests/src/main/python/dpp_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/dpp_test.py b/integration_tests/src/main/python/dpp_test.py index d3a33401c63..f56bb603ac4 100644 --- a/integration_tests/src/main/python/dpp_test.py +++ b/integration_tests/src/main/python/dpp_test.py @@ -28,8 +28,10 @@ def fn(spark): ('skey', IntegerGen(nullable=False, min_val=0, max_val=4, special_cases=[])), ('ex_key', IntegerGen(nullable=False, min_val=0, max_val=3, special_cases=[])), ('value', int_gen), + # specify nullable=False for `filter` to avoid generating invalid SQL with + # expression `filter = None` (https://github.com/NVIDIA/spark-rapids/issues/9817) ('filter', RepeatSeqGen( - IntegerGen(min_val=0, max_val=length, special_cases=[]), length=length // 20)) + IntegerGen(min_val=0, max_val=length, special_cases=[], nullable=False), length=length // 20)) ], length) df.cache() df.write.format(table_format) \ From f354ddfd89c3bd6ee2a9edb1db53dced247af9c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 21 Nov 2023 15:59:07 -0700 Subject: [PATCH 02/25] Support timestamp in `from_json` [databricks] (#9720) * Support timestamp in from_json * fix shims * fix shims * signoff Signed-off-by: Andy Grove * improve tests * fix 321db shim * update compatibility guide --------- Signed-off-by: Andy Grove --- docs/compatibility.md | 10 ++- docs/supported_ops.md | 4 +- .../src/main/python/json_test.py | 84 ++++++++++++++++++- .../com/nvidia/spark/rapids/GpuCast.scala | 4 +- .../nvidia/spark/rapids/GpuOverrides.scala | 3 +- .../catalyst/json/rapids/GpuJsonScan.scala | 10 +++ .../spark/sql/rapids/GpuJsonToStructs.scala | 16 +++- .../rapids/shims/GpuJsonToStructsShim.scala | 9 ++ .../sql/catalyst/json/GpuJsonUtils.scala | 5 ++ .../sql/catalyst/json/GpuJsonUtils.scala | 6 ++ .../sql/catalyst/json/GpuJsonUtils.scala | 6 ++ .../rapids/shims/GpuJsonToStructsShim.scala | 19 +++++ .../sql/catalyst/json/GpuJsonUtils.scala | 19 +++-- 13 files changed, 181 insertions(+), 14 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 370d61e5b0c..53b39ec251e 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -330,7 +330,15 @@ Dates are partially supported but there are some known issues: parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4 will parse these numbers as number of days since the epoch, and in Spark 3.4 and later, an exception will be thrown. -Timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)). +Timestamps are partially supported but there are some known issues: + +- Only the default `timestampFormat` of `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]` is supported. 
The query will fall back to CPU if any other format
+ is specified ([#9723](https://github.com/NVIDIA/spark-rapids/issues/9723))
+- Strings containing integers with more than four digits will be
+ parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4
+ will parse these numbers as number of days since the epoch, and in Spark 3.4 and later, an exception will be thrown.
+- Strings containing special date constant values such as `now` and `today` will parse as null ([#9724](https://github.com/NVIDIA/spark-rapids/issues/9724)),
+ which differs from the behavior in Spark 3.1.x

When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).

diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 490ec771ab0..1566a291f36 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
NS
-PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
-PS<br/>unsupported child types TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
+PS<br/>MAP only supports keys and values that are of STRING type;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, MAP, UDT
+PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types NULL, BINARY, CALENDAR, MAP, UDT
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index e3f50727619..bb99a01425f 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -21,7 +21,7 @@ from datetime import timezone from conftest import is_databricks_runtime from marks import approximate_float, allow_non_gpu, ignore_order -from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330, is_before_spark_340, \ +from spark_session import with_cpu_session, with_gpu_session, is_before_spark_320, is_before_spark_330, is_before_spark_340, \ is_before_spark_341 json_supported_gens = [ @@ -600,6 +600,88 @@ def test_from_json_struct_date_fallback_non_default_format(date_gen, date_format conf={"spark.rapids.sql.expression.JsonToStructs": True, 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}) +@pytest.mark.parametrize('timestamp_gen', [ + # "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}(\\.[0-9]{1,6})?Z?[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]}?\"", + # "yyyy-MM-dd" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"", + # "yyyy-MM" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[1-8]{1}[0-9]{3}-[0-3]{1,2}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"", + # "yyyy" + "\"[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?[0-9]{4}[ \t\xA0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]?\"", + # "dd/MM/yyyy" + "\"[0-9]{2}/[0-9]{2}/[1-8]{1}[0-9]{3}\"", + # special constant values + pytest.param("\"(now|today|tomorrow|epoch)\"", marks=pytest.mark.xfail(condition=is_before_spark_320(), reason="https://github.com/NVIDIA/spark-rapids/issues/9724")), + # "nnnnn" (number of days since epoch prior to Spark 3.4, throws exception from 3.4) + pytest.param("\"[0-9]{5}\"", marks=pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9664")), + # integral + pytest.param("[0-9]{1,5}", marks=pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9588")), + "[1-9]{1,8}", + # floating-point + "[0-9]{0,2}\.[0-9]{1,2}" + # boolean + "(true|false)" +]) +@pytest.mark.parametrize('timestamp_format', [ + "", + "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]", + # https://github.com/NVIDIA/spark-rapids/issues/9723 + pytest.param("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", marks=pytest.mark.allow_non_gpu('ProjectExec')), + pytest.param("dd/MM/yyyy'T'HH:mm:ss[.SSS][XXX]", marks=pytest.mark.allow_non_gpu('ProjectExec')), +]) +@pytest.mark.parametrize('time_parser_policy', [ + pytest.param("LEGACY", marks=pytest.mark.allow_non_gpu('ProjectExec')), + "CORRECTED" +]) +@pytest.mark.parametrize('ansi_enabled', [ True, False ]) +def test_from_json_struct_timestamp(timestamp_gen, timestamp_format, time_parser_policy, ansi_enabled): + json_string_gen = StringGen(r'{ "a": ' + timestamp_gen + ' }') \ + .with_special_case('{ "a": null }') \ + .with_special_case('null') + options = { 'timestampFormat': timestamp_format } if len(timestamp_format) > 0 else { } + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.col('a'), f.from_json('a', 'struct', options)), + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.sql.legacy.timeParserPolicy': time_parser_policy, + 'spark.sql.ansi.enabled': ansi_enabled }) + 
+@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('timestamp_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}(\\.[0-9]{1,6})?Z?\""]) +@pytest.mark.parametrize('timestamp_format', [ + "", + "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]", +]) +def test_from_json_struct_timestamp_fallback_legacy(timestamp_gen, timestamp_format): + json_string_gen = StringGen(r'{ "a": ' + timestamp_gen + ' }') \ + .with_special_case('{ "a": null }') \ + .with_special_case('null') + options = { 'timestampFormat': timestamp_format } if len(timestamp_format) > 0 else { } + assert_gpu_fallback_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.col('a'), f.from_json('a', 'struct', options)), + 'ProjectExec', + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.sql.legacy.timeParserPolicy': 'LEGACY'}) + +@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('timestamp_gen', ["\"[1-8]{1}[0-9]{3}-[0-3]{1,2}-[0-3]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}(\\.[0-9]{1,6})?Z?\""]) +@pytest.mark.parametrize('timestamp_format', [ + "yyyy-MM-dd'T'HH:mm:ss.SSSXXX", + "dd/MM/yyyy'T'HH:mm:ss[.SSS][XXX]", +]) +def test_from_json_struct_timestamp_fallback_non_default_format(timestamp_gen, timestamp_format): + json_string_gen = StringGen(r'{ "a": ' + timestamp_gen + ' }') \ + .with_special_case('{ "a": null }') \ + .with_special_case('null') + options = { 'timestampFormat': timestamp_format } if len(timestamp_format) > 0 else { } + assert_gpu_fallback_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.col('a'), f.from_json('a', 'struct', options)), + 'ProjectExec', + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}) @pytest.mark.parametrize('schema', ['struct', 'struct>', diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 2d1ba0d3c3b..2f59cfba072 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -1379,7 +1379,7 @@ object GpuCast { } /** This method does not close the `input` ColumnVector. 
*/ - private def convertTimestampOrNull( + def convertTimestampOrNull( input: ColumnVector, regex: String, cudfFormat: String): ColumnVector = { @@ -1463,7 +1463,7 @@ object GpuCast { } } - private def castStringToTimestamp(input: ColumnVector, ansiMode: Boolean): ColumnVector = { + def castStringToTimestamp(input: ColumnVector, ansiMode: Boolean): ColumnVector = { // special timestamps val today = DateUtils.currentDate() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index bdeae65a975..8119e78d988 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3570,7 +3570,8 @@ object GpuOverrides extends Logging { "Returns a struct value with the given `jsonStr` and `schema`", ExprChecks.projectOnly( TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral + - TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE) + + TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128 + TypeSig.BOOLEAN + TypeSig.DATE + + TypeSig.TIMESTAMP) + TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP, "MAP only supports keys and values that are of STRING type"), (TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all), diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 5c730bc23bf..c4840839616 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -121,6 +121,16 @@ object GpuJsonScan { } } + val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[TimestampType]) + if (hasTimestamps) { + GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions) match { + case None | Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") => + // this is fine + case timestampFormat => + meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported timestampFormat $timestampFormat") + } + } + if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) { meta.willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuJsonToStructs") } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index f2de53483b0..3447c91d861 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -20,7 +20,7 @@ import ai.rapids.cudf import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar} import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.GpuCast.doCast +import com.nvidia.spark.rapids.GpuCast import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq import com.nvidia.spark.rapids.jni.MapUtils import com.nvidia.spark.rapids.shims.GpuJsonToStructsShim @@ -215,7 +215,13 @@ case class GpuJsonToStructs( GpuJsonToStructsShim.castJsonStringToDate(col, options) case (_, DataTypes.DateType) => castToNullDate(input.getBase) - case _ => doCast(col, sparkType, dtype) + case (DataTypes.StringType, DataTypes.TimestampType) => + 
GpuJsonToStructsShim.castJsonStringToTimestamp(col, options) + case (DataTypes.LongType, DataTypes.TimestampType) => + GpuCast.castLongToTimestamp(col, DataTypes.TimestampType) + case (_, DataTypes.TimestampType) => + castToNullTimestamp(input.getBase) + case _ => GpuCast.doCast(col, sparkType, dtype) } } @@ -267,6 +273,12 @@ case class GpuJsonToStructs( } } + private def castToNullTimestamp(input: ColumnVector): ColumnVector = { + withResource(Scalar.fromNull(DType.TIMESTAMP_MICROSECONDS)) { nullScalar => + ColumnVector.fromScalar(nullScalar, input.getRowCount.toInt) + } + } + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = copy(timeZoneId = Option(timeZoneId)) diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala index 7e6709388f3..1fae5e7c5dc 100644 --- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala @@ -57,4 +57,13 @@ object GpuJsonToStructsShim { } } + def castJsonStringToTimestamp(input: ColumnVector, + options: Map[String, String]): ColumnVector = { + withResource(Scalar.fromString(" ")) { space => + withResource(input.strip(space)) { trimmed => + // from_json doesn't respect ansi mode + GpuCast.castStringToTimestamp(trimmed, ansiMode = false) + } + } + } } \ No newline at end of file diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 6a3c63ca2e9..7b7b680db24 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -42,6 +42,11 @@ object GpuJsonUtils { def dateFormatInRead(options: Map[String, String]): String = dateFormatInRead(parseJSONReadOptions(options)) + def optionalTimestampFormatInRead(options: JSONOptions): Option[String] = + Some(options.timestampFormat) + def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] = + optionalTimestampFormatInRead(parseJSONReadOptions(options)) + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index ab673be12f5..5f1d8929887 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -36,6 +36,12 @@ object GpuJsonUtils { def dateFormatInRead(options: Map[String, String]): String = dateFormatInRead(parseJSONReadOptions(options)) + def optionalTimestampFormatInRead(options: JSONOptions): Option[String] = + options.timestampFormatInRead + + def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] = + optionalTimestampFormatInRead(parseJSONReadOptions(options)) + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse( if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) { 
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 9a1e34bfe12..33989821009 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -43,6 +43,12 @@ object GpuJsonUtils { def dateFormatInRead(options: Map[String, String]): String = dateFormatInRead(parseJSONReadOptions(options)) + def optionalTimestampFormatInRead(options: JSONOptions): Option[String] = + options.timestampFormatInRead + + def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] = + optionalTimestampFormatInRead(parseJSONReadOptions(options)) + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse( if (SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY) { s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala index 88560143f2e..0ae9bb3d839 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/GpuJsonToStructsShim.scala @@ -46,4 +46,23 @@ object GpuJsonToStructsShim { } } + def castJsonStringToTimestamp(input: ColumnVector, + options: Map[String, String]): ColumnVector = { + options.get("timestampFormat") match { + case None => + // legacy behavior + withResource(Scalar.fromString(" ")) { space => + withResource(input.strip(space)) { trimmed => + // from_json doesn't respect ansi mode + GpuCast.castStringToTimestamp(trimmed, ansiMode = false) + } + } + case Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") => + GpuCast.convertTimestampOrNull(input, + "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]{1,6})?Z?$", "%Y-%m-%d") + case other => + // should be unreachable due to GpuOverrides checks + throw new IllegalStateException(s"Unsupported timestampFormat $other") + } + } } \ No newline at end of file diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 92c1c17bba5..4685cc0d289 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -33,11 +33,7 @@ object GpuJsonUtils { options.dateFormatInRead def optionalDateFormatInRead(options: Map[String, String]): Option[String] = { - val parsedOptions = new JSONOptionsInRead( - options, - SQLConf.get.sessionLocalTimeZone, - SQLConf.get.columnNameOfCorruptRecord) - optionalDateFormatInRead(parsedOptions) + optionalDateFormatInRead(parseJSONReadOptions(options)) } /** @@ -51,6 +47,12 @@ object GpuJsonUtils { def dateFormatInRead(options: JSONOptions): String = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + def optionalTimestampFormatInRead(options: JSONOptions): Option[String] = + options.timestampFormatInRead + + def optionalTimestampFormatInRead(options: Map[String, String]): Option[String] = + 
optionalTimestampFormatInRead(parseJSONReadOptions(options)) + def timestampFormatInRead(options: JSONOptions): String = options.timestampFormatInRead.getOrElse( if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy()) { s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX" @@ -60,4 +62,11 @@ object GpuJsonUtils { def enableDateTimeParsingFallback(options: JSONOptions): Boolean = options.enableDateTimeParsingFallback.getOrElse(false) + + def parseJSONReadOptions(options: Map[String, String]) = { + new JSONOptionsInRead( + options, + SQLConf.get.sessionLocalTimeZone, + SQLConf.get.columnNameOfCorruptRecord) + } } From 7963612a50a1f753a2e6d5553229cdf461d7d1ed Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 21 Nov 2023 20:47:53 -0600 Subject: [PATCH 03/25] Set seed to 0 for test_hash_reduction_sum (#9826) Signed-off-by: Jason Lowe --- integration_tests/src/main/python/hash_aggregate_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index a9300a51c79..4ecf42a9b42 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -381,6 +381,7 @@ def test_hash_grpby_sum_full_decimal(data_gen, conf): conf = conf) @approximate_float +@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9822") @ignore_order @incompat @pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens + [DecimalGen(precision=36, scale=5)], ids=idfn) From bdc45cb9264ac19d4058c93072bdeaa4e738fc8a Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 21 Nov 2023 20:52:26 -0600 Subject: [PATCH 04/25] Update timestamp gens to avoid "year 0 is out of range" errors (#9821) Signed-off-by: Jason Lowe --- integration_tests/src/main/python/delta_lake_write_test.py | 2 +- .../src/main/python/fastparquet_compatibility_test.py | 2 +- integration_tests/src/main/python/parquet_write_test.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/delta_lake_write_test.py b/integration_tests/src/main/python/delta_lake_write_test.py index 5e4163a9ecc..b62a6992ff0 100644 --- a/integration_tests/src/main/python/delta_lake_write_test.py +++ b/integration_tests/src/main/python/delta_lake_write_test.py @@ -412,7 +412,7 @@ def setup_tables(spark): @pytest.mark.parametrize("ts_write", ["INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"], ids=idfn) @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_legacy_timestamp(spark_tmp_path, ts_write): - gen = TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), + gen = TimestampGen(start=datetime(1, 2, 1, tzinfo=timezone.utc), end=datetime(2000, 1, 1, tzinfo=timezone.utc)).with_special_case( datetime(1000, 1, 1, tzinfo=timezone.utc), weight=10.0) data_path = spark_tmp_path + "/DELTA_DATA" diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index d2636d58d01..6ec5ec88fd3 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -192,7 +192,7 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): start=pandas_min_datetime, end=pandas_max_datetime), # Vanilla case. 
pytest.param(TimestampGen(nullable=False, - start=datetime(1, 1, 1, tzinfo=timezone.utc), + start=datetime(1, 2, 1, tzinfo=timezone.utc), end=pandas_min_datetime), marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), ], ids=idfn) diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index bd330b569bb..8a74973b0be 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -72,7 +72,7 @@ parquet_datetime_gen_simple = [DateGen(start=date(1, 1, 1), end=date(2000, 1, 1)) .with_special_case(date(1000, 1, 1), weight=10.0), - TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), + TimestampGen(start=datetime(1, 2, 1, tzinfo=timezone.utc), end=datetime(2000, 1, 1, tzinfo=timezone.utc)) .with_special_case(datetime(1000, 1, 1, tzinfo=timezone.utc), weight=10.0)] parquet_datetime_in_struct_gen = [ @@ -280,8 +280,8 @@ def writeParquetUpgradeCatchException(spark, df, data_path, spark_tmp_table_fact @pytest.mark.parametrize('ts_write_data_gen', [('INT96', TimestampGen()), - ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))), - ('TIMESTAMP_MILLIS', TimestampGen(start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc)))]) + ('TIMESTAMP_MICROS', TimestampGen(start=datetime(1, 2, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc))), + ('TIMESTAMP_MILLIS', TimestampGen(start=datetime(1, 2, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc)))]) @pytest.mark.parametrize('rebase', ["CORRECTED","EXCEPTION"]) def test_ts_write_fails_datetime_exception(spark_tmp_path, ts_write_data_gen, spark_tmp_table_factory, rebase): ts_write, gen = ts_write_data_gen From 908e9869465918369a4e1a9a4ee95c30376cac7f Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 22 Nov 2023 08:21:12 -0600 Subject: [PATCH 05/25] Add GpuCheckOverflowInTableInsert to Databricks 11.3+ (#9800) Signed-off-by: Jason Lowe --- .../src/main/python/parquet_write_test.py | 21 +++++ .../rapids/shims/Spark330PlusDBShims.scala | 87 +++++++++++++++++++ .../spark/rapids/shims/SparkShims.scala | 50 +---------- .../GpuCheckOverflowInTableInsert.scala | 3 + ...ims.scala => Spark331PlusNonDBShims.scala} | 2 +- .../spark/rapids/shims/SparkShims.scala | 2 +- .../spark/rapids/shims/SparkShims.scala | 2 +- .../rapids/shims/Spark332PlusDBShims.scala | 46 ++-------- ...ims.scala => Spark340PlusNonDBShims.scala} | 2 +- .../spark/rapids/shims/SparkShims.scala | 2 +- .../spark/rapids/shims/SparkShims.scala | 2 +- 11 files changed, 126 insertions(+), 93 deletions(-) create mode 100644 sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/Spark330PlusDBShims.scala rename sql-plugin/src/main/{spark331 => spark330db}/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala (97%) rename sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/{Spark331PlusShims.scala => Spark331PlusNonDBShims.scala} (97%) rename sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/{Spark340PlusShims.scala => Spark340PlusNonDBShims.scala} (99%) diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index 8a74973b0be..c83939f4774 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ 
b/integration_tests/src/main/python/parquet_write_test.py @@ -818,3 +818,24 @@ def test_parquet_write_column_name_with_dots(spark_tmp_path): lambda spark, path: gen_df(spark, gens).coalesce(1).write.parquet(path), lambda spark, path: spark.read.parquet(path), data_path) + +@ignore_order +def test_parquet_append_with_downcast(spark_tmp_table_factory, spark_tmp_path): + data_path = spark_tmp_path + "/PARQUET_DATA" + cpu_table = spark_tmp_table_factory.get() + gpu_table = spark_tmp_table_factory.get() + def setup_tables(spark): + df = unary_op_df(spark, int_gen, length=10) + df.write.format("parquet").option("path", data_path + "/CPU").saveAsTable(cpu_table) + df.write.format("parquet").option("path", data_path + "/GPU").saveAsTable(gpu_table) + with_cpu_session(setup_tables) + def do_append(spark, path): + table = cpu_table + if path.endswith("/GPU"): + table = gpu_table + unary_op_df(spark, LongGen(min_val=0, max_val=128, special_cases=[]), length=10)\ + .write.mode("append").saveAsTable(table) + assert_gpu_and_cpu_writes_are_equal_collect( + do_append, + lambda spark, path: spark.read.parquet(path), + data_path) diff --git a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/Spark330PlusDBShims.scala b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/Spark330PlusDBShims.scala new file mode 100644 index 00000000000..cb45d0fa440 --- /dev/null +++ b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/Spark330PlusDBShims.scala @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "330db"} +{"spark": "332db"} +{"spark": "341db"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} +import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec +import org.apache.spark.sql.execution.exchange.{EXECUTOR_BROADCAST, ShuffleExchangeExec, ShuffleExchangeLike} +import org.apache.spark.sql.rapids.{GpuCheckOverflowInTableInsert, GpuElementAtMeta} +import org.apache.spark.sql.rapids.execution.{GpuBroadcastHashJoinExec, GpuBroadcastNestedLoopJoinExec} + +trait Spark330PlusDBShims extends Spark321PlusDBShims { + override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { + val shimExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( + GpuOverrides.expr[CheckOverflowInTableInsert]( + "Casting a numeric value as another numeric type in store assignment", + ExprChecks.unaryProjectInputMatchesOutput( + TypeSig.all, + TypeSig.all), + (t, conf, p, r) => new UnaryExprMeta[CheckOverflowInTableInsert](t, conf, p, r) { + override def convertToGpu(child: Expression): GpuExpression = { + child match { + case c: GpuCast => GpuCheckOverflowInTableInsert(c, t.columnName) + case _ => + throw new IllegalStateException("Expression child is not of Type GpuCast") + } + } + }), + GpuElementAtMeta.elementAtRule(true) + ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap + super.getExprs ++ shimExprs ++ DayTimeIntervalShims.exprs ++ RoundingShims.exprs + } + + override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = + super.getExecs ++ PythonMapInArrowExecShims.execs + + override def reproduceEmptyStringBug: Boolean = false + + override def isExecutorBroadcastShuffle(shuffle: ShuffleExchangeLike): Boolean = { + shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) + } + + override def shuffleParentReadsShuffleData(shuffle: ShuffleExchangeLike, + parent: SparkPlan): Boolean = { + parent match { + case _: GpuBroadcastHashJoinExec => + shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) + case _: GpuBroadcastNestedLoopJoinExec => + shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) + case _ => false + } + } + + + override def addRowShuffleToQueryStageTransitionIfNeeded(c2r: ColumnarToRowTransition, + sqse: ShuffleQueryStageExec): SparkPlan = { + val plan = GpuTransitionOverrides.getNonQueryStagePlan(sqse) + plan match { + case shuffle: ShuffleExchangeLike if shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) => + ShuffleExchangeExec(SinglePartition, c2r, EXECUTOR_BROADCAST) + case _ => + c2r + } + } +} diff --git a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 92a52ce3fcf..84f5c4e4886 100644 --- a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -21,29 +21,13 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.SinglePartition -import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} -import 
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} -import org.apache.spark.sql.execution.exchange.{EXECUTOR_BROADCAST, ShuffleExchangeExec, ShuffleExchangeLike} -import org.apache.spark.sql.rapids.GpuElementAtMeta -import org.apache.spark.sql.rapids.execution.{GpuBroadcastHashJoinExec, GpuBroadcastNestedLoopJoinExec} -object SparkShimImpl extends Spark321PlusDBShims { +object SparkShimImpl extends Spark330PlusDBShims { // AnsiCast is removed from Spark3.4.0 override def ansiCastRule: ExprRule[_ <: Expression] = null - override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { - val elementAtExpr: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( - GpuElementAtMeta.elementAtRule(true) - ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap - super.getExprs ++ DayTimeIntervalShims.exprs ++ RoundingShims.exprs ++ elementAtExpr - } - - override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = - super.getExecs ++ PythonMapInArrowExecShims.execs - override def getDataWriteCmds: Map[Class[_ <: DataWritingCommand], DataWritingCommandRule[_ <: DataWritingCommand]] = { Seq(GpuOverrides.dataWriteCmd[CreateDataSourceTableAsSelectCommand]( @@ -56,32 +40,4 @@ object SparkShimImpl extends Spark321PlusDBShims { RunnableCommandRule[_ <: RunnableCommand]] = { Map.empty } - - override def reproduceEmptyStringBug: Boolean = false - - override def isExecutorBroadcastShuffle(shuffle: ShuffleExchangeLike): Boolean = { - shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) - } - - override def shuffleParentReadsShuffleData(shuffle: ShuffleExchangeLike, - parent: SparkPlan): Boolean = { - parent match { - case _: GpuBroadcastHashJoinExec => - shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) - case _: GpuBroadcastNestedLoopJoinExec => - shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) - case _ => false - } - } - - override def addRowShuffleToQueryStageTransitionIfNeeded(c2r: ColumnarToRowTransition, - sqse: ShuffleQueryStageExec): SparkPlan = { - val plan = GpuTransitionOverrides.getNonQueryStagePlan(sqse) - plan match { - case shuffle: ShuffleExchangeLike if shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) => - ShuffleExchangeExec(SinglePartition, c2r, EXECUTOR_BROADCAST) - case _ => - c2r - } - } -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark331/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala similarity index 97% rename from sql-plugin/src/main/spark331/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala rename to sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala index 3618f749bbd..7906b522d70 100644 --- a/sql-plugin/src/main/spark331/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/GpuCheckOverflowInTableInsert.scala @@ -15,12 +15,15 @@ */ /*** spark-rapids-shim-json-lines +{"spark": "330db"} {"spark": "331"} {"spark": "332"} {"spark": "332cdh"} +{"spark": "332db"} {"spark": "333"} {"spark": "340"} {"spark": "341"} +{"spark": "341db"} {"spark": "350"} spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids diff --git 
a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusShims.scala b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala similarity index 97% rename from sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusShims.scala rename to sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala index 2d49bf7dc2c..4a346235627 100644 --- a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusShims.scala +++ b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/Spark331PlusNonDBShims.scala @@ -30,7 +30,7 @@ import com.nvidia.spark.rapids.{ExprChecks, ExprRule, GpuCast, GpuExpression, Gp import org.apache.spark.sql.catalyst.expressions.{CheckOverflowInTableInsert, Expression} import org.apache.spark.sql.rapids.GpuCheckOverflowInTableInsert -trait Spark331PlusShims extends Spark330PlusNonDBShims { +trait Spark331PlusNonDBShims extends Spark330PlusNonDBShims { override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { val map: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( // Add expression CheckOverflowInTableInsert starting Spark-3.3.1+ diff --git a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index a201fe1de0c..eeede6ed2d9 100644 --- a/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark331/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -25,7 +25,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} -object SparkShimImpl extends Spark331PlusShims with AnsiCastRuleShims { +object SparkShimImpl extends Spark331PlusNonDBShims with AnsiCastRuleShims { override def getDataWriteCmds: Map[Class[_ <: DataWritingCommand], DataWritingCommandRule[_ <: DataWritingCommand]] = { Seq(GpuOverrides.dataWriteCmd[CreateDataSourceTableAsSelectCommand]( diff --git a/sql-plugin/src/main/spark332cdh/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark332cdh/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index d43a8092b33..517b52b7218 100644 --- a/sql-plugin/src/main/spark332cdh/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark332cdh/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -19,4 +19,4 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims -object SparkShimImpl extends Spark33cdhShims with Spark331PlusShims {} +object SparkShimImpl extends Spark33cdhShims with Spark331PlusNonDBShims {} diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala index 563dcfac8e7..43ef6118746 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/Spark332PlusDBShims.scala @@ -23,16 +23,11 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.SinglePartition -import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} -import 
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, RunnableCommand} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.exchange.{EXECUTOR_BROADCAST, ShuffleExchangeExec, ShuffleExchangeLike} -import org.apache.spark.sql.rapids.GpuElementAtMeta -import org.apache.spark.sql.rapids.execution.{GpuBroadcastHashJoinExec, GpuBroadcastNestedLoopJoinExec} -trait Spark332PlusDBShims extends Spark321PlusDBShims { +trait Spark332PlusDBShims extends Spark330PlusDBShims { // AnsiCast is removed from Spark3.4.0 override def ansiCastRule: ExprRule[_ <: Expression] = null @@ -45,10 +40,9 @@ trait Spark332PlusDBShims extends Spark321PlusDBShims { (a, conf, p, r) => new UnaryExprMeta[KnownNullable](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = GpuKnownNullable(child) } - ), - GpuElementAtMeta.elementAtRule(true) + ) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap - super.getExprs ++ shimExprs ++ DayTimeIntervalShims.exprs ++ RoundingShims.exprs + super.getExprs ++ shimExprs } private val shimExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq( @@ -63,7 +57,7 @@ trait Spark332PlusDBShims extends Spark321PlusDBShims { ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = - super.getExecs ++ shimExecs ++ PythonMapInArrowExecShims.execs + super.getExecs ++ shimExecs override def getDataWriteCmds: Map[Class[_ <: DataWritingCommand], DataWritingCommandRule[_ <: DataWritingCommand]] = { @@ -78,32 +72,4 @@ trait Spark332PlusDBShims extends Spark321PlusDBShims { (a, conf, p, r) => new CreateDataSourceTableAsSelectCommandMeta(a, conf, p, r)) ).map(r => (r.getClassFor.asSubclass(classOf[RunnableCommand]), r)).toMap } - - override def reproduceEmptyStringBug: Boolean = false - - override def isExecutorBroadcastShuffle(shuffle: ShuffleExchangeLike): Boolean = { - shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) - } - - override def shuffleParentReadsShuffleData(shuffle: ShuffleExchangeLike, - parent: SparkPlan): Boolean = { - parent match { - case _: GpuBroadcastHashJoinExec => - shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) - case _: GpuBroadcastNestedLoopJoinExec => - shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) - case _ => false - } - } - - override def addRowShuffleToQueryStageTransitionIfNeeded(c2r: ColumnarToRowTransition, - sqse: ShuffleQueryStageExec): SparkPlan = { - val plan = GpuTransitionOverrides.getNonQueryStagePlan(sqse) - plan match { - case shuffle: ShuffleExchangeLike if shuffle.shuffleOrigin.equals(EXECUTOR_BROADCAST) => - ShuffleExchangeExec(SinglePartition, c2r, EXECUTOR_BROADCAST) - case _ => - c2r - } - } -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusShims.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala similarity index 99% rename from sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusShims.scala rename to sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala index 3462086b0b1..2db727f14e0 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusShims.scala +++ 
b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/Spark340PlusNonDBShims.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS import org.apache.spark.sql.rapids.GpuElementAtMeta import org.apache.spark.sql.rapids.GpuV1WriteUtils.GpuEmpty2Null -trait Spark340PlusShims extends Spark331PlusShims { +trait Spark340PlusNonDBShims extends Spark331PlusNonDBShims { private val shimExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq( GpuOverrides.exec[GlobalLimitExec]( diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index a91f7263372..d2031435147 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -20,4 +20,4 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims -object SparkShimImpl extends Spark340PlusShims +object SparkShimImpl extends Spark340PlusNonDBShims diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 75a13143a94..7231030bdee 100644 --- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, PythonUDAF, ToPret import org.apache.spark.sql.rapids.execution.python.GpuPythonUDAF import org.apache.spark.sql.types.StringType -object SparkShimImpl extends Spark340PlusShims { +object SparkShimImpl extends Spark340PlusNonDBShims { override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { val shimExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( From cd3f85ff3bf9c0ab1b2877fd272fe72f065fa3ac Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 22 Nov 2023 11:16:57 -0600 Subject: [PATCH 06/25] UCX 1.15 upgrade (#9824) * UCX 1.15 upgrade Signed-off-by: Alessandro Bellina * Also update scala 2.13 --------- Signed-off-by: Alessandro Bellina --- .../Dockerfile.rocky_no_rdma | 13 ++++++++----- .../Dockerfile.rocky_rdma | 13 ++++++++----- .../Dockerfile.ubuntu_no_rdma | 15 +++++++++------ .../Dockerfile.ubuntu_rdma | 16 ++++++++++------ jenkins/Dockerfile-blossom.multi | 2 +- jenkins/Dockerfile-blossom.ubuntu | 5 +++-- pom.xml | 2 +- scala2.13/pom.xml | 2 +- 8 files changed, 41 insertions(+), 27 deletions(-) diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma index 4be0562123a..adf28f5fea2 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_no_rdma @@ -17,23 +17,26 @@ # # The parameters are: # - CUDA_VER: 11.8.0 by default -# - UCX_VER and UCX_CUDA_VER: these are used to pick a package matching a specific UCX version and -# CUDA runtime from the UCX github repo. -# See: https://github.com/openucx/ucx/releases/ +# - UCX_VER, UCX_CUDA_VER, and UCX_ARCH: +# Used to pick a package matching a specific UCX version and +# CUDA runtime from the UCX github repo. 
+# See: https://github.com/openucx/ucx/releases/ # - ROCKY_VER: Rocky Linux OS version ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.14.0 +ARG UCX_VER=1.15.0 ARG UCX_CUDA_VER=11 +ARG UCX_ARCH=x86_64 ARG ROCKY_VER=8 FROM nvidia/cuda:${CUDA_VER}-runtime-rockylinux${ROCKY_VER} ARG UCX_VER ARG UCX_CUDA_VER +ARG UCX_ARCH RUN yum update -y && yum install -y wget bzip2 numactl-libs libgomp RUN ls /usr/lib RUN mkdir /tmp/ucx_install && cd /tmp/ucx_install && \ - wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-centos8-mofed5-cuda$UCX_CUDA_VER.tar.bz2 && \ + wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-centos8-mofed5-cuda$UCX_CUDA_VER-$UCX_ARCH.tar.bz2 && \ tar -xvf *.bz2 && \ rpm -i ucx-$UCX_VER*.rpm && \ rpm -i ucx-cuda-$UCX_VER*.rpm --nodeps && \ diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma index c5055e61ec6..9083e1561b5 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.rocky_rdma @@ -17,22 +17,25 @@ # # The parameters are: # - CUDA_VER: 11.8.0 by default -# - UCX_VER and UCX_CUDA_VER: these are used to pick a package matching a specific UCX version and -# CUDA runtime from the UCX github repo. -# See: https://github.com/openucx/ucx/releases/ +# - UCX_VER, UCX_CUDA_VER, and UCX_ARCH: +# Used to pick a package matching a specific UCX version and +# CUDA runtime from the UCX github repo. +# See: https://github.com/openucx/ucx/releases/ # - ROCKY_VER: Rocky Linux OS version ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.14.0 +ARG UCX_VER=1.15.0 ARG UCX_CUDA_VER=11 +ARG UCX_ARCH=x86_64 ARG ROCKY_VER=8 FROM nvidia/cuda:${CUDA_VER}-runtime-rockylinux${ROCKY_VER} ARG UCX_VER ARG UCX_CUDA_VER +ARG UCX_ARCH RUN yum update -y && yum install -y wget bzip2 rdma-core numactl-libs libgomp libibverbs librdmacm RUN mkdir /tmp/ucx_install && cd /tmp/ucx_install && \ - wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-centos8-mofed5-cuda$UCX_CUDA_VER.tar.bz2 && \ + wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-centos8-mofed5-cuda$UCX_CUDA_VER-$UCX_ARCH.tar.bz2 && \ tar -xvf *.bz2 && \ rpm -i ucx-$UCX_VER*.rpm && \ rpm -i ucx-cuda-$UCX_VER*.rpm --nodeps && \ diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma index 8d6fc1671bc..e0318a0de60 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_no_rdma @@ -17,21 +17,24 @@ # # The parameters are: # - CUDA_VER: 11.8.0 by default -# - UCX_VER and UCX_CUDA_VER: these are used to pick a package matching a specific UCX version and -# CUDA runtime from the UCX github repo. -# See: https://github.com/openucx/ucx/releases/ +# - UCX_VER, UCX_CUDA_VER, and UCX_ARCH: +# Used to pick a package matching a specific UCX version and +# CUDA runtime from the UCX github repo. 
+# See: https://github.com/openucx/ucx/releases/ # - UBUNTU_VER: 20.04 by default # ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.14.0 +ARG UCX_VER=1.15.0 ARG UCX_CUDA_VER=11 +ARG UCX_ARCH=x86_64 ARG UBUNTU_VER=20.04 FROM nvidia/cuda:${CUDA_VER}-runtime-ubuntu${UBUNTU_VER} ARG UCX_VER ARG UCX_CUDA_VER ARG UBUNTU_VER +ARG UCX_ARCH RUN apt-get update && apt-get install -y gnupg2 # https://forums.developer.nvidia.com/t/notice-cuda-linux-repository-key-rotation/212771 @@ -41,7 +44,7 @@ RUN CUDA_UBUNTU_VER=`echo "$UBUNTU_VER"| sed -s 's/\.//'` && \ RUN apt update RUN apt-get install -y wget RUN mkdir /tmp/ucx_install && cd /tmp/ucx_install && \ - wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER.tar.bz2 && \ - tar -xvf ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER.tar.bz2 && \ + wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER-$UCX_ARCH.tar.bz2 && \ + tar -xvf ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER-$UCX_ARCH.tar.bz2 && \ apt install -y /tmp/ucx_install/*.deb && \ rm -rf /tmp/ucx_install diff --git a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma index 9980da80c15..55281fc4b1b 100644 --- a/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma +++ b/docs/additional-functionality/shuffle-docker-examples/Dockerfile.ubuntu_rdma @@ -20,9 +20,10 @@ # - RDMA_CORE_VERSION: Set to 32.1 to match the rdma-core line in the latest # released MLNX_OFED 5.x driver # - CUDA_VER: 11.8.0 by default -# - UCX_VER and UCX_CUDA_VER: these are used to pick a package matching a specific UCX version and -# CUDA runtime from the UCX github repo. -# See: https://github.com/openucx/ucx/releases/ +# - UCX_VER, UCX_CUDA_VER, and UCX_ARCH: +# Used to pick a package matching a specific UCX version and +# CUDA runtime from the UCX github repo. 
+# See: https://github.com/openucx/ucx/releases/ # - UBUNTU_VER: 20.04 by default # # The Dockerfile first fetches and builds `rdma-core` to satisfy requirements for @@ -34,8 +35,9 @@ ARG RDMA_CORE_VERSION=32.1 ARG CUDA_VER=11.8.0 -ARG UCX_VER=1.14.0 +ARG UCX_VER=1.15.0 ARG UCX_CUDA_VER=11 +ARG UCX_ARCH=x86_64 ARG UBUNTU_VER=20.04 # Throw away image to build rdma_core @@ -43,6 +45,7 @@ FROM ubuntu:${UBUNTU_VER} as rdma_core ARG RDMA_CORE_VERSION ARG UBUNTU_VER ARG CUDA_VER +ARG UCX_ARCH RUN apt-get update && apt-get install -y gnupg2 # https://forums.developer.nvidia.com/t/notice-cuda-linux-repository-key-rotation/212771 @@ -61,6 +64,7 @@ RUN tar -xvf *.tar.gz && cd rdma-core*/ && dpkg-buildpackage -b -d FROM nvidia/cuda:${CUDA_VER}-runtime-ubuntu${UBUNTU_VER} ARG UCX_VER ARG UCX_CUDA_VER +ARG UCX_ARCH ARG UBUNTU_VER RUN mkdir /tmp/ucx_install @@ -70,7 +74,7 @@ COPY --from=rdma_core /*.deb /tmp/ucx_install/ RUN apt update RUN apt-get install -y wget RUN cd /tmp/ucx_install && \ - wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER.tar.bz2 && \ - tar -xvf ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER.tar.bz2 && \ + wget https://github.com/openucx/ucx/releases/download/v$UCX_VER/ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER-$UCX_ARCH.tar.bz2 && \ + tar -xvf ucx-$UCX_VER-ubuntu$UBUNTU_VER-mofed5-cuda$UCX_CUDA_VER-$UCX_ARCH.tar.bz2 && \ apt install -y /tmp/ucx_install/*.deb && \ rm -rf /tmp/ucx_install diff --git a/jenkins/Dockerfile-blossom.multi b/jenkins/Dockerfile-blossom.multi index f7e3cc59674..b5897f01881 100644 --- a/jenkins/Dockerfile-blossom.multi +++ b/jenkins/Dockerfile-blossom.multi @@ -26,7 +26,7 @@ ARG CUDA_VER=11.8.0 ARG UBUNTU_VER=20.04 -ARG UCX_VER=1.15.0-rc6 +ARG UCX_VER=1.15.0 # multi-platform build with: docker buildx build --platform linux/arm64,linux/amd64 on either amd64 or arm64 host # check available official arm-based docker images at https://hub.docker.com/r/nvidia/cuda/tags (OS/ARCH) FROM --platform=$TARGETPLATFORM nvidia/cuda:${CUDA_VER}-runtime-ubuntu${UBUNTU_VER} diff --git a/jenkins/Dockerfile-blossom.ubuntu b/jenkins/Dockerfile-blossom.ubuntu index b3500c491ac..b3366a5362f 100644 --- a/jenkins/Dockerfile-blossom.ubuntu +++ b/jenkins/Dockerfile-blossom.ubuntu @@ -27,13 +27,14 @@ ARG CUDA_VER=11.0.3 ARG UBUNTU_VER=20.04 -ARG UCX_VER=1.14.0 +ARG UCX_VER=1.15.0 ARG UCX_CUDA_VER=11 FROM nvidia/cuda:${CUDA_VER}-runtime-ubuntu${UBUNTU_VER} ARG CUDA_VER ARG UBUNTU_VER ARG UCX_VER ARG UCX_CUDA_VER +ARG UCX_ARCH=x86_64 # https://forums.developer.nvidia.com/t/notice-cuda-linux-repository-key-rotation/212771 RUN UB_VER=$(echo ${UBUNTU_VER} | tr -d '.') && \ @@ -65,7 +66,7 @@ RUN apt install -y inetutils-ping expect wget libnuma1 libgomp1 RUN mkdir -p /tmp/ucx && \ cd /tmp/ucx && \ - wget https://github.com/openucx/ucx/releases/download/v${UCX_VER}/ucx-${UCX_VER}-ubuntu${UBUNTU_VER}-mofed5-cuda${UCX_CUDA_VER}.tar.bz2 && \ + wget https://github.com/openucx/ucx/releases/download/v${UCX_VER}/ucx-${UCX_VER}-ubuntu${UBUNTU_VER}-mofed5-cuda${UCX_CUDA_VER}-${UCX_ARCH}.tar.bz2 && \ tar -xvf *.bz2 && \ dpkg -i *.deb && \ rm -rf /tmp/ucx diff --git a/pom.xml b/pom.xml index 297492604de..9b11655d7a7 100644 --- a/pom.xml +++ b/pom.xml @@ -648,7 +648,7 @@ https://github.com/openjdk/jdk17/blob/4afbcaf55383ec2f5da53282a1547bac3d099e9d/src/jdk.compiler/share/classes/com/sun/tools/javac/resources/compiler.properties#L1993-L1994 --> -Xlint:all,-serial,-path,-try,-processing|-Werror - 1.14 + 1.15.0 true 
package diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index fbc33b06cb5..c18ebb13930 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -648,7 +648,7 @@ https://github.com/openjdk/jdk17/blob/4afbcaf55383ec2f5da53282a1547bac3d099e9d/src/jdk.compiler/share/classes/com/sun/tools/javac/resources/compiler.properties#L1993-L1994 --> -Xlint:all,-serial,-path,-try,-processing|-Werror - 1.14 + 1.15.0 true package From 1ca5a0904c26b7370463b762acedd0a9a7709e6f Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 22 Nov 2023 11:22:07 -0800 Subject: [PATCH 07/25] Check paths for existence to prevent ignorable error messages during build (#9786) Fixes #9782 Signed-off-by: Gera Shegalov --- aggregator/pom.xml | 26 +++++++++++++++++--------- dist/pom.xml | 15 ++++++++++++--- pom.xml | 20 +++++++++++++++++--- scala2.13/aggregator/pom.xml | 26 +++++++++++++++++--------- scala2.13/dist/pom.xml | 15 ++++++++++++--- scala2.13/pom.xml | 20 +++++++++++++++++--- scala2.13/sql-plugin/pom.xml | 13 ++++++++++++- sql-plugin/pom.xml | 13 ++++++++++++- 8 files changed, 116 insertions(+), 32 deletions(-) diff --git a/aggregator/pom.xml b/aggregator/pom.xml index 8f8b6da47fc..27c13af1e4d 100644 --- a/aggregator/pom.xml +++ b/aggregator/pom.xml @@ -156,15 +156,23 @@ - - - - - - - - + + + + + + + + + + + + Clean build? Skipping diff because ${oldClassesDir} does not exist + + diff --git a/dist/pom.xml b/dist/pom.xml index a858d2865b5..6fbc047ac47 100644 --- a/dist/pom.xml +++ b/dist/pom.xml @@ -331,9 +331,18 @@ run - - - + + + + + + + + + + Re-execute build with the default `-Drapids.jni.unpack.skip=false` + + diff --git a/pom.xml b/pom.xml index 9b11655d7a7..d099315ef8c 100644 --- a/pom.xml +++ b/pom.xml @@ -820,6 +820,7 @@ install ${spark.rapids.source.basedir}/.bloop ${project.build.outputDirectory}/rapids4spark-version-info.properties + false @@ -984,11 +985,22 @@ - + + - + + + + + + + + + Comparing git revisions: previous=${saved.build-info.revision} @@ -1471,7 +1483,9 @@ This will force full Scala code rebuild in downstream modules. Cleaning build directories of all modules ${target.dirs.str} - + diff --git a/scala2.13/aggregator/pom.xml b/scala2.13/aggregator/pom.xml index 4868d10d74e..5f85f31de01 100644 --- a/scala2.13/aggregator/pom.xml +++ b/scala2.13/aggregator/pom.xml @@ -156,15 +156,23 @@ - - - - - - - - + + + + + + + + + + + + Clean build? Skipping diff because ${oldClassesDir} does not exist + + diff --git a/scala2.13/dist/pom.xml b/scala2.13/dist/pom.xml index 7e87dfe5f7c..071ce8247b5 100644 --- a/scala2.13/dist/pom.xml +++ b/scala2.13/dist/pom.xml @@ -331,9 +331,18 @@ run - - - + + + + + + + + + + Re-execute build with the default `-Drapids.jni.unpack.skip=false` + + diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index c18ebb13930..39a811664e4 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -820,6 +820,7 @@ install ${spark.rapids.source.basedir}/.bloop ${project.build.outputDirectory}/rapids4spark-version-info.properties + false @@ -984,11 +985,22 @@ - + + - + + + + + + + + + Comparing git revisions: previous=${saved.build-info.revision} @@ -1471,7 +1483,9 @@ This will force full Scala code rebuild in downstream modules. 
             Cleaning build directories of all modules ${target.dirs.str} - +
diff --git a/scala2.13/sql-plugin/pom.xml b/scala2.13/sql-plugin/pom.xml
index 67f3f91c30f..ee849082aa9 100644
--- a/scala2.13/sql-plugin/pom.xml
+++ b/scala2.13/sql-plugin/pom.xml
@@ -179,7 +179,18 @@
                        value="${servicesDir}/com.nvidia.spark.rapids.SparkShimServiceProvider"/>
 - + + + + + + + + +
diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml
index 9773cc91ba1..0a7fb1ff8c1 100644
--- a/sql-plugin/pom.xml
+++ b/sql-plugin/pom.xml
@@ -179,7 +179,18 @@
                        value="${servicesDir}/com.nvidia.spark.rapids.SparkShimServiceProvider"/>
 - + + + + + + + + +

From 1b4dbd7578984659d79652d75910c31bb2f014db Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Wed, 22 Nov 2023 14:30:07 -0600
Subject: [PATCH 08/25] Avoid pre-Gregorian dates in schema_evolution_test (#9835)

Signed-off-by: Jason Lowe
---
 .../src/main/python/schema_evolution_test.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/integration_tests/src/main/python/schema_evolution_test.py b/integration_tests/src/main/python/schema_evolution_test.py
index a457b082858..4138bb11e86 100644
--- a/integration_tests/src/main/python/schema_evolution_test.py
+++ b/integration_tests/src/main/python/schema_evolution_test.py
@@ -14,7 +14,7 @@
 from asserts import assert_gpu_and_cpu_are_equal_collect
 from data_gen import *
-from datetime import datetime, timezone
+from datetime import date, datetime, timezone
 from marks import ignore_order
 import pytest
 from spark_session import is_databricks_runtime, is_databricks113_or_later
@@ -28,6 +28,9 @@
     "spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
 }

+# Using a custom date generator due to https://github.com/NVIDIA/spark-rapids/issues/9807
+_custom_date_gen = DateGen(start=date(1590, 1, 1))
+
 # List of additional column data generators to use when adding columns
 _additional_gens = [
     boolean_gen,
@@ -38,12 +41,12 @@
     float_gen,
     double_gen,
     string_gen,
-    date_gen,
+    _custom_date_gen,
     TimestampGen(start=datetime(1677, 9, 22, tzinfo=timezone.utc), end=datetime(2262, 4, 11, tzinfo=timezone.utc)),
     # RAPIDS Accelerator does not support MapFromArrays yet
     # https://github.com/NVIDIA/spark-rapids/issues/8696
     # simple_string_to_string_map_gen),
-    ArrayGen(date_gen),
+    ArrayGen(_custom_date_gen),
     struct_gen_decimal128,
     StructGen([("c0", ArrayGen(long_gen)), ("c1", boolean_gen)]),
 ]

From 61cfb7de20b4b0f524f4db8e2c77a6e197bf9b47 Mon Sep 17 00:00:00 2001
From: Ferdinand Xu
Date: Thu, 23 Nov 2023 07:36:13 +0800
Subject: [PATCH 09/25] Re-enable AST string integration cases (#9809)

* Fix integration test for non UTF8 case

Signed-off-by: Ferdinand Xu

* Revert

* Remove unused imports

* hardcode LC_ALL to test using pre-merge CI

* Revert "hardcode LC_ALL to test using pre-merge CI"

This reverts commit 3372d1e2e7a1f89bbb5c1a2d7376c87470fec86b.
---------

Signed-off-by: Ferdinand Xu
---
 integration_tests/src/main/python/ast_test.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/integration_tests/src/main/python/ast_test.py b/integration_tests/src/main/python/ast_test.py
index 0fa03b37d6a..21e982e4fb8 100644
--- a/integration_tests/src/main/python/ast_test.py
+++ b/integration_tests/src/main/python/ast_test.py
@@ -42,8 +42,7 @@
     (double_gen, False),
     (timestamp_gen, True),
     (date_gen, True),
-    pytest.param((string_gen, True),
-                 marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/9771"))
+    (string_gen, True)
 ]

 ast_boolean_descr = [(boolean_gen, True)]

From d3629fd1993a391a4aac00d34edabfe58b28061c Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Thu, 23 Nov 2023 02:09:44 +0100
Subject: [PATCH 10/25] Enable build for Databricks 13.3 [databricks] (#9677)

* pom changes

* pom changes

* pom changes

* add databricks13.3 to premerge

* Added ToPrettyString support

* xfail approximate percentile test

* xfail failing udf tests

* xfail failing tests due to WriteIntoDeltaCommand

* xfail test_delta_atomic_create_table_as_select and test_delta_atomic_replace_table_as_select

* Added 341db to shim-deps and removed from datagen/pom.xml

* updated udf-compiler pom.xml

* updated sql-plugin pom.xml

* fixed multiple pom.xml

* updated udf-compiler pom.xml

* removed TODO

* Signoff

Signed-off-by: Raza Jafri

* updated scala 2.13 poms

* Revert "xfail failing tests due to WriteIntoDeltaCommand"

This reverts commit 00b498ed3ea963605cc36560e8896fe27bd412d2.

* Revert "xfail test_delta_atomic_create_table_as_select and test_delta_atomic_replace_table_as_select"

This reverts commit ea2fd40b8215cdfa845074127a641af62052e947.

* remove tests/pom.xml changes

* reverted 2.13 generation of tests/pom.xml

* removed 341db profile from tests as we don't run unit tests on databricks

* fixed the xfail reason to point to the correct issue

* removed diff.patch

* Revert "xfail approximate percentile test"

This reverts commit 0a7fa52dc06681a9ef8f1da6b36ed35ac2be79dc.
* build fixes

Signed-off-by: Jason Lowe

* Fix spark321db build

* Skip UDF tests until UDF handling is updated

* Remove xfail/skips eclipsed by module-level skip

* xfail fastparquet tests due to nulls being introduced by pandas

* Fix incorrect shimplify directives for 341db

* Fix fallback test

---------

Signed-off-by: Raza Jafri
Signed-off-by: Jason Lowe
Co-authored-by: Jason Lowe
---
 aggregator/pom.xml                            | 17 ++++++++
 .../src/main/python/delta_lake_merge_test.py  |  2 +-
 .../python/fastparquet_compatibility_test.py  | 25 +++++++----
 .../src/main/python/udf_cudf_test.py          |  7 +++-
 integration_tests/src/main/python/udf_test.py |  8 +++-
 .../Jenkinsfile-blossom.premerge-databricks   |  2 +-
 pom.xml                                       | 29 ++++++++++++-
 scala2.13/aggregator/pom.xml                  | 17 ++++++++
 scala2.13/pom.xml                             | 29 ++++++++++++-
 scala2.13/shim-deps/pom.xml                   | 41 +++++++++++++++++++
 shim-deps/pom.xml                             | 41 +++++++++++++++++++
 .../shims/ParquetLegacyNanoAsLongShims.scala  |  1 -
 .../shims/ParquetTimestampNTZShims.scala      |  1 -
 .../hive/rapids/shims/FileSinkDescShim.scala  |  1 -
 .../rapids/shims/HiveInspectorsShim.scala     |  1 -
 .../shims/TagScanForRuntimeFiltering.scala    |  1 -
 ...puDatabricksShuffleExchangeExecBase.scala} | 16 ++------
 .../rapids/shims/GpuShuffleExchangeExec.scala | 16 +++++++-
 .../spark/rapids/shims/CastCheckShims.scala   |  1 -
 .../ParquetTimestampAnnotationShims.scala     |  1 -
 .../spark/rapids/shims/CastCheckShims.scala   |  1 +
 .../shims/ParquetLegacyNanoAsLongShims.scala  |  1 +
 .../ParquetTimestampAnnotationShims.scala     |  1 +
 .../shims/ParquetTimestampNTZShims.scala      |  1 +
 .../shims/TagScanForRuntimeFiltering.scala    |  1 +
 .../rapids/shims/Spark341PlusDBShims.scala    | 16 +++++++-
 .../rapids/shims/GpuShuffleExchangeExec.scala | 10 +++--
 .../hive/rapids/shims/FileSinkDescShim.scala  |  1 +
 .../rapids/shims/HiveInspectorsShim.scala     |  1 +
 29 files changed, 251 insertions(+), 39 deletions(-)
 rename sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/{GpuShuffleExchangeExecBase.scala => GpuDatabricksShuffleExchangeExecBase.scala} (76%)
 rename sql-plugin/src/main/{spark350 => spark341db}/scala/org/apache/spark/sql/hive/rapids/shims/FileSinkDescShim.scala (98%)
 rename sql-plugin/src/main/{spark350 => spark341db}/scala/org/apache/spark/sql/hive/rapids/shims/HiveInspectorsShim.scala (98%)

diff --git a/aggregator/pom.xml b/aggregator/pom.xml
index 27c13af1e4d..4fa4827ac52 100644
--- a/aggregator/pom.xml
+++ b/aggregator/pom.xml
@@ -619,6 +619,23 @@
 + + release341db + + + buildver + 341db + + + + + com.nvidia + rapids-4-spark-delta-spark341db_${scala.binary.version} + ${project.version} + ${spark.version.classifier} + + + release333
diff --git a/integration_tests/src/main/python/delta_lake_merge_test.py b/integration_tests/src/main/python/delta_lake_merge_test.py
index 1d43259434b..0ba63380aba 100644
--- a/integration_tests/src/main/python/delta_lake_merge_test.py
+++ b/integration_tests/src/main/python/delta_lake_merge_test.py
@@ -97,7 +97,7 @@ def checker(data_path, do_merge):
                                      merge_sql=merge_sql,
                                      check_func=checker)

-@allow_non_gpu("ExecutedCommandExec,BroadcastHashJoinExec,ColumnarToRowExec,BroadcastExchangeExec,DataWritingCommandExec", *delta_meta_allow)
+@allow_non_gpu("ExecutedCommandExec,BroadcastHashJoinExec,ColumnarToRowExec,BroadcastExchangeExec,DataWritingCommandExec", delta_write_fallback_allow, *delta_meta_allow)
 @delta_lake
 @ignore_order
 @pytest.mark.skipif(is_databricks_runtime() and spark_version() < "3.3.2", reason="NOT MATCHED BY SOURCE added in DBR 12.2")
diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py
index 6ec5ec88fd3..b51fa5a55ef 100644
--- a/integration_tests/src/main/python/fastparquet_compatibility_test.py
+++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py
@@ -17,7 +17,7 @@
 from asserts import assert_gpu_and_cpu_are_equal_collect
 from data_gen import *
 from fastparquet_utils import get_fastparquet_result_canonicalizer
-from spark_session import spark_version, with_cpu_session, with_gpu_session
+from spark_session import is_databricks_runtime, spark_version, with_cpu_session, with_gpu_session

 def fastparquet_unavailable():
@@ -107,8 +107,12 @@ def read_with_fastparquet_or_plugin(spark):
     pytest.param(IntegerGen(nullable=True),
                  marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")),
     LongGen(nullable=False),
-    FloatGen(nullable=False),
-    DoubleGen(nullable=False),
+    pytest.param(FloatGen(nullable=False),
+                 marks=pytest.mark.xfail(is_databricks_runtime(),
+                                         reason="https://github.com/NVIDIA/spark-rapids/issues/9778")),
+    pytest.param(DoubleGen(nullable=False),
+                 marks=pytest.mark.xfail(is_databricks_runtime(),
+                                         reason="https://github.com/NVIDIA/spark-rapids/issues/9778")),
     StringGen(nullable=False),
     pytest.param(DecimalGen(nullable=False),
                  marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per "
@@ -131,8 +135,11 @@ def read_with_fastparquet_or_plugin(spark):
                  marks=pytest.mark.xfail(reason="Conversion from Pandas dataframe (read with fastparquet) to Spark dataframe "
                                                 "fails: \"Unable to infer the type of the field a\".")),
-    StructGen(children=[("first", IntegerGen(nullable=False)),
-              ("second", FloatGen(nullable=False))], nullable=False)
+    pytest.param(
+        StructGen(children=[("first", IntegerGen(nullable=False)),
+                  ("second", FloatGen(nullable=False))], nullable=False),
+        marks=pytest.mark.xfail(is_databricks_runtime(),
+                                reason="https://github.com/NVIDIA/spark-rapids/issues/9778")),
 ], ids=idfn)
 def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
     """
@@ -176,8 +183,12 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
     LongGen(nullable=False),
     pytest.param(LongGen(nullable=True),
                  marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")),
-    FloatGen(nullable=False),
-    DoubleGen(nullable=False),
+    pytest.param(FloatGen(nullable=False),
+                 marks=pytest.mark.xfail(is_databricks_runtime(),
+                                         reason="https://github.com/NVIDIA/spark-rapids/issues/9778")),
+    pytest.param(DoubleGen(nullable=False),
+                 marks=pytest.mark.xfail(is_databricks_runtime(),
+                                         reason="https://github.com/NVIDIA/spark-rapids/issues/9778")),
     StringGen(nullable=False),
     pytest.param(DecimalGen(nullable=False),
                  marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per "
diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py
index 04416315702..6d94a5da206 100644
--- a/integration_tests/src/main/python/udf_cudf_test.py
+++ b/integration_tests/src/main/python/udf_cudf_test.py
@@ -37,10 +37,15 @@
 from typing import Iterator
 from pyspark.sql import Window
 from pyspark.sql.functions import pandas_udf, PandasUDFType
-from spark_session import with_cpu_session, with_gpu_session
+from spark_session import is_databricks_runtime, is_spark_340_or_later, with_cpu_session, with_gpu_session
 from marks import cudf_udf

+if is_databricks_runtime() and is_spark_340_or_later():
+    # Databricks 13.3 does not use separate reader/writer threads for Python UDFs
+    # which can lead to hangs. Skipping these tests until the Python UDF handling is updated.
+    pytestmark = pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9493")
+
 _conf = {
     'spark.rapids.sql.exec.AggregateInPandasExec': 'true',
     'spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec': 'true',
diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py
index 14fc57cf972..db8425f6387 100644
--- a/integration_tests/src/main/python/udf_test.py
+++ b/integration_tests/src/main/python/udf_test.py
@@ -15,7 +15,7 @@
 import pytest

 from conftest import is_at_least_precommit_run
-from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_350_or_later
+from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_340_or_later

 from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version
@@ -43,6 +43,12 @@
 import pyarrow
 from typing import Iterator, Tuple

+
+if is_databricks_runtime() and is_spark_340_or_later():
+    # Databricks 13.3 does not use separate reader/writer threads for Python UDFs
+    # which can lead to hangs. Skipping these tests until the Python UDF handling is updated.
+    pytestmark = pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9493")
+
 arrow_udf_conf = {
     'spark.sql.execution.arrow.pyspark.enabled': 'true',
     'spark.rapids.sql.exec.WindowInPandasExec': 'true',
diff --git a/jenkins/Jenkinsfile-blossom.premerge-databricks b/jenkins/Jenkinsfile-blossom.premerge-databricks
index 0ea835d39a9..27c42f59aab 100644
--- a/jenkins/Jenkinsfile-blossom.premerge-databricks
+++ b/jenkins/Jenkinsfile-blossom.premerge-databricks
@@ -88,7 +88,7 @@ pipeline {
         // 'name' and 'value' only supprt literal string in the declarative Jenkins
         // Refer to Jenkins issue https://issues.jenkins.io/browse/JENKINS-62127
         name 'DB_RUNTIME'
-        values '10.4', '11.3', '12.2'
+        values '10.4', '11.3', '12.2', '13.3'
       }
     }
     stages {
diff --git a/pom.xml b/pom.xml
index d099315ef8c..7e6ed88cf9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -509,6 +509,31 @@
         delta-lake/delta-spark332db
 + + + release341db + + + buildver + 341db + + + + + 3.4.4 + spark341db + ${spark341db.version} + ${spark341db.version} + 3.3.1 + true + 1.12.0 + ${spark330.iceberg.version} + + + shim-deps/databricks + delta-lake/delta-spark341db + + release350
@@ -691,6 +716,7 @@
         3.3.2.3.3.7190.0-91
         3.3.0-databricks
         3.3.2-databricks
+        3.4.1-databricks
         3.5.0
         3.12.4
         4.3.0
@@ -745,7 +771,8 @@
             321db,
             330db,
-            332db
+            332db,
+            341db
 + release341db + + + buildver + 341db + + + + + 3.4.4 + spark341db + ${spark341db.version} + ${spark341db.version} + 3.3.1 + true + 1.12.0 + ${spark330.iceberg.version} + + + shim-deps/databricks + delta-lake/delta-spark341db + + release350
@@ -691,6 +716,7 @@
         3.3.2.3.3.7190.0-91
         3.3.0-databricks
         3.3.2-databricks
+        3.4.1-databricks
         3.5.0
         3.12.4
         4.3.0
@@ -745,7 +771,8 @@
             321db,
             330db,
-            332db
+            332db,
+            341db