[SPARK-34981][SQL] Implement V2 function resolution and evaluation #32082

Closed
wants to merge 21 commits into from

Conversation

sunchao
Member

@sunchao sunchao commented Apr 7, 2021

Co-Authored-By: Chao Sun [email protected]
Co-Authored-By: Ryan Blue [email protected]

What changes were proposed in this pull request?

This implements function resolution and evaluation for functions registered through the V2 FunctionCatalog (SPARK-27658). In particular:

  • Added documentation for how to define the "magic method" in ScalarFunction.
  • Added a new expression ApplyFunctionExpression which evaluates input by delegating to the ScalarFunction.produceResult method.
  • Added a new expression V2Aggregator, which is a type of TypedImperativeAggregate. It wraps a V2 AggregateFunction and mostly delegates methods to that implementation. It also uses plain Java serde for intermediate state.
  • Added function resolution logic for ScalarFunction and AggregateFunction in Analyzer.
    • For ScalarFunction, this checks through Java reflection whether the magic method is implemented, and creates an Invoke expression if so. Otherwise, it checks whether the default produceResult is overridden. If so, it creates an ApplyFunctionExpression which evaluates through InternalRow. Otherwise an analysis exception is thrown.
    • For AggregateFunction, this checks whether the update method is overridden. If so, it converts it to V2Aggregator. Otherwise an analysis exception is thrown, similar to the case of ScalarFunction.
  • Extended existing InMemoryTableCatalog to add the function catalog capability. Also renamed it to InMemoryCatalog since it no longer only covers tables.
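
The "plain Java serde" used for the aggregator's intermediate state can be pictured with a minimal sketch like the one below. This is not the PR's code: SumCountState and JavaSerdeSketch are hypothetical names, and the only thing assumed from the PR is that the buffer type is java.io.Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical aggregation buffer; any Serializable state would work the same way.
final class SumCountState implements Serializable {
    private static final long serialVersionUID = 1L;
    long sum;
    long count;

    void update(long value) {
        sum += value;
        count += 1;
    }
}

// Hypothetical helper showing a plain Java serialization round trip.
final class JavaSerdeSketch {
    private JavaSerdeSketch() {}

    // Turn the intermediate state into bytes so it can be shipped or spilled.
    static byte[] serialize(Serializable state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuild the intermediate state from bytes; the caller casts to the buffer type.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Plain Java serialization keeps the wrapper generic over any Serializable buffer, at the cost of a larger and slower encoding than a hand-written format.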

Note: this can currently detect whether a subclass overrides the default produceResult or update method from the parent interface only for Java implementations. In Scala it seems hard to tell whether a subclass overrides a default method from its parent interface. In that case, the failure will be a runtime error instead of an analysis error.
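
A rough sketch of the scalar-function resolution order described above, using only Java reflection. SketchScalarFunction is a hypothetical stand-in for the real ScalarFunction interface, and the Strategy enum and class names are invented for illustration; the actual Analyzer code builds Invoke or ApplyFunctionExpression rather than returning an enum.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for ScalarFunction; not the real Spark interface.
interface SketchScalarFunction<R> {
    String MAGIC_METHOD_NAME = "invoke";

    // Row-based fallback; the default body signals "not implemented".
    default R produceResult(Object[] row) {
        throw new UnsupportedOperationException("produceResult is not implemented");
    }
}

// Implements only the magic method.
class MagicIntAdd implements SketchScalarFunction<Integer> {
    public int invoke(int left, int right) {
        return left + right;
    }
}

// Overrides only the row-based method.
class RowIntAdd implements SketchScalarFunction<Integer> {
    @Override
    public Integer produceResult(Object[] row) {
        return (Integer) row[0] + (Integer) row[1];
    }
}

// Implements neither, so resolution should reject it.
class NoImplFn implements SketchScalarFunction<Integer> {}

final class ScalarResolutionSketch {
    enum Strategy { MAGIC_METHOD, ROW_BASED, UNSUPPORTED }

    private ScalarResolutionSketch() {}

    static Strategy resolve(SketchScalarFunction<?> fn, Class<?>... argTypes) {
        // Step 1: look for a public magic method matching the argument types.
        try {
            fn.getClass().getMethod(SketchScalarFunction.MAGIC_METHOD_NAME, argTypes);
            return Strategy.MAGIC_METHOD;
        } catch (NoSuchMethodException e) {
            // No magic method; fall through to the row-based check.
        }
        // Step 2: the default counts as overridden only if the method found
        // is declared somewhere other than the interface itself.
        try {
            Method m = fn.getClass().getMethod("produceResult", Object[].class);
            return m.getDeclaringClass() == SketchScalarFunction.class
                ? Strategy.UNSUPPORTED
                : Strategy.ROW_BASED;
        } catch (NoSuchMethodException e) {
            return Strategy.UNSUPPORTED;
        }
    }
}
```

In this sketch the UNSUPPORTED case corresponds to the point where the PR throws an analysis exception.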

A few TODOs:

  • Extend V2SessionCatalog with function catalog. This seems a little tricky since APIs such as V2 FunctionCatalog's loadFunction are different from V1 SessionCatalog's lookupFunction.
  • Add magic method for AggregateFunction.
  • Type coercion when looking up functions.

Why are the changes needed?

As the V2 FunctionCatalog APIs are finalized, we should integrate them with the function resolution and evaluation process so that they are actually useful.

Does this PR introduce any user-facing change?

Yes, now a function exposed through V2 FunctionCatalog can be analyzed and evaluated.

How was this patch tested?

Added new unit tests.

@SparkQA

SparkQA commented Apr 7, 2021

Test build #137036 has finished for PR 32082 at commit afe0f62.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ApplyFunctionExpression(
  • case class V2Aggregator[BUF <: java.io.Serializable, OUT](
  • implicit class FunctionIdentifierHelper(ident: FunctionIdentifier)

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41614/

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41614/

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41617/

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41617/

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41619/

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41619/

@SparkQA

SparkQA commented Apr 8, 2021

Test build #137039 has finished for PR 32082 at commit 1ae5520.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 8, 2021

Test build #137041 has finished for PR 32082 at commit c522276.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41642/

@SparkQA

SparkQA commented Apr 8, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41642/

@SparkQA

SparkQA commented Apr 8, 2021

Test build #137064 has finished for PR 32082 at commit 173f7e1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2021

Test build #137227 has finished for PR 32082 at commit dc9dde6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41807/

@SparkQA

SparkQA commented Apr 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41817/

@SparkQA

SparkQA commented Apr 12, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41817/

@SparkQA

SparkQA commented Apr 12, 2021

Test build #137237 has finished for PR 32082 at commit c94e7d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao
Member Author

sunchao commented Apr 12, 2021

cc @cloud-fan @rdblue

@@ -1967,6 +1969,9 @@ class Analyzer(override val catalogManager: CatalogManager)
    override def apply(plan: LogicalPlan): LogicalPlan = {
      val externalFunctionNameSet = new mutable.HashSet[FunctionIdentifier]()
      plan.resolveExpressions {
        case f @ UnresolvedFunction(NonSessionCatalogAndIdentifier(_, _), _, _, _, _) =>
          // no-op if this is from a v2 catalog
Contributor

shall we add the v2 API to check function existence?

Member Author

Yes looking at SPARK-19737 I think this will be very useful (although the checking won't be complete since bind could still fail). Do you mind if I add a TODO here and do it separately later?

Contributor

SGTM

@SparkQA

SparkQA commented Apr 27, 2021

Test build #137989 has finished for PR 32082 at commit 97c29c3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42507/

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42507/

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42509/

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42509/

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42512/

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42512/

@SparkQA

SparkQA commented Apr 27, 2021

Test build #137992 has finished for PR 32082 at commit c453b64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42515/

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42515/

@SparkQA

SparkQA commented Apr 27, 2021

Test build #137987 has finished for PR 32082 at commit f25b5e6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 27, 2021

Test build #137994 has finished for PR 32082 at commit 790d27f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


bound match {
  case scalarFunc: ScalarFunction[_] =>
    if (isDistinct) {
Contributor

nit: for readability, can we put the logic of handling ScalarFunction into a method?

Member Author

Sure, done. I think we can also consider moving the big chunk of handling of V1 and V2 functions separately into two functions like what I used to have:

case u @ UnresolvedFunction(AsFunctionIdentifier(ident), arguments, isDistinct, 
  filter, ignoreNulls) => withPosition(u) {
    processV1Function(...)
  }

case u @ UnresolvedFunction(nameParts, arguments, isDistinct, filter, ignoreNulls) =>
  withPosition(u) {
    processV2Function(...)
  }

to keep the main logic of pattern matching on unresolved functions clearer. Let me know if you prefer this way too.

}
}
}
case aggFunc: V2AggregateFunction[_, _] =>
Contributor

ditto, put into a new method.

 *
 * <pre>
 * public class IntegerAdd implements {@code ScalarFunction<Integer>} {
 *   public int invoke(int left, int right) {
Member Author

@sunchao sunchao Apr 27, 2021

@cloud-fan I think we can also consider adding another "static invoke" API for stateless UDFs. From the benchmark you did some time back, it seems this can give a decent performance improvement. WDYT?

Java HotSpot(TM) 64-Bit Server VM 1.8.0_161-b12 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
UDF perf:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
native add                                        14206          14516         535         70.4          14.2       1.0X
udf add                                           24609          25271         898         40.6          24.6       0.6X
new udf add                                       18657          19096         726         53.6          18.7       0.8X
new row udf add                                   21128          22343        1478         47.3          21.1       0.7X
static udf add                                    16678          16887         278         60.0          16.7       0.9X
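
For readers unfamiliar with the idea, here is a minimal sketch of what a "static invoke" path could look like. It is purely illustrative: the thread defers the API design to a separate PR, so the method name, the class names, and the reflection-based lookup below are all assumptions rather than the eventual Spark API.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical stateless UDF: the magic method is static, so no instance is needed.
final class StatelessLongAdd {
    private StatelessLongAdd() {}

    public static long invoke(long left, long right) {
        return left + right;
    }
}

// Hypothetical UDF with an instance-level magic method, for contrast.
final class InstanceLongAdd {
    public long invoke(long left, long right) {
        return left + right;
    }
}

final class StaticInvokeSketch {
    private StaticInvokeSketch() {}

    // True if the class exposes a public static "invoke" with these parameter types.
    static boolean hasStaticInvoke(Class<?> cls, Class<?>... argTypes) {
        try {
            Method m = cls.getMethod("invoke", argTypes);
            return Modifier.isStatic(m.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Call the static method without a receiver object (null target).
    static Object callStatic(Class<?> cls, Class<?>[] argTypes, Object... args) {
        try {
            return cls.getMethod("invoke", argTypes).invoke(null, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A real implementation would presumably emit a direct static call in generated code instead of going through reflection on every row; the reflection here only shows how the static variant could be detected.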

Contributor

@sunchao can you spend some time on the API design? I'd love to see this feature!

Member Author

Sure, will do. It should be similar to the current invoke, and we can leverage StaticInvoke for the purpose. Do you think we can do this in a separate PR?

Contributor

yea

@SparkQA

SparkQA commented Apr 27, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42525/

@SparkQA

SparkQA commented Apr 27, 2021

Test build #138006 has finished for PR 32082 at commit c18715f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 86d3bb5 Apr 28, 2021
@sunchao
Member Author

sunchao commented Apr 28, 2021

Thanks all!

@sunchao sunchao deleted the resolve-func-v2 branch April 28, 2021 18:35
override def name: String = function.name()
override def dataType: DataType = function.resultType()

private lazy val reusedRow = new GenericInternalRow(children.size)
Contributor

shall we use SpecificInternalRow to avoid boxing?

Member Author

Yeah, looks like we should. Let me change this and run the benchmark again.

Member Author

sorry for the delay - just opened #32647 for this.
