diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md
index 0c4defb32ac..369f7373daa 100644
--- a/docs/additional-functionality/advanced_configs.md
+++ b/docs/additional-functionality/advanced_configs.md
@@ -142,6 +142,8 @@ Name | Description | Default Value | Applicable at
 spark.rapids.sql.suppressPlanningFailure|Option to fallback an individual query to CPU if an unexpected condition prevents the query plan from being converted to a GPU-enabled one. Note this is different from a normal CPU fallback for a yet-to-be-supported Spark SQL feature. If this happens the error should be reported and investigated as a GitHub issue.|false|Runtime
 spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|true|Runtime
 spark.rapids.sql.window.batched.bounded.row.max|Max value for bounded row window preceding/following extents permissible for the window to be evaluated in batched mode. This value affects both the preceding and following bounds, potentially doubling the window size permitted for batched execution|100|Runtime
+spark.rapids.sql.window.collectList.enabled|The output size of collect list for a window operation is proportional to the window size squared. The current GPU implementation does not handle this well, so it is disabled by default. If you know that your window size is very small, you can try to enable it.|false|Runtime
+spark.rapids.sql.window.collectSet.enabled|The output size of collect set for a window operation can be proportional to the window size squared. The current GPU implementation does not handle this well, so it is disabled by default. If you know that your window size is very small, you can try to enable it.|false|Runtime
 spark.rapids.sql.window.range.byte.enabled|When the order-by column of a range based window is byte type and the range boundary calculated for a value has overflow, CPU and GPU will get the different results. When set to false disables the range window acceleration for the byte type order-by column|false|Runtime
 spark.rapids.sql.window.range.decimal.enabled|When set to false, this disables the range window acceleration for the DECIMAL type order-by column|true|Runtime
 spark.rapids.sql.window.range.double.enabled|When set to false, this disables the range window acceleration for the double type order-by column|true|Runtime
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 8f30cc77f59..164f6b672dd 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -16891,7 +16891,7 @@ are limited.
-PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT
+PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT
@@ -16934,7 +16934,7 @@ are limited.
-PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT
+PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT
@@ -16977,7 +16977,7 @@ are limited.
-PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT
+PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT
@@ -17050,7 +17050,7 @@ are limited.
-PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT
+PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT
@@ -17093,7 +17093,7 @@ are limited.
-PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT
+PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT
@@ -17136,7 +17136,7 @@ are limited.
-PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT
+PS<br/>window operations are disabled by default due to extreme memory usage;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT
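Both new flags are ordinary runtime SQL configs, so a user whose windows are known to be small can opt back in per session without rebuilding anything. The sketch below is not part of this diff; the session setup, table, and column names are illustrative, and it assumes the RAPIDS Accelerator jar is already on the driver and executor classpath. It shows a `collect_list` window query that would otherwise be tagged to fall back to the CPU once this change lands:

```python
from pyspark.sql import SparkSession

# Minimal sketch: assumes the RAPIDS Accelerator jar is on the classpath; the
# app name, table, and column names below are made up for the example.
spark = (SparkSession.builder
         .appName("window-collect-list-example")
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         # Opt back in to the GPU window implementations this change disables by default.
         .config("spark.rapids.sql.window.collectList.enabled", "true")
         .config("spark.rapids.sql.window.collectSet.enabled", "true")
         .getOrCreate())

df = spark.createDataFrame(
    [(1, 1, "a"), (1, 2, "b"), (1, 3, "c"), (2, 1, "d")],
    ["grp", "ord", "val"])
df.createOrReplaceTempView("events")

# With the new configs left at false, this collect_list window is tagged
# willNotWorkOnGpu and runs on the CPU; with them set to true it stays on the GPU.
spark.sql("""
    SELECT grp, ord,
           collect_list(val) OVER
               (PARTITION BY grp ORDER BY ord
                ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS vals
    FROM events
""").show(truncate=False)
```

Because both entries are marked `Runtime` in the table above, they can also be toggled after startup with `spark.conf.set(...)` rather than at session build time.
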
diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py
index f9f3b063a97..47e730136b5 100644
--- a/integration_tests/src/main/python/window_function_test.py
+++ b/integration_tests/src/main/python/window_function_test.py
@@ -1302,7 +1302,8 @@ def test_window_aggs_for_rows_collect_list():
             collect_list(c_map) over
                 (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_map
         from window_collect_table
-        ''')
+        ''',
+        conf={'spark.rapids.sql.window.collectList.enabled': True})


 # SortExec does not support array type, so sort the result locally.
@@ -1343,7 +1344,8 @@ def test_running_window_function_exec_for_all_aggs():
             collect_list(c_struct) over
                 (partition by a order by b,c_int rows between UNBOUNDED PRECEDING AND CURRENT ROW) as collect_struct
         from window_collect_table
-        ''')
+        ''',
+        conf={'spark.rapids.sql.window.collectList.enabled': True})

 # Test the Databricks WindowExec which combines a WindowExec with a ProjectExec and provides the output
 # fields that we need to handle with an extra GpuProjectExec and we need the input expressions to compute
@@ -1471,7 +1473,8 @@ def test_window_aggs_for_rows_collect_set():
                     (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_fp_nan
             from window_collect_table
             ) t
-        ''')
+        ''',
+        conf={'spark.rapids.sql.window.collectSet.enabled': True})


 # Note, using sort_array() on the CPU, because sort_array() does not yet
@@ -1500,6 +1503,7 @@ def test_window_aggs_for_rows_collect_set_nested_array():
     conf = copy_and_update(_float_conf, {
         "spark.rapids.sql.castFloatToString.enabled": "true",
+        'spark.rapids.sql.window.collectSet.enabled': "true"
     })

     def do_it(spark):
@@ -1740,7 +1744,9 @@ def test_to_date_with_window_functions():
                          "5 PRECEDING AND -2 FOLLOWING"], ids=idfn)
 def test_window_aggs_for_negative_rows_partitioned(data_gen, batch_size, window_spec):
     conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
-            'spark.rapids.sql.castFloatToDecimal.enabled': True}
+            'spark.rapids.sql.castFloatToDecimal.enabled': True,
+            'spark.rapids.sql.window.collectSet.enabled': True,
+            'spark.rapids.sql.window.collectList.enabled': True}
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark: gen_df(spark, data_gen, length=2048),
         "window_agg_table",
@@ -1791,7 +1797,9 @@ def spark_bugs_in_decimal_sorting():
                         ids=idfn)
 def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size):
     conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
-            'spark.rapids.sql.castFloatToDecimal.enabled': True}
+            'spark.rapids.sql.castFloatToDecimal.enabled': True,
+            'spark.rapids.sql.window.collectSet.enabled': True,
+            'spark.rapids.sql.window.collectList.enabled': True}
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark: gen_df(spark, data_gen, length=2048),
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 149fd1226b4..b0be1beec89 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3382,13 +3382,24 @@ object GpuOverrides extends Logging {
       "Collect a list of non-unique elements, not supported in reduction",
       ExprChecks.fullAgg(
         TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.BINARY +
-          TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP),
+          TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP)
+          .withPsNote(TypeEnum.ARRAY, "window operations are disabled by default due " +
+            "to extreme memory usage"),
         TypeSig.ARRAY.nested(TypeSig.all),
         Seq(ParamCheck("input",
           (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.BINARY + TypeSig.NULL +
             TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
           TypeSig.all))),
       (c, conf, p, r) => new TypedImperativeAggExprMeta[CollectList](c, conf, p, r) {
+        override def tagAggForGpu(): Unit = {
+          if (context == WindowAggExprContext && !conf.isWindowCollectListEnabled) {
+            willNotWorkOnGpu("collect_list is disabled for window operations because " +
+              "the output explodes in size proportional to the window size squared. If " +
+              "you know the window is small you can try it by setting " +
+              s"${RapidsConf.ENABLE_WINDOW_COLLECT_LIST} to true")
+          }
+        }
+
         override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
           GpuCollectList(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)
@@ -3412,7 +3423,9 @@ object GpuOverrides extends Logging {
       "Collect a set of unique elements, not supported in reduction",
       ExprChecks.fullAgg(
         TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
-          TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
+          TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY)
+          .withPsNote(TypeEnum.ARRAY, "window operations are disabled by default due " +
+            "to extreme memory usage"),
         TypeSig.ARRAY.nested(TypeSig.all),
         Seq(ParamCheck("input",
           (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
@@ -3421,6 +3434,14 @@ object GpuOverrides extends Logging {
             TypeSig.ARRAY).nested(),
           TypeSig.all))),
       (c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {
+        override def tagAggForGpu(): Unit = {
+          if (context == WindowAggExprContext && !conf.isWindowCollectSetEnabled) {
+            willNotWorkOnGpu("collect_set is disabled for window operations because " +
+              "the output can explode in size proportional to the window size squared. If " +
+              "you know the window is small you can try it by setting " +
+              s"${RapidsConf.ENABLE_WINDOW_COLLECT_SET} to true")
+          }
+        }
         override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
           GpuCollectSet(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 41b3b920ad1..2f71647971e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -726,6 +726,22 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .booleanConf
     .createWithDefault(true)

+  val ENABLE_WINDOW_COLLECT_LIST = conf("spark.rapids.sql.window.collectList.enabled")
+    .doc("The output size of collect list for a window operation is proportional to " +
+      "the window size squared. The current GPU implementation does not handle this " +
+      "well, so it is disabled by default. If you know that your window size is very " +
+      "small, you can try to enable it.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val ENABLE_WINDOW_COLLECT_SET = conf("spark.rapids.sql.window.collectSet.enabled")
+    .doc("The output size of collect set for a window operation can be proportional to " +
+      "the window size squared. The current GPU implementation does not handle this " +
+      "well, so it is disabled by default. If you know that your window size is very " +
+      "small, you can try to enable it.")
+    .booleanConf
+    .createWithDefault(false)
+
   val ENABLE_FLOAT_AGG = conf("spark.rapids.sql.variableFloatAgg.enabled")
     .doc("Spark assumes that all operations produce the exact same result each time. " +
       "This is not true for some floating point aggregations, which can produce slightly " +
@@ -2417,6 +2433,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val gpuTargetBatchSizeBytes: Long = get(GPU_BATCH_SIZE_BYTES)

+  lazy val isWindowCollectListEnabled: Boolean = get(ENABLE_WINDOW_COLLECT_LIST)
+
+  lazy val isWindowCollectSetEnabled: Boolean = get(ENABLE_WINDOW_COLLECT_SET)
+
   lazy val isFloatAggEnabled: Boolean = get(ENABLE_FLOAT_AGG)

   lazy val explain: String = get(EXPLAIN)
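To make the "window size squared" wording used in the config docs and the willNotWorkOnGpu messages concrete: with an unbounded ROWS frame, every one of the n rows in a partition emits a list of up to n elements, so the window output holds on the order of n² values even though the input holds only n. The standalone sketch below is plain Python arithmetic, unrelated to the plugin or its tests, comparing an unbounded frame to a tightly bounded one:

```python
# Standalone illustration of the quadratic output growth that motivates the new
# configs; this is not plugin code, just arithmetic over a toy partition.

def collect_list_output_elements(n_rows, preceding=None, following=None):
    """Total elements collect_list emits over one partition of n_rows rows for a
    ROWS BETWEEN frame; None means UNBOUNDED."""
    total = 0
    for i in range(n_rows):
        lo = 0 if preceding is None else max(0, i - preceding)
        hi = n_rows - 1 if following is None else min(n_rows - 1, i + following)
        total += hi - lo + 1  # size of the list produced for row i
    return total

for n in (10, 1_000, 100_000):
    unbounded = collect_list_output_elements(n)       # UNBOUNDED PRECEDING AND FOLLOWING
    bounded = collect_list_output_elements(n, 5, 5)   # 5 PRECEDING AND 5 FOLLOWING
    print(f"rows={n:>7,}: unbounded frame -> {unbounded:>15,} elements, "
          f"+/-5 frame -> {bounded:>9,} elements")
```

A tightly bounded frame grows only linearly with the partition size, which is the "very small window" case the new configs are meant to let users opt back into.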