Skip to content

Commit

Permalink
Support float case of format_number with format_float kernel (#9790)
Browse files Browse the repository at this point in the history
* Use format_float kernel

Signed-off-by: Haoyang Li <[email protected]>

* Add tests and doc

Signed-off-by: Haoyang Li <[email protected]>

* use new name from jni change

Signed-off-by: Haoyang Li <[email protected]>

* move inf/nan replacement to kernel

Signed-off-by: Haoyang Li <[email protected]>

* claen up

Signed-off-by: Haoyang Li <[email protected]>

* Address comments

Signed-off-by: Haoyang Li <[email protected]>

---------

Signed-off-by: Haoyang Li <[email protected]>
  • Loading branch information
thirtiseven authored Jan 9, 2024
1 parent 328a514 commit af91522
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 493 deletions.
2 changes: 1 addition & 1 deletion docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Name | Description | Default Value | Applicable at
<a name="sql.format.parquet.reader.type"></a>spark.rapids.sql.format.parquet.reader.type|Sets the Parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO|Runtime
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true|Runtime
<a name="sql.format.parquet.writer.int96.enabled"></a>spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true|Runtime
<a name="sql.formatNumberFloat.enabled"></a>spark.rapids.sql.formatNumberFloat.enabled|format_number with floating point types on the GPU returns results that have a different precision than the default results of Spark.|false|Runtime
<a name="sql.formatNumberFloat.enabled"></a>spark.rapids.sql.formatNumberFloat.enabled|format_number with floating point types on the GPU returns results that have a different precision than the default results of Spark.|true|Runtime
<a name="sql.hasExtendedYearValues"></a>spark.rapids.sql.hasExtendedYearValues|Spark 3.2.0+ extended parsing of years in dates and timestamps to support the full range of possible values. Prior to this it was limited to a positive 4 digit year. The Accelerator does not support the extended range yet. This config indicates if your data includes this extended range or not, or if you don't care about getting the correct values on values with the extended range.|true|Runtime
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false|Runtime
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows.|true|Runtime
Expand Down
8 changes: 4 additions & 4 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -713,13 +713,13 @@ to `false`.

The Rapids Accelerator for Apache Spark uses uses a method based on [ryu](https://github.com/ulfjack/ryu) when converting floating point data type to string. As a result the computed string can differ from the output of Spark in some cases: sometimes the output is shorter (which is arguably more accurate) and sometimes the output may differ in the precise digits output.

The `format_number` function will retain 10 digits of precision for the GPU when the input is a floating
point number, but Spark will retain up to 17 digits of precision, i.e. `format_number(1234567890.1234567890, 5)`
will return `1,234,567,890.00000` on the GPU and `1,234,567,890.12346` on the CPU. To enable this on the GPU, set [`spark.rapids.sql.formatNumberFloat.enabled`](additional-functionality/advanced_configs.md#sql.formatNumberFloat.enabled) to `true`.

This configuration is enabled by default. To disable this operation on the GPU set
[`spark.rapids.sql.castFloatToString.enabled`](additional-functionality/advanced_configs.md#sql.castFloatToString.enabled) to `false`.

The `format_number` function also uses [ryu](https://github.com/ulfjack/ryu) as the solution when formatting floating-point data types to
strings, so results may differ from Spark in the same way. To disable this on the GPU, set
[`spark.rapids.sql.formatNumberFloat.enabled`](additional-functionality/advanced_configs.md#sql.formatNumberFloat.enabled) to `false`.

### String to Float

Casting from string to floating-point types on the GPU returns incorrect results when the string
Expand Down
52 changes: 31 additions & 21 deletions integration_tests/src/main/python/string_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -830,25 +830,35 @@ def test_format_number_supported(data_gen):
'format_number(a, 100)')
)

float_format_number_conf = {'spark.rapids.sql.formatNumberFloat.enabled': 'true'}
format_number_float_gens = [DoubleGen(min_exp=-300, max_exp=15)]
format_float_special_vals = [float('nan'), float('inf'), float('-inf'), 0.0, -0.0,
1.1234543, 0.0000152, 0.0000252, 0.999999, 999990.0,
0.001234, 0.00000078, 7654321.1234567]

@pytest.mark.parametrize('data_gen', format_number_float_gens, ids=idfn)
def test_format_number_float_limited(data_gen):
@pytest.mark.parametrize('data_gen', [SetValuesGen(FloatType(), format_float_special_vals),
SetValuesGen(DoubleType(), format_float_special_vals)], ids=idfn)
def test_format_number_float_special(data_gen):
gen = data_gen
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'format_number(a, 5)'),
conf = float_format_number_conf
)

# format_number for float/double is disabled by default due to compatibility issue
# GPU will generate result with less precision than CPU
@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
def test_format_number_float_fallback(data_gen):
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
'format_number(a, 5)'),
'FormatNumber'
)
cpu_results = with_cpu_session(lambda spark: unary_op_df(spark, gen).selectExpr(
'format_number(a, 5)').collect())
gpu_results = with_gpu_session(lambda spark: unary_op_df(spark, gen).selectExpr(
'format_number(a, 5)').collect())
for cpu, gpu in zip(cpu_results, gpu_results):
assert cpu[0] == gpu[0]

def test_format_number_double_value():
data_gen = DoubleGen(nullable=False, no_nans=True)
cpu_results = list(map(lambda x: float(x[0].replace(",", "")), with_cpu_session(
lambda spark: unary_op_df(spark, data_gen).selectExpr('format_number(a, 5)').collect())))
gpu_results = list(map(lambda x: float(x[0].replace(",", "")), with_gpu_session(
lambda spark: unary_op_df(spark, data_gen).selectExpr('format_number(a, 5)').collect())))
for cpu, gpu in zip(cpu_results, gpu_results):
assert math.isclose(cpu, gpu, abs_tol=1.1e-5)

def test_format_number_float_value():
data_gen = FloatGen(nullable=False, no_nans=True)
cpu_results = list(map(lambda x: float(x[0].replace(",", "")), with_cpu_session(
lambda spark: unary_op_df(spark, data_gen).selectExpr('format_number(a, 5)').collect())))
gpu_results = list(map(lambda x: float(x[0].replace(",", "")), with_gpu_session(
lambda spark: unary_op_df(spark, data_gen).selectExpr('format_number(a, 5)').collect())))
for cpu, gpu in zip(cpu_results, gpu_results):
assert math.isclose(cpu, gpu, rel_tol=1e-7) or math.isclose(cpu, gpu, abs_tol=1.1e-5)
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.doc("format_number with floating point types on the GPU returns results that have " +
"a different precision than the default results of Spark.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES =
conf("spark.rapids.sql.castFloatToIntegralTypes.enabled")
Expand Down
Loading

0 comments on commit af91522

Please sign in to comment.