
Commit f9de13a

Config changes:
1. Renamed config '.extent' to '.max'.
2. Fixed documentation for said config.
3. Removed TODOs that were already handled.
mythrocks committed Dec 7, 2023
1 parent b5fda09 commit f9de13a
Showing 4 changed files with 8 additions and 8 deletions.
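For reference, the renamed key is set like any other RAPIDS Accelerator conf. A minimal spark-shell sketch (the value 200 is illustrative; this assumes the conf is settable at session scope, as most spark.rapids.sql.* settings are):

    // Raise the bound cap from its default of 100 to 200 for this session.
    spark.conf.set("spark.rapids.sql.window.batched.bounded.row.max", "200")
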
integration_tests/src/main/python/window_function_test.py (1 addition, 1 deletion)
@@ -1903,7 +1903,7 @@ def test_window_aggs_for_batched_finite_row_windows_fallback(data_gen):
 
 def get_conf_with_extent(extent):
     return {'spark.rapids.sql.batchSizeBytes': '1000',
-            'spark.rapids.sql.window.batched.bounded.row.extent': extent}
+            'spark.rapids.sql.window.batched.bounded.row.max': extent}
 
 def assert_query_runs_on(exec, conf):
     assert_gpu_and_cpu_are_equal_sql(
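For context, the helper above feeds confs into queries of roughly this shape. A hedged Scala sketch (table and column names are invented for illustration):

    // A bounded row window whose extents (3 preceding, 3 following) fall well
    // inside the default max of 100, so it remains eligible for batched mode.
    val result = spark.sql(
      """SELECT a, SUM(b) OVER
        |         (PARTITION BY p ORDER BY a
        |          ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING) AS s
        |  FROM window_tbl""".stripMargin)
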
@@ -166,7 +166,6 @@ class GpuBatchedBoundedWindowIterator(
       } else {
         // More input rows expected. The last `maxFollowing` rows can't be finalized.
         // Cannot exceed `inputRowCount`.
-        // TODO: Account for maxFollowing < 0 (e.g. LAG()) => numUnprocessedInCache = 0.
         if (maxFollowing < 0) { // E.g. LAG(3) => [ preceding=-3, following=-3 ]
           // -ve following => No need to wait for more following rows.
           // All "following" context is already available in the current batch.
@@ -192,7 +191,6 @@
       }
 
       // Compute new cache using current input.
-      // TODO: Account for minPreceding >0 (e.g. LEAD()) => numPrecedingRowsAdded = 0.
       numPrecedingRowsAdded = if (minPreceding > 0) { // E.g. LEAD(3) => [prec=3, foll=3]
         // preceding > 0 => No "preceding" rows need be carried forward.
         // Only the rows that need to be recomputed.
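The two deleted TODOs were already covered by the branches shown above. A standalone sketch of the rules those branches encode (the function names and exact clamping here are assumptions for illustration, not the iterator's actual fields):

    // If the frame ends before the current row (maxFollowing < 0, e.g. LAG(3)),
    // every row in the batch already has its full "following" context, so no
    // rows are left unprocessed in the cache. Otherwise, up to `maxFollowing`
    // trailing rows must wait for the next batch, capped at inputRowCount.
    def numUnprocessedInCache(maxFollowing: Int, inputRowCount: Int): Int =
      if (maxFollowing < 0) 0 else math.min(maxFollowing, inputRowCount)

    // If the frame starts after the current row (minPreceding > 0, e.g. LEAD(3)),
    // no "preceding" rows need to be carried forward into the new cache.
    def numPrecedingRowsAdded(minPreceding: Int, inputRowCount: Int): Int =
      if (minPreceding > 0) 0 else math.min(-minPreceding, inputRowCount)
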
@@ -546,7 +546,7 @@ object GpuWindowExec {
       conf: RapidsConf): Boolean = {
 
     def inPermissibleRange(bounds: Int) =
-      Math.abs(bounds) <= conf.boundedRowsWindowMaxExtent
+      Math.abs(bounds) <= conf.batchedBoundedRowsWindowMax
 
     spec match {
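A worked example of the check above, using the default max of 100 (the standalone signature is adapted for illustration; in the plugin the max comes from RapidsConf):

    def inPermissibleRange(bounds: Int, max: Int): Boolean = Math.abs(bounds) <= max

    // ROWS BETWEEN 90 PRECEDING AND 90 FOLLOWING: both bounds pass.
    assert(inPermissibleRange(-90, 100) && inPermissibleRange(90, 100))
    // ROWS BETWEEN 150 PRECEDING AND 10 FOLLOWING: |-150| > 100, so this
    // window is not evaluated in batched mode.
    assert(!inPermissibleRange(-150, 100))
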
@@ -1340,10 +1340,12 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(true)
 
-  val BATCHED_BOUNDED_ROW_WINDOW_MAX_EXTENT: ConfEntryWithDefault[Integer] =
-    conf("spark.rapids.sql.window.batched.bounded.row.extent")
+  val BATCHED_BOUNDED_ROW_WINDOW_MAX: ConfEntryWithDefault[Integer] =
+    conf("spark.rapids.sql.window.batched.bounded.row.max")
     .doc("Max value for bounded row window preceding/following extents " +
-      "permissible for the window to be evaluated in batched mode")
+      "permissible for the window to be evaluated in batched mode. This value affects " +
+      "both the preceding and following bounds, potentially doubling the window size " +
+      "permitted for batched execution")
     .integerConf
     .createWithDefault(value = 100)

@@ -2716,7 +2718,7 @@
 
   lazy val isRangeWindowDecimalEnabled: Boolean = get(ENABLE_RANGE_WINDOW_DECIMAL)
 
-  lazy val boundedRowsWindowMaxExtent: Int = get(BATCHED_BOUNDED_ROW_WINDOW_MAX_EXTENT)
+  lazy val batchedBoundedRowsWindowMax: Int = get(BATCHED_BOUNDED_ROW_WINDOW_MAX)
 
   lazy val allowSinglePassPartialSortAgg: Boolean = get(ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG)
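To make the updated doc text concrete: because the max applies to the preceding and following bounds independently, both can sit at the cap at once, hence "potentially doubling":

    // With the default of 100: up to 100 preceding + the current row
    // + up to 100 following = 201 rows in a single batched frame.
    val maxFrameRows = 100 + 1 + 100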
