Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor for SPARK-34079-multi-column-scalar-subquery #4

Conversation

attilapiros
Copy link

No description provided.

@github-actions github-actions bot added the SQL label Nov 16, 2021
peter-toth pushed a commit that referenced this pull request Jan 17, 2023
### What changes were proposed in this pull request?
This PR introduces sasl retry count in RetryingBlockTransferor.

### Why are the changes needed?
Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though IOException at #2 is retried (resulting in increment of retryCount), the retryCount would be cleared at step #4.
Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New test is added, courtesy of Mridul.

Closes apache#39611 from tedyu/sasl-cnt.

Authored-by: Ted Yu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
peter-toth pushed a commit that referenced this pull request Apr 11, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
peter-toth pushed a commit that referenced this pull request Aug 22, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error).

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
after the fix this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit ef0a76e)
Signed-off-by: Wenchen Fan <[email protected]>
peter-toth pushed a commit that referenced this pull request Aug 22, 2023
### What changes were proposed in this pull request? This PR introduces sasl retry count in RetryingBlockTransferor.

### Why are the changes needed?
Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario:

1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException

Even though IOException at #2 is retried (resulting in increment of retryCount), the retryCount would be cleared at step #4. Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount.

### Does this PR introduce _any_ user-facing change? No

### How was this patch tested?
New test is added, courtesy of Mridul.

Closes apache#39611 from tedyu/sasl-cnt.

Authored-by: Ted Yu <yuzhihonggmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>

Closes apache#39709 from akpatnam25/SPARK-42090-backport-3.3.

Authored-by: Ted Yu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
peter-toth pushed a commit that referenced this pull request Apr 23, 2024
### What changes were proposed in this pull request?

In the `Window` node, both `partitionSpec` and `orderSpec` must be orderable, but the current type check only verifies `orderSpec` is orderable. This can cause an error in later optimizing phases.

Given a query:

```
with t as (select id, map(id, id) as m from range(0, 10))
select rank() over (partition by m order by id) from t
```

Before the PR, it fails with an `INTERNAL_ERROR`:

```
org.apache.spark.SparkException: [INTERNAL_ERROR] grouping/join/window partition keys cannot be map type. SQLSTATE: XX000
at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.needNormalize(NormalizeFloatingNumbers.scala:103)
at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.org$apache$spark$sql$catalyst$optimizer$NormalizeFloatingNumbers$$needNormalize(NormalizeFloatingNumbers.scala:94)
...
```

After the PR, it fails with a `EXPRESSION_TYPE_IS_NOT_ORDERABLE`, which is expected:

```
  org.apache.spark.sql.catalyst.ExtendedAnalysisException: [EXPRESSION_TYPE_IS_NOT_ORDERABLE] Column expression "m" cannot be sorted because its type "MAP<BIGINT, BIGINT>" is not orderable. SQLSTATE: 42822; line 2 pos 53;
Project [RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
+- Project [id#1L, m#0, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
   +- Window [rank(id#1L) windowspecdefinition(m#0, id#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4], [m#0], [id#1L ASC NULLS FIRST]
      +- Project [id#1L, m#0]
         +- SubqueryAlias t
            +- SubqueryAlias t
               +- Project [id#1L, map(id#1L, id#1L) AS m#0]
                  +- Range (0, 10, step=1, splits=None)
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
...
```

### How was this patch tested?

Unit test.

Closes apache#45730 from chenhao-db/SPARK-47572.

Authored-by: Chenhao Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
peter-toth pushed a commit that referenced this pull request Jun 28, 2024
… throw internal error

### What changes were proposed in this pull request?

This PR fixes the error messages and classes when Python UDFs are used in higher order functions.

### Why are the changes needed?

To show the proper user-facing exceptions with error classes.

### Does this PR introduce _any_ user-facing change?

Yes, previously it threw internal error such as:

```python
from pyspark.sql.functions import transform, udf, col, array
spark.range(1).select(transform(array("id"), lambda x: udf(lambda y: y)(x))).collect()
```

Before:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o74.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 0.0 failed 1 times, most recent failure: Lost task 15.0 in stage 0.0 (TID 15) (ip-192-168-123-103.ap-northeast-2.compute.internal executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: <lambda>(lambda x_0#3L)#2 SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```

After:

```
pyspark.errors.exceptions.captured.AnalysisException: [INVALID_LAMBDA_FUNCTION_CALL.UNEVALUABLE] Invalid lambda function call. Python UDFs should be used in a lambda function at a higher order function. However, "<lambda>(lambda x_0#3L)" was a Python UDF. SQLSTATE: 42K0D;
Project [transform(array(id#0L), lambdafunction(<lambda>(lambda x_0#3L)#2, lambda x_0#3L, false)) AS transform(array(id), lambdafunction(<lambda>(lambda x_0#3L), namedlambdavariable()))#4]
+- Range (0, 1, step=1, splits=Some(16))
```

### How was this patch tested?

Unittest was added

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47079 from HyukjinKwon/SPARK-48706.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant