From f86fba68bdfe51f8679f1044793c0dd15d3190e3 Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Mon, 10 May 2021 20:44:33 -0400 Subject: [PATCH 1/5] Changes implementation Clean solution --- .../spark/sql/catalyst/expressions/hash.scala | 20 +++++++++++--- .../expressions/HashExpressionsSuite.scala | 10 +++++++ .../execution/WholeStageCodegenSuite.scala | 26 +++++++++++++++++++ 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index f3a82743182a9..3a7986e0bd515 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -369,11 +369,21 @@ abstract class HashExpression[E] extends Expression { protected def genHashBoolean(input: String, result: String): String = genHashInt(s"$input ? 1 : 0", result) - protected def genHashFloat(input: String, result: String): String = - genHashInt(s"Float.floatToIntBits($input)", result) + protected def genHashFloat(input: String, result: String): String = { + s"if(Float.floatToIntBits($input) == Float.floatToIntBits(-0.0f)) {" + + genHashInt(s"Float.floatToIntBits(0.0f)", result) + + "}else{" + + genHashInt(s"Float.floatToIntBits($input)", result) + + "}" + } - protected def genHashDouble(input: String, result: String): String = - genHashLong(s"Double.doubleToLongBits($input)", result) + protected def genHashDouble(input: String, result: String): String = { + s"if(Double.doubleToLongBits($input) == Double.doubleToLongBits(-0.0d)) {" + + genHashLong(s"Double.doubleToLongBits(0.0d)", result) + + "}else{" + + genHashLong(s"Double.doubleToLongBits($input)", result) + + "}" + } protected def genHashDecimal( ctx: CodegenContext, @@ -523,7 +533,9 @@ abstract class InterpretedHashFunction { case s: Short => hashInt(s, seed) case i: Int => hashInt(i, seed) case l: Long => hashLong(l, seed) + case f: Float if (f == -0.0f) => hashInt(java.lang.Float.floatToIntBits(0.0f), seed) case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed) + case d: Double if (d == -0.0d) => hashLong(java.lang.Double.doubleToLongBits(0.0d), seed) case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed) case d: Decimal => val precision = dataType.asInstanceOf[DecimalType].precision diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala index 858d8f78bef67..8ac011babf1fa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala @@ -708,6 +708,16 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(HiveHash(Seq(yearMonth)), 1234) } + test("SPARK-35207: Compute hash consistent between -0.0 and 0.0") { + def checkResult(exprs1: Expression, exprs2: Expression): Unit = { + assert(Murmur3Hash(Seq(exprs1), 42).eval() == Murmur3Hash(Seq(exprs2), 42).eval()) + assert(XxHash64(Seq(exprs1), 42).eval() == XxHash64(Seq(exprs2), 42).eval()) + assert(HiveHash(Seq(exprs1)).eval() == HiveHash(Seq(exprs2)).eval()) + } + checkResult(Literal.create(0D, DoubleType), Literal.create(-0D, DoubleType)) + checkResult(Literal.create(0L, LongType), 
Literal.create(-0L, LongType)) + } + private def testHash(inputSchema: StructType): Unit = { val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get val toRow = RowEncoder(inputSchema).createSerializer() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index b66308c4f880f..dc34ebc777a43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -654,4 +654,30 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession } } } + + test("SPARK-35207: Compute hash consistent between -0.0 and 0.0 doubles with Codegen") { + val data = Seq((0.0d, -1.0d, 1.0d)) + withTempPath { dir => + val path = dir.getCanonicalPath + data.toDF("col1", "col2", "col3").write.parquet(path) + sql(s"create table testHash(col1 double, col2 double, col3 double) " + + s"using parquet location '$path'") + sql("select hash(col1 / col2) == hash(col1 / col3) from testHash").collect() + .foreach(row => assert(row.getBoolean(0) == true)) + sql("drop table testHash") + } + } + + test("SPARK-35207: Compute hash consistent between -0.0 and 0.0 floats with Codegen") { + val data = Seq((0.0f, -1.0f, 1.0f)) + withTempPath { dir => + val path = dir.getCanonicalPath + data.toDF("col1", "col2", "col3").write.parquet(path) + sql(s"create table testHash(col1 float, col2 float, col3 float) " + + s"using parquet location '$path'") + sql("select hash(col1 / col2) == hash(col1 / col3) from testHash").collect() + .foreach(row => assert(row.getBoolean(0) == true)) + sql("drop table testHash") + } + } } From 641629d87f7ad262f4f06d2d3dcbb9432d8c1a4a Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Wed, 12 May 2021 17:39:46 -0400 Subject: [PATCH 2/5] Style corrections --- .../spark/sql/catalyst/expressions/hash.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index 3a7986e0bd515..94b990502d5df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -370,19 +370,23 @@ abstract class HashExpression[E] extends Expression { genHashInt(s"$input ? 
1 : 0", result) protected def genHashFloat(input: String, result: String): String = { - s"if(Float.floatToIntBits($input) == Float.floatToIntBits(-0.0f)) {" + - genHashInt(s"Float.floatToIntBits(0.0f)", result) + - "}else{" + - genHashInt(s"Float.floatToIntBits($input)", result) + - "}" + s""" + |if(Float.floatToIntBits($input) == Float.floatToIntBits(-0.0f)) { + | ${genHashInt(s"Float.floatToIntBits(0.0f)", result)} + |} else { + | ${genHashInt(s"Float.floatToIntBits($input)", result)} + |} + """.stripMargin } protected def genHashDouble(input: String, result: String): String = { - s"if(Double.doubleToLongBits($input) == Double.doubleToLongBits(-0.0d)) {" + - genHashLong(s"Double.doubleToLongBits(0.0d)", result) + - "}else{" + - genHashLong(s"Double.doubleToLongBits($input)", result) + - "}" + s""" + |if(Double.doubleToLongBits($input) == Double.doubleToLongBits(-0.0d)) { + | ${genHashLong(s"Double.doubleToLongBits(0.0d)", result)} + |} else { + | ${genHashLong(s"Double.doubleToLongBits($input)", result)} + |} + """.stripMargin } protected def genHashDecimal( From 1de9c3d6f15fe230b61fc30dee6c539248f0e403 Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Thu, 13 May 2021 16:34:25 -0400 Subject: [PATCH 3/5] Improve solution --- .../spark/sql/catalyst/expressions/hash.scala | 4 +-- .../expressions/HashExpressionsSuite.scala | 11 ++++---- .../execution/WholeStageCodegenSuite.scala | 26 ------------------- 3 files changed, 8 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index 94b990502d5df..d90022b3cd243 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -371,7 +371,7 @@ abstract class HashExpression[E] extends Expression { protected def genHashFloat(input: String, result: String): String = { s""" - |if(Float.floatToIntBits($input) == Float.floatToIntBits(-0.0f)) { + |if($input == -0.0f) { | ${genHashInt(s"Float.floatToIntBits(0.0f)", result)} |} else { | ${genHashInt(s"Float.floatToIntBits($input)", result)} @@ -381,7 +381,7 @@ abstract class HashExpression[E] extends Expression { protected def genHashDouble(input: String, result: String): String = { s""" - |if(Double.doubleToLongBits($input) == Double.doubleToLongBits(-0.0d)) { + |if($input == -0.0d) { | ${genHashLong(s"Double.doubleToLongBits(0.0d)", result)} |} else { | ${genHashLong(s"Double.doubleToLongBits($input)", result)} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala index 8ac011babf1fa..bd981d1633aa6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala @@ -710,12 +710,13 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-35207: Compute hash consistent between -0.0 and 0.0") { def checkResult(exprs1: Expression, exprs2: Expression): Unit = { - assert(Murmur3Hash(Seq(exprs1), 42).eval() == Murmur3Hash(Seq(exprs2), 42).eval()) - assert(XxHash64(Seq(exprs1), 42).eval() == XxHash64(Seq(exprs2), 42).eval()) - assert(HiveHash(Seq(exprs1)).eval() == HiveHash(Seq(exprs2)).eval()) + 
checkEvaluation(Murmur3Hash(Seq(exprs1), 42), Murmur3Hash(Seq(exprs2), 42).eval()) + checkEvaluation(XxHash64(Seq(exprs1), 42), XxHash64(Seq(exprs2), 42).eval()) + checkEvaluation(HiveHash(Seq(exprs1)), HiveHash(Seq(exprs2)).eval()) } - checkResult(Literal.create(0D, DoubleType), Literal.create(-0D, DoubleType)) - checkResult(Literal.create(0L, LongType), Literal.create(-0L, LongType)) + + checkResult(Literal.create(-0D, DoubleType), Literal.create(0D, DoubleType)) + checkResult(Literal.create(-0F, FloatType), Literal.create(0F, FloatType)) } private def testHash(inputSchema: StructType): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index b785068f9a874..f019e34b60118 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -708,30 +708,4 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession } } } - - test("SPARK-35207: Compute hash consistent between -0.0 and 0.0 doubles with Codegen") { - val data = Seq((0.0d, -1.0d, 1.0d)) - withTempPath { dir => - val path = dir.getCanonicalPath - data.toDF("col1", "col2", "col3").write.parquet(path) - sql(s"create table testHash(col1 double, col2 double, col3 double) " + - s"using parquet location '$path'") - sql("select hash(col1 / col2) == hash(col1 / col3) from testHash").collect() - .foreach(row => assert(row.getBoolean(0) == true)) - sql("drop table testHash") - } - } - - test("SPARK-35207: Compute hash consistent between -0.0 and 0.0 floats with Codegen") { - val data = Seq((0.0f, -1.0f, 1.0f)) - withTempPath { dir => - val path = dir.getCanonicalPath - data.toDF("col1", "col2", "col3").write.parquet(path) - sql(s"create table testHash(col1 float, col2 float, col3 float) " + - s"using parquet location '$path'") - sql("select hash(col1 / col2) == hash(col1 / col3) from testHash").collect() - .foreach(row => assert(row.getBoolean(0) == true)) - sql("drop table testHash") - } - } } From 869ae7ca48a97a4ff3c75aad70d35fb25772e560 Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Thu, 13 May 2021 20:22:41 -0400 Subject: [PATCH 4/5] Update migration guide --- docs/sql-migration-guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 2ce42bc5a5d45..ff2ad0464e75b 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -87,6 +87,8 @@ license: | - In Spark 3.2, Spark supports `DayTimeIntervalType` and `YearMonthIntervalType` as inputs and outputs of `TRANSFORM` clause in Hive `SERDE` mode, the behavior is different between Hive `SERDE` mode and `ROW FORMAT DELIMITED` mode when these two types are used as inputs. In Hive `SERDE` mode, `DayTimeIntervalType` column is converted to `HiveIntervalDayTime`, its string format is `[-]?d h:m:s.n`, but in `ROW FORMAT DELIMITED` mode the format is `INTERVAL '[-]?d h:m:s.n' DAY TO TIME`. In Hive `SERDE` mode, `YearMonthIntervalType` column is converted to `HiveIntervalYearMonth`, its string format is `[-]?y-m`, but in `ROW FORMAT DELIMITED` mode the format is `INTERVAL '[-]?y-m' YEAR TO MONTH`. + - In Spark 3.2, `hash(0) == hash(-0)` for floating point types. Previously, different values were generated. 
+ ## Upgrading from Spark SQL 3.0 to 3.1 - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`. From c123599da608adf219304b1409279abfd38b114c Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Thu, 13 May 2021 20:23:23 -0400 Subject: [PATCH 5/5] Simplify expressions --- .../org/apache/spark/sql/catalyst/expressions/hash.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index d90022b3cd243..65e7714a3d8ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -372,7 +372,7 @@ abstract class HashExpression[E] extends Expression { protected def genHashFloat(input: String, result: String): String = { s""" |if($input == -0.0f) { - | ${genHashInt(s"Float.floatToIntBits(0.0f)", result)} + | ${genHashInt("0", result)} |} else { | ${genHashInt(s"Float.floatToIntBits($input)", result)} |} @@ -382,7 +382,7 @@ abstract class HashExpression[E] extends Expression { protected def genHashDouble(input: String, result: String): String = { s""" |if($input == -0.0d) { - | ${genHashLong(s"Double.doubleToLongBits(0.0d)", result)} + | ${genHashLong("0L", result)} |} else { | ${genHashLong(s"Double.doubleToLongBits($input)", result)} |} @@ -537,9 +537,9 @@ abstract class InterpretedHashFunction { case s: Short => hashInt(s, seed) case i: Int => hashInt(i, seed) case l: Long => hashLong(l, seed) - case f: Float if (f == -0.0f) => hashInt(java.lang.Float.floatToIntBits(0.0f), seed) + case f: Float if (f == -0.0f) => hashInt(0, seed) case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed) - case d: Double if (d == -0.0d) => hashLong(java.lang.Double.doubleToLongBits(0.0d), seed) + case d: Double if (d == -0.0d) => hashLong(0L, seed) case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed) case d: Decimal => val precision = dataType.asInstanceOf[DecimalType].precision
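
For context on what the series fixes: under IEEE 754, `0.0` and `-0.0` compare equal with `==`, but their bit patterns differ (`-0.0` sets the sign bit), so hashing the raw output of `floatToIntBits`/`doubleToLongBits` produced different hashes for two values that Spark treats as equal. The patches normalize `-0.0` to the bits of positive zero before hashing. Below is a minimal standalone sketch of that idea, outside Spark; the `normalizedFloatBits` and `normalizedDoubleBits` helpers are illustrative names, not part of the patch.

```scala
object NegativeZeroHashDemo {
  // floatToIntBits(0.0f) == 0, but floatToIntBits(-0.0f) == 0x80000000:
  // equal values, different raw bits, hence different hashes before the fix.
  def normalizedFloatBits(f: Float): Int =
    if (f == -0.0f) 0 else java.lang.Float.floatToIntBits(f)

  def normalizedDoubleBits(d: Double): Long =
    if (d == -0.0d) 0L else java.lang.Double.doubleToLongBits(d)

  def main(args: Array[String]): Unit = {
    assert(0.0f == -0.0f) // IEEE 754: positive and negative zero compare equal
    assert(java.lang.Float.floatToIntBits(0.0f) !=
      java.lang.Float.floatToIntBits(-0.0f)) // ...but their raw bits differ
    // After normalization, the bits (and any hash computed from them) agree.
    assert(normalizedFloatBits(0.0f) == normalizedFloatBits(-0.0f))
    assert(normalizedDoubleBits(0.0d) == normalizedDoubleBits(-0.0d))
  }
}
```

Note that the guard `f == -0.0f` is also true when `f` is `0.0f`, which is why both zeros take the same branch, and it is false for NaN, so NaN hashing is unchanged. This is also why an expression such as `0.0d / -1.0d` (which evaluates to `-0.0d`) now hashes the same as `0.0d` — the case exercised by the `WholeStageCodegenSuite` tests removed in patch 3 and by the surviving `HashExpressionsSuite` test.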