
[SPARK-49022] Use Column Node API in Column #47688

Closed
wants to merge 21 commits

Conversation

@hvanhovell (Contributor) commented Aug 9, 2024

What changes were proposed in this pull request?

This PR makes the org.apache.spark.sql.Column and friends use the recently introduced ColumnNode API. This is a stepping stone towards making the Column API implementation agnostic.

Most of the changes are fairly mechanical, and they are mostly caused by the removal of the Column(Expression) constructor.

Why are the changes needed?

We want to create a unified Scala interface for Classic and Connect. A language-agnostic Column API implementation is part of this.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No

return Column(cast(JVMView, sc._jvm).Column(expr(*jcols + jfuns)))

jcols = [_to_java_column(c) for c in cols]
return Column(sc._jvm.Column.pysparkFn(name, _to_seq(sc, jcols + jfuns)))
@hvanhovell (Contributor, author) commented:

@HyukjinKwon I cannot invoke Column.fn(...) here. Do you know why? I added Column.pysparkFn(...) as a workaround.

Member replied:

Ah, Column.fn has to be decorated with scala.annotation.varargs (which makes the compiler emit the equivalent varargs signature for Java callers).
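
For illustration, a minimal standalone sketch of what the annotation changes (this is not Spark's actual Column.fn; the names here are illustrative):

```scala
import scala.annotation.varargs

object VarargsExample {
  // Without @varargs, scalac compiles the repeated parameter as a single Seq[String]
  // argument, which Java callers (and therefore Py4J) cannot supply as plain
  // positional arguments. With @varargs, the compiler also emits a Java-style
  // bridge method taking String..., so fn("concat", "a", "b") works from Java too.
  @varargs
  def fn(name: String, args: String*): String = s"$name(${args.mkString(", ")})"
}
```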

@github-actions github-actions bot removed the CORE label Aug 14, 2024
@hvanhovell (Contributor, author):

This is waiting for #47746.

@github-actions github-actions bot added the R label Aug 15, 2024
@@ -26,8 +26,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.UnboundRowEncoder
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
import org.apache.spark.sql.execution.aggregate.ScalaAggregator
import org.apache.spark.sql.internal.UserDefinedFunctionLike
import org.apache.spark.sql.internal.{InvokeInlineUserDefinedFunction, UserDefinedFunctionLike}
Member commented:

Not really related to this PR, but I wonder if we should name the package a little differently, e.g. org.apache.spark.sql.internal.api. Right now I keep having to check which package each individual class belongs to.

@hvanhovell (Contributor, author) replied:

You mean move all ColumnNode classes elsewhere?


def unapply(col: Column): Option[Expression] = Some(col.expr)
def apply(node: => ColumnNode): Column = withOrigin(new Column(node))
Member commented:

I think the error report here could get a bit weird via Origin, because the top level of the function call changed. We get Thread.currentThread().getStackTrace and set it on the Origin, so whichever function fails, it will always point here.

It would be great if we could double-check that code like the below still works fine:

val df = spark.range(10)

val df1 = df.withColumn("div_ten", df.col("id") / 10)
val df2 = df1.withColumn("plus_four", df.col("id") + 4)

// This is the problematic divide operation that triggers DIVIDE_BY_ZERO.
val df3 = df2.withColumn("div_zero", df.col("id") / 0) // Error here
val df4 = df3.withColumn("minus_five", df.col("id") / 5)
df4.collect()

should report something like:

org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"div" was called from
<init>(<console>:7)

@hvanhovell (Contributor, author) replied:

This is what I get:

org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"div" was called from
<init>(<console>:1)

So that checks out. TBH this part is almost the same as before.

@HyukjinKwon (Member) left a comment:

Looks good otherwise; mostly minor comments. The only real comment from me is https://github.com/apache/spark/pull/47688/files#r1719271705, to make sure error reporting still works fine.

@hvanhovell hvanhovell changed the title [WIP][SPARK-49022] Use Column Node API in Column [SPARK-49022] Use Column Node API in Column Aug 17, 2024
@hvanhovell (Contributor, author):

Tests have passed. I am merging this to master.

@HyukjinKwon (Member):

Merged to master.

@EnricoMi (Contributor):

@hvanhovell what is the recommended way to migrate user code like

new Column(MyExpression())

to this Column Node API?

The following are all private[sql] / private[spark]:

Column(MyExpression())
new Column(ExpressionColumnNode(MyExpression()))

@hvanhovell (Contributor, author):

@EnricoMi what exactly are you doing? Could you share an example? We can definitely open up the interface to allow for user extensions.

If it is basically adding functions, then I'd probably go with the Column.fn(...) approach in combination with a SparkSessionExtension that registers your custom expressions. This way we can easily make what you have created work with Spark Connect (and with SQL). I understand some folks are not interested in this; in that case we can still open up an expression-based way of creating a Column.
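
To make that concrete, here is a hedged sketch of the suggested direction (not code from this PR; MyEquivExpression, MyExtensions, and the function name my_equiv are illustrative, and the exact Column.fn signature may differ between Spark versions): a custom Catalyst expression is registered through SparkSessionExtensions and then referenced by name.

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BooleanType, DataType}

// Illustrative user-defined expression: true when all children evaluate to the same
// value. A real implementation would also provide proper codegen instead of the fallback.
case class MyEquivExpression(children: Seq[Expression])
    extends Expression with CodegenFallback {
  override def dataType: DataType = BooleanType
  override def nullable: Boolean = false
  override def eval(input: InternalRow): Any =
    children.map(_.eval(input)).distinct.size <= 1
  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[Expression]): Expression = copy(children = newChildren)
}

// Enabled via spark.sql.extensions=com.example.MyExtensions (package name illustrative).
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectFunction((
      FunctionIdentifier("my_equiv"),
      new ExpressionInfo(classOf[MyEquivExpression].getName, "my_equiv"),
      (children: Seq[Expression]) => MyEquivExpression(children)))
  }
}
```

Once registered, the function is resolvable from SQL (SELECT my_equiv(a, b) FROM t) and, assuming Column.fn is available in your build, from the DataFrame API via Column.fn("my_equiv", df.col("a"), df.col("b")).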

@EnricoMi (Contributor):

... in that case we can still open up an expression-based way of creating a Column.

That would shortcut a lot of extra complexity and simplify backward compatibility.

@HyukjinKwon (Member) commented Aug 18, 2024

backward compatibility

Which backward compatibility? Showing an example would help a lot here. All those expressions are supposed to be internal and private.

@EnricoMi (Contributor):

The spark-extension package provides some Dataset diff tooling. There, a user-defined comparison can be defined simply by implementing the scala.math.Equiv interface: https://github.com/G-Research/spark-extension/blob/master/src/main/scala/uk/co/gresearch/spark/diff/DiffComparators.scala#L41

That Equiv implementation is wrapped into an Expression (including codegen) and turned into a Comparator that is then used by the package to diff columns: given two columns left and right, it returns a Column that evaluates to Boolean (comparing the two columns).

This obviously won't work for Spark Connect, but with the Column Node API it no longer works for the classic Spark client either.

That package supports Spark 3.0 - 3.5. Being able to create a Column from an Expression would allow minimal changes to keep this working for Spark 4.0 with a non-Connect client. This is what I meant by backward compatibility.

In order to support Spark Connect, there is no way around using the Spark Connect plugin / extensions.

github-merge-queue bot pushed a commit to G-Research/spark-extension that referenced this pull request Aug 20, 2024
@EnricoMi (Contributor):

Currently working around this by gaining access to Column(Expression) from inside the private[spark] package: https://github.com/G-Research/spark-extension/pull/256/files
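
For reference, a minimal sketch of that kind of workaround (illustrative names; it leans on Spark-internal API that may change between versions): a small shim compiled into the org.apache.spark.sql package so the private[sql] factories mentioned above become visible.

```scala
// Must live in the org.apache.spark.sql package so the private[sql] members are accessible.
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.ExpressionColumnNode

object ColumnShim {
  // Recreates the old `new Column(expr)` behaviour on top of the ColumnNode API.
  def column(expr: Expression): Column = Column(ExpressionColumnNode(expr))
}
```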

IvanK-db pushed a commit to IvanK-db/spark that referenced this pull request Sep 20, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
HyukjinKwon added a commit that referenced this pull request Oct 9, 2024
… keep the behavior same

### What changes were proposed in this pull request?

This PR is a follow-up of #47688 that keeps `Column.toString` the same as before.

### Why are the changes needed?

To keep the behaviour the same between Spark Classic and Connect.

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

Will be added separately. I manually tested:

```scala
import org.apache.spark.sql.functions.col
val name = "with`!#$%dot".replace("`", "``")
col(s"`${name}`").toString.equals("with`!#$%dot")
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48376 from HyukjinKwon/SPARK-49022-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>