-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35384][SQL] Improve performance for InvokeLike.invoke #32527
Conversation
Kubernetes integration test starting |
Kubernetes integration test status failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks fine otherwise.
var i = 0 | ||
val len = arguments.length | ||
while (i < len) { | ||
val e = arguments(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks we don't need this intermediate val?
evaluatedArgs(i) = arguments(i).eval(input).asInstanceOf[Object]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea let me remove it
evaluatedArgs(i) = e.eval(input).asInstanceOf[Object] | ||
i += 1 | ||
} | ||
if (needNullCheck && evaluatedArgs.exists(_ == null)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: my IDE suggests .exists(_ == null)
-> .contains(null)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the exists
part is not related to this PR but I'm happy to change it :)
var i = 0 | ||
val len = arguments.length | ||
while (i < len) { | ||
evaluatedArgs(i) = arguments(i).eval(input).asInstanceOf[Object] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need to keep evaluatedArgs
as a (lazy) val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean just use val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't we evaluate arguments
for each time invoke
is called? Why not just having val evaluatedArgs: Array[Object] = new Array[Object](arguments.length)
in invoke
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it aims to reuse Array[Object]
itself and only changes the values of array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea even though we evaluate arguments
for each invoke
call we can reuse the same array to store the results of evaluation. I guess it's better than allocating a new Array[Object]
for each input row.
Kubernetes integration test starting |
Kubernetes integration test status failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. The improvement looks nice!
Test build #138475 has finished for PR 32527 at commit
|
Thank you, @sunchao and all! Merged to master for Apache Spark 3.2.0. |
// return null if one of arguments is null | ||
null | ||
} else { | ||
val ret = try { | ||
method.invoke(obj, args: _*) | ||
method.invoke(obj, evaluatedArgs: _*) | ||
} catch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also improve the last piece?
val boxedClass = ScalaReflection.typeBoxedJavaMapping.get(dataType)
if (boxedClass.isDefined) {
boxedClass.get.cast(ret)
} else {
ret
}
We can create a function for it
private lazy val boxing: Any => Any = ScalaReflection.typeBoxedJavaMapping.get(dataType).map(_.cast(_)).getOrElse(identity)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do the similar thing in Invoke.eval
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea let me try it. In the profiling after this PR, HashMap.get
takes 7.82% from the entire invoke
call so it seems worthwhile to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if we can do the similar thing in Invoke.eval
though since obj
in obj.getClass.getMethod(functionName, argClasses: _*)
is different for each call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Another idea: obj
from InternalRow
are always of the same class, we can avoid this
@transient lazy val method = {
val cls = targetObject.dataType match {
case ObjectType(cls) => cls
case StringType => classOf[UTF8String]
case _: DecimalType => classOf[Decimal]
...
}
findMethod(cls, encodedFunctionName, argClasses)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I'm not sure. Looking at usages of Invoke
, it seems targetObject.dataType
is usually ObjectType
(for instance, in ScalarFunction
we wrap the UDF into a Literal
with ObjectType
), so curious how useful this would be and when we'd use StringType
/DecimalType
for the targetObject
.
Looking at the profiling result for Invoke.eval
, it is now dominated by InvokeLike.invoke
:
Although this is somewhat unrelated to the above as V2FunctionBenchmark
(and ScalarFunction
) uses ObjectType
for Invoke
so it's already handled by the current code:
@transient lazy val method = targetObject.dataType match {
case ObjectType(cls) =>
Some(findMethod(cls, encodedFunctionName, argClasses))
case _ => None
}
we may need new benchmarks if we decide to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, for UDF, it's just an extra method.isDefine
check, and probably not a big issue.
Test build #138480 has finished for PR 32527 at commit
|
What changes were proposed in this pull request?
Change
map
inInvokeLike.invoke
to a while loop to improve performance, following Spark style guide.Why are the changes needed?
InvokeLike.invoke
, which is used in non-codegen path forInvoke
andStaticInvoke
, currently usesmap
to evaluate arguments:which is pretty expensive if the method itself is trivial. We can change it to a plain while loop.
Benchmark results show this can improve as much as 3x from
V2FunctionBenchmark
:Before
After
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.