
[BUG] test_dpp_reuse_broadcast_exchange failed #10147

Open · jlowe opened this issue Jan 3, 2024 · 12 comments · Fixed by #10168 or #10551
Labels: bug (Something isn't working)
jlowe (Member) commented Jan 3, 2024

From a recent nightly test run:

[2024-01-03T17:41:18.525Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1704297021, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
[2024-01-03T17:41:18.525Z] : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
[2024-01-03T17:41:18.525Z] AdaptiveSparkPlan isFinalPlan=true
[2024-01-03T17:41:18.525Z] +- == Final Plan ==
[2024-01-03T17:41:18.525Z]    LocalTableScan <empty>, [key#37135, max(value)#37151L]
[2024-01-03T17:41:18.525Z] +- == Initial Plan ==
[2024-01-03T17:41:18.525Z]    Sort [key#37135 ASC NULLS FIRST, max(value)#37151L ASC NULLS FIRST], true, 0
[2024-01-03T17:41:18.525Z]    +- Exchange rangepartitioning(key#37135 ASC NULLS FIRST, max(value)#37151L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=109491]
[2024-01-03T17:41:18.525Z]       +- HashAggregate(keys=[key#37135], functions=[max(value#37136L)], output=[key#37135, max(value)#37151L])
[2024-01-03T17:41:18.525Z]          +- Exchange hashpartitioning(key#37135, 4), ENSURE_REQUIREMENTS, [plan_id=109488]
[2024-01-03T17:41:18.525Z]             +- HashAggregate(keys=[key#37135], functions=[partial_max(value#37136L)], output=[key#37135, max#37199L])
[2024-01-03T17:41:18.525Z]                +- Union
[2024-01-03T17:41:18.525Z]                   :- Project [key#37054 AS key#37135, value#37140L AS value#37136L]
[2024-01-03T17:41:18.525Z]                   :  +- BroadcastHashJoin [key#37054], [key#37056], Inner, BuildRight, false
[2024-01-03T17:41:18.526Z]                   :     :- HashAggregate(keys=[key#37054], functions=[sum(value#37053)], output=[key#37054, value#37140L])
[2024-01-03T17:41:18.526Z]                   :     :  +- Exchange hashpartitioning(key#37054, 4), ENSURE_REQUIREMENTS, [plan_id=109473]
[2024-01-03T17:41:18.526Z]                   :     :     +- HashAggregate(keys=[key#37054], functions=[partial_sum(value#37053)], output=[key#37054, sum#37201L])
[2024-01-03T17:41:18.526Z]                   :     :        +- Project [value#37053, key#37054]
[2024-01-03T17:41:18.526Z]                   :     :           +- Filter (isnotnull(value#37053) AND (value#37053 > 0))
[2024-01-03T17:41:18.526Z]                   :     :              +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_0[value#37053,key#37054,skey#37055] Batched: true, DataFilters: [isnotnull(value#37053), (value#37053 > 0)], Format: Parquet, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37054), dynamicpruningexpression(key#37054 IN dynamicpruning#37196)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.526Z]                   :     :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37196, 0, true, Project [key#37056], [key#37056]
[2024-01-03T17:41:18.526Z]                   :     :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.526Z]                   :     :                          +- Project [key#37056]
[2024-01-03T17:41:18.526Z]                   :     :                             +- Filter ((((isnotnull(ex_key#37058) AND isnotnull(filter#37060)) AND (ex_key#37058 = 3)) AND (filter#37060 = 1458)) AND isnotnull(key#37056))
[2024-01-03T17:41:18.526Z]                   :     :                                +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37056,ex_key#37058,filter#37060] Batched: true, DataFilters: [isnotnull(ex_key#37058), isnotnull(filter#37060), (ex_key#37058 = 3), (filter#37060 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z]                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=109476]
[2024-01-03T17:41:18.526Z]                   :        +- Project [key#37056]
[2024-01-03T17:41:18.526Z]                   :           +- Filter ((((isnotnull(ex_key#37058) AND isnotnull(filter#37060)) AND (ex_key#37058 = 3)) AND (filter#37060 = 1458)) AND isnotnull(key#37056))
[2024-01-03T17:41:18.526Z]                   :              +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37056,ex_key#37058,filter#37060] Batched: true, DataFilters: [isnotnull(ex_key#37058), isnotnull(filter#37060), (ex_key#37058 = 3), (filter#37060 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z]                   +- Project [key#37184, value#37187L]
[2024-01-03T17:41:18.526Z]                      +- BroadcastHashJoin [key#37184], [key#37188], Inner, BuildRight, false
[2024-01-03T17:41:18.526Z]                         :- HashAggregate(keys=[key#37184], functions=[sum(value#37183)], output=[key#37184, value#37187L])
[2024-01-03T17:41:18.526Z]                         :  +- Exchange hashpartitioning(key#37184, 4), ENSURE_REQUIREMENTS, [plan_id=109479]
[2024-01-03T17:41:18.526Z]                         :     +- HashAggregate(keys=[key#37184], functions=[partial_sum(value#37183)], output=[key#37184, sum#37203L])
[2024-01-03T17:41:18.526Z]                         :        +- Project [value#37183, key#37184]
[2024-01-03T17:41:18.526Z]                         :           +- Filter (isnotnull(value#37183) AND (value#37183 > 0))
[2024-01-03T17:41:18.526Z]                         :              +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_0[value#37183,key#37184,skey#37185] Batched: true, DataFilters: [isnotnull(value#37183), (value#37183 > 0)], Format: Parquet, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37184), dynamicpruningexpression(key#37184 IN dynamicpruning#37197)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.526Z]                         :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37197, 0, true, Project [key#37188], [key#37188]
[2024-01-03T17:41:18.526Z]                         :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.526Z]                         :                          +- Project [key#37188]
[2024-01-03T17:41:18.526Z]                         :                             +- Filter ((((isnotnull(ex_key#37190) AND isnotnull(filter#37192)) AND (ex_key#37190 = 3)) AND (filter#37192 = 1458)) AND isnotnull(key#37188))
[2024-01-03T17:41:18.526Z]                         :                                +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37188,ex_key#37190,filter#37192] Batched: true, DataFilters: [isnotnull(ex_key#37190), isnotnull(filter#37192), (ex_key#37190 = 3), (filter#37192 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z]                         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=109482]
[2024-01-03T17:41:18.526Z]                            +- Project [key#37188]
[2024-01-03T17:41:18.526Z]                               +- Filter ((((isnotnull(ex_key#37190) AND isnotnull(filter#37192)) AND (ex_key#37190 = 3)) AND (filter#37192 = 1458)) AND isnotnull(key#37188))
[2024-01-03T17:41:18.526Z]                                  +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37188,ex_key#37190,filter#37192] Batched: true, DataFilters: [isnotnull(ex_key#37190), isnotnull(filter#37192), (ex_key#37190 = 3), (filter#37192 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z] 
[2024-01-03T17:41:18.526Z] 	at scala.Predef$.assert(Predef.scala:223)
[2024-01-03T17:41:18.526Z] 	at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:170)
[2024-01-03T17:41:18.526Z] 	at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:175)
[2024-01-03T17:41:18.526Z] 	at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertContains(ExecutionPlanCaptureCallback.scala:76)
[2024-01-03T17:41:18.526Z] 	at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(ExecutionPlanCaptureCallback.scala)
[2024-01-03T17:41:18.526Z] 	at sun.reflect.GeneratedMethodAccessor140.invoke(Unknown Source)
[2024-01-03T17:41:18.526Z] 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2024-01-03T17:41:18.526Z] 	at java.lang.reflect.Method.invoke(Method.java:498)
[2024-01-03T17:41:18.526Z] 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
[2024-01-03T17:41:18.526Z] 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
[2024-01-03T17:41:18.526Z] 	at py4j.Gateway.invoke(Gateway.java:282)
[2024-01-03T17:41:18.526Z] 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
[2024-01-03T17:41:18.526Z] 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
[2024-01-03T17:41:18.526Z] 	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
[2024-01-03T17:41:18.526Z] 	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
[2024-01-03T17:41:18.527Z] 	at java.lang.Thread.run(Thread.java:750)
[2024-01-03T17:41:18.527Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-orc][DATAGEN_SEED=1704297021, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
[2024-01-03T17:41:18.527Z] : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
[2024-01-03T17:41:18.527Z] AdaptiveSparkPlan isFinalPlan=true
[2024-01-03T17:41:18.527Z] +- == Final Plan ==
[2024-01-03T17:41:18.527Z]    LocalTableScan <empty>, [key#37628, max(value)#37644L]
[2024-01-03T17:41:18.527Z] +- == Initial Plan ==
[2024-01-03T17:41:18.527Z]    Sort [key#37628 ASC NULLS FIRST, max(value)#37644L ASC NULLS FIRST], true, 0
[2024-01-03T17:41:18.527Z]    +- Exchange rangepartitioning(key#37628 ASC NULLS FIRST, max(value)#37644L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=111165]
[2024-01-03T17:41:18.527Z]       +- HashAggregate(keys=[key#37628], functions=[max(value#37629L)], output=[key#37628, max(value)#37644L])
[2024-01-03T17:41:18.527Z]          +- Exchange hashpartitioning(key#37628, 4), ENSURE_REQUIREMENTS, [plan_id=111162]
[2024-01-03T17:41:18.527Z]             +- HashAggregate(keys=[key#37628], functions=[partial_max(value#37629L)], output=[key#37628, max#37692L])
[2024-01-03T17:41:18.527Z]                +- Union
[2024-01-03T17:41:18.527Z]                   :- Project [key#37547 AS key#37628, value#37633L AS value#37629L]
[2024-01-03T17:41:18.527Z]                   :  +- BroadcastHashJoin [key#37547], [key#37549], Inner, BuildRight, false
[2024-01-03T17:41:18.527Z]                   :     :- HashAggregate(keys=[key#37547], functions=[sum(value#37546)], output=[key#37547, value#37633L])
[2024-01-03T17:41:18.527Z]                   :     :  +- Exchange hashpartitioning(key#37547, 4), ENSURE_REQUIREMENTS, [plan_id=111147]
[2024-01-03T17:41:18.527Z]                   :     :     +- HashAggregate(keys=[key#37547], functions=[partial_sum(value#37546)], output=[key#37547, sum#37694L])
[2024-01-03T17:41:18.527Z]                   :     :        +- Project [value#37546, key#37547]
[2024-01-03T17:41:18.527Z]                   :     :           +- Filter (isnotnull(value#37546) AND (value#37546 > 0))
[2024-01-03T17:41:18.527Z]                   :     :              +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_0[value#37546,key#37547,skey#37548] Batched: true, DataFilters: [isnotnull(value#37546), (value#37546 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37547), dynamicpruningexpression(key#37547 IN dynamicpruning#37689)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.527Z]                   :     :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37689, 0, true, Project [key#37549], [key#37549]
[2024-01-03T17:41:18.527Z]                   :     :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.527Z]                   :     :                          +- Project [key#37549]
[2024-01-03T17:41:18.527Z]                   :     :                             +- Filter ((((isnotnull(ex_key#37551) AND isnotnull(filter#37553)) AND (ex_key#37551 = 3)) AND (filter#37553 = 1458)) AND isnotnull(key#37549))
[2024-01-03T17:41:18.527Z]                   :     :                                +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37549,ex_key#37551,filter#37553] Batched: true, DataFilters: [isnotnull(ex_key#37551), isnotnull(filter#37553), (ex_key#37551 = 3), (filter#37553 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.527Z]                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=111150]
[2024-01-03T17:41:18.527Z]                   :        +- Project [key#37549]
[2024-01-03T17:41:18.527Z]                   :           +- Filter ((((isnotnull(ex_key#37551) AND isnotnull(filter#37553)) AND (ex_key#37551 = 3)) AND (filter#37553 = 1458)) AND isnotnull(key#37549))
[2024-01-03T17:41:18.527Z]                   :              +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37549,ex_key#37551,filter#37553] Batched: true, DataFilters: [isnotnull(ex_key#37551), isnotnull(filter#37553), (ex_key#37551 = 3), (filter#37553 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.527Z]                   +- Project [key#37677, value#37680L]
[2024-01-03T17:41:18.527Z]                      +- BroadcastHashJoin [key#37677], [key#37681], Inner, BuildRight, false
[2024-01-03T17:41:18.527Z]                         :- HashAggregate(keys=[key#37677], functions=[sum(value#37676)], output=[key#37677, value#37680L])
[2024-01-03T17:41:18.527Z]                         :  +- Exchange hashpartitioning(key#37677, 4), ENSURE_REQUIREMENTS, [plan_id=111153]
[2024-01-03T17:41:18.527Z]                         :     +- HashAggregate(keys=[key#37677], functions=[partial_sum(value#37676)], output=[key#37677, sum#37696L])
[2024-01-03T17:41:18.527Z]                         :        +- Project [value#37676, key#37677]
[2024-01-03T17:41:18.527Z]                         :           +- Filter (isnotnull(value#37676) AND (value#37676 > 0))
[2024-01-03T17:41:18.527Z]                         :              +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_0[value#37676,key#37677,skey#37678] Batched: true, DataFilters: [isnotnull(value#37676), (value#37676 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37677), dynamicpruningexpression(key#37677 IN dynamicpruning#37690)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.527Z]                         :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37690, 0, true, Project [key#37681], [key#37681]
[2024-01-03T17:41:18.527Z]                         :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.527Z]                         :                          +- Project [key#37681]
[2024-01-03T17:41:18.527Z]                         :                             +- Filter ((((isnotnull(ex_key#37683) AND isnotnull(filter#37685)) AND (ex_key#37683 = 3)) AND (filter#37685 = 1458)) AND isnotnull(key#37681))
[2024-01-03T17:41:18.527Z]                         :                                +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37681,ex_key#37683,filter#37685] Batched: true, DataFilters: [isnotnull(ex_key#37683), isnotnull(filter#37685), (ex_key#37683 = 3), (filter#37685 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.527Z]                         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=111156]
[2024-01-03T17:41:18.527Z]                            +- Project [key#37681]
[2024-01-03T17:41:18.527Z]                               +- Filter ((((isnotnull(ex_key#37683) AND isnotnull(filter#37685)) AND (ex_key#37683 = 3)) AND (filter#37685 = 1458)) AND isnotnull(key#37681))
[2024-01-03T17:41:18.527Z]                                  +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37681,ex_key#37683,filter#37685] Batched: true, DataFilters: [isnotnull(ex_key#37683), isnotnull(filter#37685), (ex_key#37683 = 3), (filter#37685 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.528Z] 
[2024-01-03T17:41:18.528Z] 	at scala.Predef$.assert(Predef.scala:223)
[2024-01-03T17:41:18.528Z] 	at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:170)
[2024-01-03T17:41:18.528Z] 	at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:175)
[2024-01-03T17:41:18.528Z] 	at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertContains(ExecutionPlanCaptureCallback.scala:76)
[2024-01-03T17:41:18.528Z] 	at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(ExecutionPlanCaptureCallback.scala)
[2024-01-03T17:41:18.528Z] 	at sun.reflect.GeneratedMethodAccessor140.invoke(Unknown Source)
[2024-01-03T17:41:18.528Z] 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2024-01-03T17:41:18.528Z] 	at java.lang.reflect.Method.invoke(Method.java:498)
[2024-01-03T17:41:18.528Z] 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
[2024-01-03T17:41:18.528Z] 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
[2024-01-03T17:41:18.528Z] 	at py4j.Gateway.invoke(Gateway.java:282)
[2024-01-03T17:41:18.528Z] 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
[2024-01-03T17:41:18.528Z] 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
[2024-01-03T17:41:18.528Z] 	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
[2024-01-03T17:41:18.528Z] 	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
[2024-01-03T17:41:18.528Z] 	at java.lang.Thread.run(Thread.java:750)
[2024-01-03T17:41:18.528Z] = 2 failed, 20760 passed, 933 skipped, 417 xfailed, 388 xpassed, 9504 warnings in 6656.39s (1:50:56) =
jlowe added the 'bug (Something isn't working)' and '? - Needs Triage' labels on Jan 3, 2024
NVnavkumar (Collaborator) commented:

From CI, this test failure occurred in Spark 3.5.0

NVnavkumar added and then removed the 'Spark 3.5+' and 'Spark 3.4+' labels on Jan 5, 2024
mattahrens removed the '? - Needs Triage' label on Jan 5, 2024
NVnavkumar (Collaborator) commented:

I was able to replicate both failures on Spark 3.2.4, 3.3.3, 3.4.0, and 3.5.0 (all versions of Spark that support AQE + DPP)

NVnavkumar (Collaborator) commented:

Based on the plan output here, it looks like an AQE optimization is turning the entire plan into a LocalTableScan:

E                   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
E                   : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
E                   AdaptiveSparkPlan isFinalPlan=true
E                   +- == Final Plan ==
E                      LocalTableScan <empty>, [key#2006, max(value)#2017L]
E                   +- == Initial Plan ==
E                      Sort [key#2006 ASC NULLS FIRST, max(value)#2017L ASC NULLS FIRST], true, 0
E                      +- Exchange rangepartitioning(key#2006 ASC NULLS FIRST, max(value)#2017L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=5302]
E                         +- HashAggregate(keys=[key#2006], functions=[max(value#2007L)], output=[key#2006, max(value)#2017L])
E                            +- Exchange hashpartitioning(key#2006, 4), ENSURE_REQUIREMENTS, [plan_id=5299]
E                               +- HashAggregate(keys=[key#2006], functions=[partial_max(value#2007L)], output=[key#2006, max#2023L])
E                                  +- Union
...

NVnavkumar (Collaborator) commented:

> Based on the plan output here, it looks like an AQE optimization is turning the entire plan into a LocalTableScan [...]

I guess AQE determined via the join that this would return an empty result.
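
For reference, this empty-result short circuit is standard AQE behavior: when one side of an inner join turns out to be empty at runtime, AQE replaces the whole subtree with an empty LocalTableScan. A minimal standalone PySpark sketch of that behavior (plain Spark, not the plugin or the integration test; the table and column names here are made up for illustration):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.adaptive.enabled", "true")
         .getOrCreate())

fact = spark.range(100).withColumnRenamed("id", "key")
# A dimension side whose filter matches no rows, mimicking the degenerate
# broadcast side in the failing test.
dim = spark.range(100).withColumnRenamed("id", "key").where("key < 0")

joined = fact.join(dim, "key")
joined.collect()
# With AQE enabled, the final plan collapses to `LocalTableScan <empty>`,
# so no DynamicPruningExpression (or join) is left for the test to find.
joined.explain()
```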

NVnavkumar (Collaborator) commented:

So basically, after some debugging, I think one of the subqueries returned an empty result, and AQE short-circuited that into a LocalTableScan <empty>. This happens on both the CPU and GPU, but of course it means the resulting plan did not contain a DynamicPruningExpression. It looks like the solution here is that we need to update the test logic to be something like an either/or capture: either there is a single LocalTableScanExec, or the GPU plan needs to contain a DynamicPruningExpression.
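
A rough sketch of what that either/or capture could look like, assuming the captured GPU plan is available as a string (the helper name `assert_dpp_or_empty_relation` and the string-matching approach are illustrative, not the existing integration-test API):

```python
# Hypothetical helper, not part of the existing integration-test asserts:
# accept either outcome described above.
def assert_dpp_or_empty_relation(captured_plan: str):
    degenerate = "LocalTableScan" in captured_plan   # AQE short-circuited to empty
    has_dpp = "DynamicPruningExpression" in captured_plan
    assert degenerate or has_dpp, (
        "plan contains neither an empty LocalTableScan nor a "
        "DynamicPruningExpression:\n" + captured_plan)
```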

jlowe (Member, Author) commented Jan 8, 2024

> it looks like the solution here is that we need to update the test logic to be something like an either/or capture

I'm not sure that's the best fix. The point of this test is to check handling of DPP, and the problem here is that the datagen happened to produce inputs that did not result in a plan requiring DPP. IMHO a better fix is to update the input data generation to ensure there isn't a degenerate join. If we want to test handling of degenerate joins as well, that should be a separate test that explicitly sets up inputs to produce a degenerate join.
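
One way that could look (a sketch only; the real change went in via the linked PRs, and the helper and schema below are illustrative, based on the column names visible in the plan above): make the generated dimension table always contain at least one row matching the literals the query filters on (`ex_key = 3`, `filter = 1458` in the plan above), so the broadcast side can never be empty.

```python
from pyspark.sql import Row, SparkSession

# Illustrative sketch, not the actual fix: append a row that is guaranteed to
# satisfy the dimension-side filter (ex_key = 3 AND filter = 1458), so the
# join can never degenerate into an empty relation regardless of DATAGEN_SEED.
def build_dim_table(spark: SparkSession, random_rows):
    # random_rows is assumed to be a list of Rows with the same fields.
    guaranteed = [Row(key=0, ex_key=3, filter=1458)]
    return spark.createDataFrame(list(random_rows) + guaranteed)
```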

NVnavkumar (Collaborator) commented:

Makes sense. Will investigate what is producing the empty join

NVnavkumar (Collaborator) commented:

test_dpp_empty_relation already exists, so I think we just need to prevent the degenerate join in this test

NVnavkumar (Collaborator) commented:

Test is now failing again:

FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1707665221, INJECT_OOM, IGNORE_ORDER]

jlowe (Member, Author) commented Feb 22, 2024

Saw this fail again on a Dataproc nightly run.

[2024-02-22T15:41:19.602Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1708615902, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
[2024-02-22T15:41:19.602Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-orc][DATAGEN_SEED=1708615902, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
[2024-02-22T15:41:19.602Z] = 2 failed, 116 passed, 11 skipped, 26232 deselected, 9 warnings in 557.14s (0:09:17) =

GaryShen2008 (Collaborator) commented:

Another failure:

[2024-02-29T10:10:57.500Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1709192431, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...

[2024-02-29T10:10:57.500Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-orc][DATAGEN_SEED=1709192431, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...

NVnavkumar (Collaborator) commented:

Considering this is actually a test issue (the test is not able to avoid an empty LocalTableScan) and not an issue with the plugin, I'm lowering the priority.
