[SPARK-43995][SPARK-43996][CONNECT] Add support for UDFRegistration to the Connect Scala Client

### What changes were proposed in this pull request?

This PR adds support for registering a Scala UDF from the Scala/JVM client.

The following APIs are implemented in `UDFRegistration`:

- `def register(name: String, udf: UserDefinedFunction): UserDefinedFunction`
- `def register[RT: TypeTag, A1: TypeTag ...](name: String, func: (A1, ...) => RT): UserDefinedFunction` for 0 to 22 arguments.
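
As a rough illustration of the two `register` overloads listed above, the sketch below registers one function through each and then calls both from SQL. It assumes a `SparkSession` is passed in by the caller; the names `registerExamples`, `plusOne`, and `tagId` are hypothetical and do not come from this PR.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf

// Hypothetical helper: the session is supplied by the caller.
def registerExamples(spark: SparkSession): DataFrame = {
  // Overload 1: register an existing UserDefinedFunction under a SQL-visible name.
  val plusOne = udf((x: Long) => x + 1)
  spark.udf.register("plusOne", plusOne)

  // Overload 2: register a Scala closure directly; the argument and return
  // types are derived from the TypeTags of the function's signature.
  spark.udf.register("tagId", (id: Long, tag: String) => s"$tag-$id")

  // Both functions can now be referenced by name in SQL.
  spark.sql("SELECT plusOne(id) AS next, tagId(id, 'row') AS tagged FROM range(3)")
}
```

Both calls return a `UserDefinedFunction`, so the result of the second overload could also be kept and applied to columns directly instead of going through SQL.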

The following API is implemented in `functions`:

- `def call_udf(udfName: String, cols: Column*): Column`
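
A minimal sketch of how `call_udf` might be combined with a registered function, assuming a `SparkSession` supplied by the caller. The helper `doubledIds` and the UDF name `doubleIt` are hypothetical and used only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{call_udf, col}

// Hypothetical helper: the session is supplied by the caller.
def doubledIds(spark: SparkSession): DataFrame = {
  // Register a closure under a name so it can be looked up later.
  spark.udf.register("doubleIt", (x: Long) => x * 2)

  // call_udf resolves the function by its registered name, which is handy
  // when the UserDefinedFunction value itself is not in scope.
  spark.range(5).select(call_udf("doubleIt", col("id")).as("doubled"))
}
```

Looking the function up by name keeps the DataFrame code decoupled from where the UDF was defined, at the cost of losing compile-time checking of the name.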

Note: This PR is stacked on apache#41959.

### Why are the changes needed?

To reach parity with classic Spark.

### Does this PR introduce _any_ user-facing change?

Yes. `spark.udf.register()` is added, as shown below:
```scala
class A(x: Int) { def get = x * 100 }
val myUdf = udf((x: Int) => new A(x).get)
spark.udf.register("dummyUdf", myUdf)
spark.sql("select dummyUdf(id) from range(5)").as[Long].collect()
```
The output:
```scala
Array[Long] = Array(0L, 100L, 200L, 300L, 400L)
```

### How was this patch tested?

New tests in `ReplE2ESuite`.

Closes apache#41953 from vicennial/SPARK-43995.

Authored-by: vicennial <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
vicennial authored and HyukjinKwon committed Jul 14, 2023
1 parent d1d9760 commit 7ecdad5
Showing 7 changed files with 1,139 additions and 2 deletions.
@@ -417,6 +417,30 @@ class SparkSession private[sql] (
    range(start, end, step, Option(numPartitions))
  }

  /**
   * A collection of methods for registering user-defined functions (UDF).
   *
   * The following example registers a Scala closure as UDF:
   * {{{
   *   sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
   * }}}
   *
   * The following example registers a UDF in Java:
   * {{{
   *   sparkSession.udf().register("myUDF",
   *       (Integer arg1, String arg2) -> arg2 + arg1,
   *       DataTypes.StringType);
   * }}}
   *
   * @note
   *   The user-defined functions must be deterministic. Due to optimization, duplicate
   *   invocations may be eliminated or the function may even be invoked more times than it is
   *   present in the query.
   *
   * @since 3.5.0
   */
  lazy val udf: UDFRegistration = new UDFRegistration(this)

  // scalastyle:off
  // Disable style checker so "implicits" object can start with lowercase i
  /**
@@ -525,6 +549,13 @@ class SparkSession private[sql] (
    client.execute(plan).asScala.toSeq
  }

  private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction): Unit = {
    val command = proto.Command.newBuilder().setRegisterFunction(udf).build()
    val plan = proto.Plan.newBuilder().setCommand(command).build()

    client.execute(plan)
  }

  @DeveloperApi
  def execute(extension: com.google.protobuf.Any): Unit = {
    val command = proto.Command.newBuilder().setExtension(extension).build()