
[BUG] Casting FLOAT64 to DECIMAL(12,7) produces different rows from Apache Spark CPU #9682

Closed
abellina opened this issue Nov 13, 2023 · 13 comments · Fixed by #10917
Labels
bug Something isn't working

Comments

@abellina
Collaborator

abellina commented Nov 13, 2023

Update:

This bug was originally filed with the title:

[BUG] test_window_aggs_for_rows fails with rounding errors with DATAGEN_SEED=1698940723

It has since been established that the problem does not lie in window functions or aggregations. The problem is casting FLOAT64 to DECIMAL, which produces rounding errors.

Repro:

Seq(3527.61953125).toDF("d").repartition(1).selectExpr("CAST(d AS DECIMAL(12,7)) as_decimal").show

This produces different results on CPU and GPU.
On CPU:

+------------+
|  as_decimal|
+------------+
|3527.6195313|
+------------+

On GPU:

+------------+
|  as_decimal|
+------------+
|3527.6195312|
+------------+

The old description continues below:

test_window_aggs_for_rows fails with DATAGEN_SEED=1698940723.

Repro:

SPARK_RAPIDS_TEST_DATAGEN_SEED=1698940723 ./run_pyspark_from_build.sh -k test_window_aggs_for_row\ and\ RepeatSeq\ and\ Integer\ and\ Decimal
[gw4] [ 96%] FAILED ../../src/main/python/window_function_test.py::test_window_aggs_for_rows[[('a', RepeatSeq(Long)), ('b', Integer), ('c', Decimal(8,3))]-1000][DATAGEN_SEED=1698940723, IGNORE_ORDER({'local': True})]
[gw4] [ 96%] FAILED ../../src/main/python/window_function_test.py::test_window_aggs_for_rows[[('a', RepeatSeq(Long)), ('b', Integer), ('c', Decimal(8,3))]-1g][DATAGEN_SEED=1698940723, IGNORE_ORDER({'local': True})]
[2023-11-02T17:07:56.063Z]  Row(sum_c_asc=Decimal('-231179.895'), max_c_desc=Decimal('-48178.972'), min_c_asc=Decimal('-99999.999'), count_1=103, count_c=97, avg_c=Decimal('-9080.9564433'), rank_val=57, dense_rank_val=57, percent_rank_val=0.5490196078431373, row_num=57)
[2023-11-02T17:07:56.064Z]  Row(sum_c_asc=Decimal('-227954.458'), max_c_desc=Decimal('89948.943'), min_c_asc=Decimal('-95651.987'), count_1=103, count_c=98, avg_c=Decimal('-598.2695306'), rank_val=82, dense_rank_val=82, percent_rank_val=0.7941176470588235, row_num=82)
[2023-11-02T17:07:56.064Z]  Row(sum_c_asc=Decimal('-223873.440'), max_c_desc=Decimal('48971.237'), min_c_asc=Decimal('-86568.757'), count_1=102, count_c=94, avg_c=Decimal('9590.8501170'), rank_val=88, dense_rank_val=88, percent_rank_val=0.8613861386138614, row_num=88)
[2023-11-02T17:07:56.064Z] -Row(sum_c_asc=Decimal('-220416.559'), max_c_desc=Decimal('34383.780'), min_c_asc=Decimal('-67119.262'), count_1=102, count_c=96, avg_c=Decimal('3527.6195313'), rank_val=84, dense_rank_val=84, percent_rank_val=0.8217821782178217, row_num=84)
[2023-11-02T17:07:56.064Z] +Row(sum_c_asc=Decimal('-220416.559'), max_c_desc=Decimal('34383.780'), min_c_asc=Decimal('-67119.262'), count_1=102, count_c=96, avg_c=Decimal('3527.6195312'), rank_val=84, dense_rank_val=84, percent_rank_val=0.8217821782178217, row_num=84)
[2023-11-02T17:07:56.064Z]  Row(sum_c_asc=Decimal('-220028.169'), max_c_desc=Decimal('2270.621'), min_c_asc=Decimal('-91351.118'), count_1=102, count_c=97, avg_c=Decimal('-4903.2057010'), rank_val=47, dense_rank_val=47, percent_rank_val=0.45544554455445546, row_num=47)
[2023-11-02T17:07:56.064Z]  Row(sum_c_asc=Decimal('-219560.674'), max_c_desc=Decimal('-32132.633'), min_c_asc=Decimal('-95941.789'), count_1=102, count_c=95, avg_c=Decimal('-1184.0629789'), rank_val=4, dense_rank_val=4, percent_rank_val=0.0297029702970297, row_num=4)
[2023-11-02T17:07:56.064Z]  Row(sum_c_asc=Decimal('-219399.848'), max_c_desc=Decimal('43804.921'), min_c_
@abellina abellina added bug Something isn't working ? - Needs Triage Need team to review and classify labels Nov 13, 2023
@abellina abellina changed the title [BUG] [BUG] test_window_aggs_for_rows fails with rounding errors Nov 13, 2023
@abellina abellina changed the title [BUG] test_window_aggs_for_rows fails with rounding errors [BUG] test_window_aggs_for_rows fails with rounding errors with DATAGEN_SEED=1698940723 Nov 13, 2023
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label Nov 14, 2023
@mythrocks
Collaborator

mythrocks commented Nov 22, 2023

Attached herewith is a zipped Parquet file with 102 rows in a single Decimal(8,3) column.

Taking the window functions out of the equation, one sees that running AVG() produces slightly different results on Apache Spark and the plugin:

// On Spark.
scala> spark.read.parquet("/tmp/decimals_avg.parquet").select( expr("avg(c)") ).show
+------------+
|      avg(c)|
+------------+
|3527.6195313|
+------------+

// On the plugin:
+------------+
|      avg(c)|
+------------+
|3527.6195312|
+------------+

The behaviour seems to be consistent on Spark 3.4.2, 3.4.1, and Spark 3.2.3. The plugin result is off by 0.0000001.

@mythrocks
Collaborator

I have filed rapidsai/cudf#14507 to track the CUDF side of this.

I was able to repro this on CUDF by writing the input as DECIMAL(12,7) to Parquet, and then running the MEAN aggregation on it. The bug I filed has the repro details.

@mythrocks
Collaborator

A couple of other findings. I tried querying SUM, COUNT, AVG, etc. as follows:

select sum(c), count(c), sum(c)/count(c), avg(c), cast(avg(c) as DECIMAL(12,8)), cast(sum(c)/count(c) as decimal(12,7)) from foobar

On CPU, those results tally up:

+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|sum(c)    |count(c)|(sum(c) / count(c))         |avg(c)      |CAST(avg(c) AS DECIMAL(12,8))|CAST((sum(c) / count(c)) AS DECIMAL(12,7))|
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|338651.475|96      |3527.61953125000000000000000|3527.6195313|3527.61953130                |3527.6195313                              |
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+

Here's what one finds on GPU:

+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|sum(c)    |count(c)|(sum(c) / count(c))         |avg(c)      |CAST(avg(c) AS DECIMAL(12,8))|CAST((sum(c) / count(c)) AS DECIMAL(12,7))|
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+
|338651.475|96      |3527.61953125000000000000000|3527.6195312|3527.61953120                |3527.6195313                              |
+----------+--------+----------------------------+------------+-----------------------------+------------------------------------------+

@mythrocks
Collaborator

mythrocks commented Nov 30, 2023

There were some red herrings in investigating this bug.

First off, I have closed the CUDF bug (rapidsai/cudf#14507) I raised for this. CUDF is not at fault; it consistently truncates additional decimal digits.

It looked like this might have to do with GpuDecimalAverage and GpuDecimalDivide, but those operators are not involved. Consider this query:

SELECT avg(c) FROM foobar; -- c is a DECIMAL(8,3).

The execution plan indicates GpuDecimalAverage isn't invoked at all. The operation is done completely in integer/double, and the result is cast to DECIMAL(12,7):

== Optimized Logical Plan ==
Aggregate [cast((avg(UnscaledValue(c#9)) / 1000.0) as decimal(12,7)) AS avg(c)#888]
+- Relation [c#9] parquet

== Physical Plan ==
GpuColumnarToRow false
+- GpuHashAggregate(keys=[], functions=[avg(UnscaledValue(c#9), DoubleType)], output=[avg(c)#888])
   +- GpuShuffleCoalesce 1073741824
      +- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [plan_id=1819]
         +- GpuHashAggregate(keys=[], functions=[partial_avg(UnscaledValue(c#9), DoubleType)], output=[sum#892, count#893L])
            +- GpuFileGpuScan parquet [c#9] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/decimals_avg.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:decimal(8,3)>

The avg() is computed on the unscaled decimal representations, and the average is divided by 1000.0. The result is cast to DECIMAL(12,7).

The avg() and the divide produce the same results on CPU and GPU, so this comes down to a casting problem.
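For reference, the rewritten arithmetic can be reproduced by hand without relying on the optimizer. This is a sketch only: `avg_c_rewritten` is just an illustrative alias, the 1000.0 divisor comes from the plan above, and the Parquet path is the one attached earlier.

// Assuming `spark` is an active SparkSession and /tmp/decimals_avg.parquet is the attached file.
val df = spark.read.parquet("/tmp/decimals_avg.parquet")

// Mirror the optimizer's rewrite: average the unscaled longs (c has scale 3),
// divide by 1000.0 as a Double, then cast the Double result to DECIMAL(12,7).
df.selectExpr(
  "CAST(AVG(CAST(c * 1000 AS BIGINT)) / 1000.0 AS DECIMAL(12,7)) AS avg_c_rewritten"
).show(false)
// This should exercise the same FLOAT64 -> DECIMAL(12,7) cast that differs between CPU and GPU.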

Here's the simplest repro for the problem:

Seq(3527.61953125).toDF("d").repartition(1).selectExpr("d", "CAST(d AS DECIMAL(12,7)) AS as_decimal_12_7").show

On CPU:

+-------------+---------------+
|            d|as_decimal_12_7|
+-------------+---------------+
|3527.61953125|   3527.6195313|
+-------------+---------------+

On GPU:

+-----------+---------------+
|          d|as_decimal_12_7|
+-----------+---------------+
|3527.619531|   3527.6195312|
+-----------+---------------+

(Ignore how d is printed as 3527.619531 on GPU. I think that's just a string/display formatting issue. The correct/complete value is written if serialized to file.)

All mention of window functions, aggregation, GpuDecimalAverage and everything else is a distraction.

@revans2
Collaborator

revans2 commented Nov 30, 2023

This is a performance optimization in Spark that is only supposed to happen when the value would not be impacted by potential floating point issues.

https://github.com/apache/spark/blob/9bb358b51e30b5041c0cd20e27cf995aca5ed4c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2110-L2142

The optimization only applies if the precision is less than 15: 15 decimal digits require 50 bits to store, and a double has 52 bits in its significand, so the result should be correct without any possibility of error.

https://en.wikipedia.org/wiki/Double-precision_floating-point_format

So if we are getting the wrong answer back, then the problem is somewhere in the computation that the average was replaced with.
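(As a quick sanity check of those bit counts, in plain Scala, nothing plugin-specific:)

// The largest 15-digit unscaled value must fit exactly in a double's significand.
val maxUnscaled = 999999999999999L                                  // 10^15 - 1
println(java.lang.Long.toBinaryString(maxUnscaled).length)          // 50 bits
println(maxUnscaled.toDouble.toLong == maxUnscaled)                 // true: converts back exactly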

@revans2
Collaborator

revans2 commented Nov 30, 2023

I am remembering more now. Converting a double to a decimal value has problems because Spark does it by going from a double to a String to a decimal. This is inherent in how Scala's BigDecimal class works, and there is even a bit of magic with an implicit method that makes it happen behind the scenes. But going from a Double to a String, we cannot match what Java does; it is not standard, which is why we have spark.rapids.sql.castStringToFloat.enabled, which is off by default. Now I really want to understand what would happen if this optimization were disabled, and what the result really would be. Is Spark right and we are wrong, or is Spark wrong and we are right?

@revans2
Collaborator

revans2 commented Nov 30, 2023

So Java does odd things when handling floating-point values compared to the rest of the world: it tries to compensate for the fact that some decimal values cannot be represented exactly as floating-point values.

https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#toString-double-

https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#valueOf-java.lang.String-

They are self-consistent, but not standard. The number we are trying to convert is one of those that cannot be represented exactly as a double.

https://binaryconvert.com/result_double.html?decimal=051053050055046054049057053051049050053

So technically the Spark performance optimization is wrong in the general case, but I think the way Java/Scala convert doubles to Strings, and in turn to decimal values, "fixes" it.
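To make that concrete, here is a small JVM-only sketch with the value from the repro (illustrative only; `BigDecimal(d)` in Scala goes through `java.lang.Double.toString` under the hood):

val d: Double = 3527.61953125

// The shortest-round-trip String hides the fact that the stored binary value
// is actually slightly below 3527.61953125.
println(java.lang.Double.toString(d))                                // 3527.61953125

// Scala builds BigDecimal from that String, so the trailing 5 survives and
// HALF_UP at scale 7 rounds up -- this is the CPU answer.
println(BigDecimal(d).setScale(7, BigDecimal.RoundingMode.HALF_UP))  // 3527.6195313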

So there are two options for fixing the problem ourselves: we either undo the optimization and just do the average on Decimal values, or we find a way to replicate what Java is doing. Neither of these is simple.

In the case of a window operation it is not that hard to undo the optimization, because it is self-contained in a single exec: we can pattern-match on the UnscaledValue(e) being manipulated. But for a hash aggregation or a reduction it gets much harder, especially if the optimization later went through other transformations related to distinct/etc.; it could become really hard to detect and undo. We might be able to just find the final divide by a constant followed by a cast to a Decimal and try to rewrite that part, because we get the rest of it right. That might be the simplest way to make this work.

Matching the Java code is really difficult because it is GPL-licensed, so we cannot copy it, or even read it and try to apply it.

I think if we can detect the case of `cast(Long / (Double scalar that is 10^x) as Decimal(SOMETHING, x + 4))` and replace it with `CAST(LONG AS Decimal(19,2)) / DecimalScalar(10^x)`, and then have the divide output the desired precision and scale like we do for other decimal average conversions, then we are good.
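As a back-of-the-envelope check that keeping the arithmetic in decimals lands on the CPU answer (plain Scala BigDecimal, using the sum/count from the tables above; not plugin code):

// sum(c) = 338651.475 at scale 3, so the unscaled sum is 338651475; count is 96.
val sumUnscaled = BigDecimal(338651475L)
val count       = BigDecimal(96)
val scaleFactor = BigDecimal(1000)   // 10^3 for DECIMAL(8,3)

// avg(c) = sum / count / 10^3, carried out entirely in decimal arithmetic.
val avg = (sumUnscaled / count / scaleFactor).setScale(7, BigDecimal.RoundingMode.HALF_UP)
println(avg)   // 3527.6195313, matching the CPU result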

@revans2
Collaborator

revans2 commented Nov 30, 2023

But before we do any of that, I want to be sure we know what the original input long was before the divide happened, and what the double was that we were dividing by. I am assuming that it was (352761953125 / 10 ^ 8)?

@mythrocks
Collaborator

know what the original input long was before the divide happened and what the double was that we are dividing? I am assuming that it was (352761953125/ 10 ^ 8)

Not exactly. The result of the average (of the unscaled decimals) was 3527619.53125L. That was then divided by 1000.0L to rescale it.

@mythrocks
Collaborator

mythrocks commented Nov 30, 2023

I can confirm here that GpuCast::castFloatsToDecimal() seems to be the one producing the differing output:

    // Approach to minimize difference between CPUCast and GPUCast:
    // step 1. cast input to FLOAT64 (if necessary)
    // step 2. cast FLOAT64 to container DECIMAL (who keeps one more digit for rounding)
    // step 3. perform HALF_UP rounding on container DECIMAL
    val checkedInput = withResource(input.castTo(DType.FLOAT64)) { double =>
      val roundedDouble = double.round(dt.scale, cudf.RoundMode.HALF_UP)
      withResource(roundedDouble) { rounded =>
      // ...
      }
    }

The second step (i.e. after ensuring the input is FLOAT64) is to round(RoundMode.HALF_UP). This causes the following change in the input row:

3527.61953125 -> 3527.6195312

The (final) CPU output for this row is 3527.6195313.
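That matches what plain JVM arithmetic predicts if the rounding operates on the raw binary value of the double rather than on its String form (illustrative only; this is not the cudf code path):

// new java.math.BigDecimal(double) captures the exact binary value,
// which is a hair below 3527.61953125, so HALF_UP at scale 7 rounds down.
val exact = new java.math.BigDecimal(3527.61953125)
println(exact)                                               // 3527.61953124999990905...
println(exact.setScale(7, java.math.RoundingMode.HALF_UP))   // 3527.6195312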

@mythrocks mythrocks changed the title [BUG] test_window_aggs_for_rows fails with rounding errors with DATAGEN_SEED=1698940723 [BUG] Casting FLOAT64 to DECIMAL(12,7) produces different rows from Apache Spark CPU Nov 30, 2023
@mythrocks mythrocks removed their assignment Feb 21, 2024
@mythrocks
Collaborator

I've relinquished ownership of this bug. I'm not actively working on it.

@thirtiseven
Collaborator

I am remembering more now. Converting a double to a decimal value has problems because Spark does it by going from a double to a String to a decimal. This is inherent in how Scala's BigDecimal class works, and there is even a bit of magic with an implicit method that makes it happen behind the scenes. But going from a Double to a String, we cannot match what Java does; it is not standard, which is why we have spark.rapids.sql.castStringToFloat.enabled, which is off by default. Now I really want to understand what would happen if this optimization were disabled, and what the result really would be. Is Spark right and we are wrong, or is Spark wrong and we are right?

Since we have an almost-matching float-to-string kernel in JNI, does that mean we can also almost match float-to-decimal easily by following Spark's float => string => decimal approach?

@revans2
Collaborator

revans2 commented May 23, 2024

@thirtiseven Yes, that is one possibility, but again it is only an almost match. It is up to management to decide how close is good enough.
