Disable collect_list and collect_set for window by default #10132

Merged · 2 commits · Jan 2, 2024
Changes from 1 commit
2 changes: 2 additions & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -142,6 +142,8 @@ Name | Description | Default Value | Applicable at
<a name="sql.suppressPlanningFailure"></a>spark.rapids.sql.suppressPlanningFailure|Option to fallback an individual query to CPU if an unexpected condition prevents the query plan from being converted to a GPU-enabled one. Note this is different from a normal CPU fallback for a yet-to-be-supported Spark SQL feature. If this happens the error should be reported and investigated as a GitHub issue.|false|Runtime
<a name="sql.variableFloatAgg.enabled"></a>spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|true|Runtime
<a name="sql.window.batched.bounded.row.max"></a>spark.rapids.sql.window.batched.bounded.row.max|Max value for bounded row window preceding/following extents permissible for the window to be evaluated in batched mode. This value affects both the preceding and following bounds, potentially doubling the window size permitted for batched execution|100|Runtime
<a name="sql.window.collectList.enabled"></a>spark.rapids.sql.window.collectList.enabled|The output size of collect list for a window operation is proportional to the window size squared. The current GPU implementation does not handle this well and is disabled by default. If you know that your window size is very small you can try to enable it|false|Runtime
<a name="sql.window.collectSet.enabled"></a>spark.rapids.sql.window.collectSet.enabled|The output size of collect set for a window operation can be proportional to the window size squared. The current GPU implementation does not handle this well and is disabled by default. If you know that your window size is very small you can try to enable it|false|Runtime
<a name="sql.window.range.byte.enabled"></a>spark.rapids.sql.window.range.byte.enabled|When the order-by column of a range-based window is byte type and the range boundary calculated for a value overflows, the CPU and GPU will produce different results. When set to false, this disables the range window acceleration for the byte type order-by column|false|Runtime
<a name="sql.window.range.decimal.enabled"></a>spark.rapids.sql.window.range.decimal.enabled|When set to false, this disables the range window acceleration for the DECIMAL type order-by column|true|Runtime
<a name="sql.window.range.double.enabled"></a>spark.rapids.sql.window.range.double.enabled|When set to false, this disables the range window acceleration for the double type order-by column|true|Runtime
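For context on why the two new flags default to false: with a frame like CURRENT ROW AND UNBOUNDED FOLLOWING over a partition of N rows, row i's output list holds N - i + 1 elements, so a single partition emits roughly N^2/2 elements in total. A minimal sketch of opting back in at the session level (assuming the RAPIDS Accelerator is already installed and enabled on the cluster):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Both flags default to false after this change; enable them only when
# window sizes are known to be small.
spark.conf.set("spark.rapids.sql.window.collectList.enabled", "true")
spark.conf.set("spark.rapids.sql.window.collectSet.enabled", "true")
```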
12 changes: 6 additions & 6 deletions docs/supported_ops.md
@@ -16891,7 +16891,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -16934,7 +16934,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -16977,7 +16977,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -17050,7 +17050,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -17093,7 +17093,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -17136,7 +17136,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
18 changes: 13 additions & 5 deletions integration_tests/src/main/python/window_function_test.py
@@ -1302,7 +1302,8 @@ def test_window_aggs_for_rows_collect_list():
collect_list(c_map) over
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_map
from window_collect_table
''')
''',
conf={'spark.rapids.sql.window.collectList.enabled': True})


# SortExec does not support array type, so sort the result locally.
@@ -1343,7 +1344,8 @@ def test_running_window_function_exec_for_all_aggs():
collect_list(c_struct) over
(partition by a order by b,c_int rows between UNBOUNDED PRECEDING AND CURRENT ROW) as collect_struct
from window_collect_table
''')
''',
conf={'spark.rapids.sql.window.collectList.enabled': True})

# Test the Databricks WindowExec which combines a WindowExec with a ProjectExec and provides the output
# fields that we need to handle with an extra GpuProjectExec and we need the input expressions to compute
@@ -1471,7 +1473,8 @@ def test_window_aggs_for_rows_collect_set():
(partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_fp_nan
from window_collect_table
) t
''')
''',
conf={'spark.rapids.sql.window.collectSet.enabled': True})


# Note, using sort_array() on the CPU, because sort_array() does not yet
@@ -1500,6 +1503,7 @@ def test_window_aggs_for_rows_collect_set():
def test_window_aggs_for_rows_collect_set_nested_array():
conf = copy_and_update(_float_conf, {
"spark.rapids.sql.castFloatToString.enabled": "true",
'spark.rapids.sql.window.collectSet.enabled': "true"
})

def do_it(spark):
@@ -1740,7 +1744,9 @@ def test_to_date_with_window_functions():
"5 PRECEDING AND -2 FOLLOWING"], ids=idfn)
def test_window_aggs_for_negative_rows_partitioned(data_gen, batch_size, window_spec):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
'spark.rapids.sql.castFloatToDecimal.enabled': True,
'spark.rapids.sql.window.collectSet.enabled': True,
'spark.rapids.sql.window.collectList.enabled': True}
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
"window_agg_table",
@@ -1791,7 +1797,9 @@ def spark_bugs_in_decimal_sorting():
ids=idfn)
def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
'spark.rapids.sql.castFloatToDecimal.enabled': True,
'spark.rapids.sql.window.collectSet.enabled': True,
'spark.rapids.sql.window.collectList.enabled': True}

assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
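Outside the test harness, a minimal standalone sketch of the query shape these tests exercise (the view name and columns are illustrative, and the window only runs on the GPU when the plugin is active and the flag above is set):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.rapids.sql.window.collectList.enabled", "true")

# Illustrative stand-in for the generated 'window_collect_table' used above.
spark.createDataFrame(
    [(1, 1, 10), (1, 2, 20), (2, 1, 30)],
    ["a", "b", "c_int"]
).createOrReplaceTempView("window_collect_table")

spark.sql('''
    select a, b,
           collect_list(c_int) over
             (partition by a order by b, c_int
              rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_int
    from window_collect_table
''').show()
```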
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3382,13 +3382,24 @@ object GpuOverrides extends Logging {
"Collect a list of non-unique elements, not supported in reduction",
ExprChecks.fullAgg(
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.BINARY +
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP),
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP)
.withPsNote(TypeEnum.ARRAY, "window operations are disabled by default due " +
"to extreme memory usage"),
TypeSig.ARRAY.nested(TypeSig.all),
Seq(ParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.BINARY +
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectList](c, conf, p, r) {
override def tagAggForGpu(): Unit = {
if (context == WindowAggExprContext && !conf.isWindowCollectListEnabled) {
willNotWorkOnGpu("collect_list is disabled for window operations because " +
"the output explodes in size proportional to the window size. If you know " +
"the window is small you can try it by setting " +
s"${RapidsConf.ENABLE_WINDOW_COLLECT_LIST} to true")
}
}

override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuCollectList(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)

@@ -3412,7 +3423,9 @@
"Collect a set of unique elements, not supported in reduction",
ExprChecks.fullAgg(
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY)
.withPsNote(TypeEnum.ARRAY, "window operations are disabled by default due " +
"to extreme memory usage"),
TypeSig.ARRAY.nested(TypeSig.all),
Seq(ParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
@@ -3421,6 +3434,14 @@
TypeSig.ARRAY).nested(),
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
override def tagAggForGpu(): Unit = {
if (context == WindowAggExprContext && !conf.isWindowCollectSetEnabled) {
willNotWorkOnGpu("collect_set is disabled for window operations because " +
"the output can explode in size proportional to the window size. If you know " +
"the window is small you can try it by setting " +
s"${RapidsConf.ENABLE_WINDOW_COLLECT_SET} to true")
}
}

override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
GpuCollectSet(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)
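With either flag left at its new default, these tagAggForGpu checks fall the expression back to the CPU. A hedged sketch for surfacing the reason at runtime, using the plugin's existing spark.rapids.sql.explain setting (exact log wording may vary by version):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Log the reasons parts of a plan will not run on the GPU.
spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")

# With collectList disabled by default, this should log the willNotWorkOnGpu
# message from tagAggForGpu above instead of running the window on the GPU.
spark.sql('''
    select a, collect_list(b) over
        (partition by a order by b
         rows between unbounded preceding and current row) as l
    from values (1, 2), (1, 3) as t(a, b)
''').collect()
```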
20 changes: 20 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -726,6 +726,22 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

val ENABLE_WINDOW_COLLECT_LIST = conf("spark.rapids.sql.window.collectList.enabled")
.doc("The output size of collect list for a window operation is proportional to " +
"the window size squared. The current GPU implementation does not handle this well " +
"and is disabled by default. If you know that your window size is very small you " +
"can try to enable it")
Review comment (Member): suggested change
-    "can try to enable it")
+    "can try to enable it.")

.booleanConf
.createWithDefault(false)

val ENABLE_WINDOW_COLLECT_SET = conf("spark.rapids.sql.window.collectSet.enabled")
.doc("The output size of collect set for a window operation can be proportional to " +
"the window size squared. The current GPU implementation does not handle this well " +
"and is disabled by default. If you know that your window size is very small you " +
"can try to enable it")
Review comment (Member): suggested change
-    "can try to enable it")
+    "can try to enable it.")

.booleanConf
.createWithDefault(false)

val ENABLE_FLOAT_AGG = conf("spark.rapids.sql.variableFloatAgg.enabled")
.doc("Spark assumes that all operations produce the exact same result each time. " +
"This is not true for some floating point aggregations, which can produce slightly " +
@@ -2417,6 +2433,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val gpuTargetBatchSizeBytes: Long = get(GPU_BATCH_SIZE_BYTES)

lazy val isWindowCollectListEnabled: Boolean = get(ENABLE_WINDOW_COLLECT_LIST)

lazy val isWindowCollectSetEnabled: Boolean = get(ENABLE_WINDOW_COLLECT_SET)

lazy val isFloatAggEnabled: Boolean = get(ENABLE_FLOAT_AGG)

lazy val explain: String = get(EXPLAIN)