Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent a degenerative join in test_dpp_reuse_broadcast_exchange [databricks] #10168

Merged
merged 8 commits into from
Jan 10, 2024
9 changes: 6 additions & 3 deletions integration_tests/src/main/python/dpp_test.py
jlowe marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -20,14 +20,17 @@
from marks import ignore_order, allow_non_gpu
from spark_session import is_before_spark_320, with_cpu_session, is_before_spark_312, is_databricks_runtime, is_databricks113_or_later

# non-positive values here can produce a degenerative join, so we want a filter value associated
# with only positive values. See https://github.com/NVIDIA/spark-rapids/issues/10147
jlowe marked this conversation as resolved.
Show resolved Hide resolved
value_gen = RepeatSeqGen(int_gen, length=100)

def create_dim_table(table_name, table_format, length=500):
def fn(spark):
df = gen_df(spark, [
('key', IntegerGen(nullable=False, min_val=0, max_val=9, special_cases=[])),
('skey', IntegerGen(nullable=False, min_val=0, max_val=4, special_cases=[])),
('ex_key', IntegerGen(nullable=False, min_val=0, max_val=3, special_cases=[])),
('value', int_gen),
('value', value_gen),
# specify nullable=False for `filter` to avoid generating invalid SQL with
# expression `filter = None` (https://github.com/NVIDIA/spark-rapids/issues/9817)
('filter', RepeatSeqGen(
Expand All @@ -49,7 +52,7 @@ def fn(spark):
('skey', IntegerGen(nullable=False, min_val=0, max_val=4, special_cases=[])),
# ex_key is not a partition column
('ex_key', IntegerGen(nullable=False, min_val=0, max_val=3, special_cases=[])),
('value', int_gen)], length)
('value', value_gen)], length)
df.write.format(table_format) \
.mode("overwrite") \
.partitionBy('key', 'skey') \
Expand Down