Skip to content

Commit

Permalink
catch NoSuchTypeException of ValidateSparkPlan and NoSuchMethodError …
Browse files Browse the repository at this point in the history
…of HashAggregateExec.copy (#602)
  • Loading branch information
XorSum authored Oct 11, 2024
1 parent 2812843 commit c6f6b76
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,27 @@
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.pool.TypePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ValidateSparkPlanInjector {

private static final Logger logger = LoggerFactory.getLogger(ValidateSparkPlanInjector.class);

public static void inject() {
ByteBuddyAgent.install();
TypeDescription typeDescription = TypePool.Default.ofSystemLoader()
.describe("org.apache.spark.sql.execution.adaptive.ValidateSparkPlan$")
.resolve();
new ByteBuddy()
.redefine(typeDescription, ClassFileLocator.ForClassLoader.ofSystemLoader())
.method(named("apply"))
.intercept(MethodDelegation.to(ValidateSparkPlanApplyInterceptor.class))
.make()
.load(ClassLoader.getSystemClassLoader(), ClassLoadingStrategy.Default.INJECTION);
try {
ByteBuddyAgent.install();
TypeDescription typeDescription = TypePool.Default.ofSystemLoader()
.describe("org.apache.spark.sql.execution.adaptive.ValidateSparkPlan$")
.resolve();
new ByteBuddy()
.redefine(typeDescription, ClassFileLocator.ForClassLoader.ofSystemLoader())
.method(named("apply"))
.intercept(MethodDelegation.to(ValidateSparkPlanApplyInterceptor.class))
.make()
.load(ClassLoader.getSystemClassLoader(), ClassLoadingStrategy.Default.INJECTION);
} catch (TypePool.Resolution.NoSuchTypeException e) {
logger.warn("No such type of ValidateSparkPlan", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,30 @@ object BlazeConverters extends Logging {
// for enabling filter-project optimization in native side
getPartialAggProjection(exec.aggregateExpressions, exec.groupingExpressions) match {
case Some((transformedAggregateExprs, transformedGroupingExprs, projections)) =>
return convertHashAggregateExec(
exec.copy(
aggregateExpressions = transformedAggregateExprs,
groupingExpressions = transformedGroupingExprs,
child = convertProjectExec(ProjectExec(projections, exec.child))))
val transformedExec =
try {
exec.copy(
aggregateExpressions = transformedAggregateExprs,
groupingExpressions = transformedGroupingExprs,
child = convertProjectExec(ProjectExec(projections, exec.child)))
} catch {
case _: NoSuchMethodError =>
import scala.reflect.runtime.universe._
import scala.reflect.runtime.currentMirror
val mirror = currentMirror.reflect(exec)
val copyMethod = typeOf[HashAggregateExec].decl(TermName("copy")).asMethod
val params = copyMethod.paramLists.flatten
val args = params.map { param =>
param.name.toString match {
case "aggregateExpressions" => transformedAggregateExprs
case "groupingExpressions" => transformedGroupingExprs
case "child" => convertProjectExec(ProjectExec(projections, exec.child))
case _ => mirror.reflectField(param.asTerm).get
}
}
mirror.reflectMethod(copyMethod)(args: _*).asInstanceOf[HashAggregateExec]
}
return convertHashAggregateExec(transformedExec)
case None => // passthrough
}

Expand Down

0 comments on commit c6f6b76

Please sign in to comment.