diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
index daafe0ddc..82817559c 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala
@@ -451,18 +451,22 @@ object SQLPlanParser extends Logging {
     // Avoid counting duplicate nodes. We mark them as shouldRemove to neutralize their impact on
     // speedups.
     val isDupNode = reusedNodeIds.contains(node.id)
+    // Normalize the execName by removing the trailing '$' character, if present.
+    // This is necessary because in Scala, the '$' character is often appended to the names of
+    // generated classes or objects, and we want to match the base name regardless of this suffix.
+    val normalizedNodeName = node.name.stripSuffix("$")
     if (isDupNode) {
       // log that information. This should not cause significant increase in log size.
-      logDebug(s"Marking [sqlID = ${sqlID}, node = ${node.name}] as shouldRemove. " +
+      logDebug(s"Marking [sqlID = ${sqlID}, node = ${normalizedNodeName}] as shouldRemove. " +
         s"Reason: duplicate - ancestor of ReusedExchange")
     }
-    if (node.name.contains("WholeStageCodegen")) {
+    if (normalizedNodeName.contains("WholeStageCodegen")) {
       // this is special because it is a SparkPlanGraphCluster vs SparkPlanGraphNode
       WholeStageExecParser(node.asInstanceOf[SparkPlanGraphCluster], checker, sqlID, app,
         reusedNodeIds).parse
     } else {
       val execInfos = try {
-        node.name match {
+        normalizedNodeName match {
           case "AggregateInPandas" =>
             AggregateInPandasExecParser(node, checker, sqlID).parse
           case "ArrowEvalPython" =>
@@ -481,11 +485,6 @@ object SQLPlanParser extends Logging {
             CoalesceExecParser(node, checker, sqlID).parse
           case "CollectLimit" =>
             CollectLimitExecParser(node, checker, sqlID).parse
-          case c if c.contains("CreateDataSourceTableAsSelectCommand") =>
-            // create data source table doesn't show the format so we can't determine
-            // if we support it
-            ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
-              isSupported = false, None)
           case "CustomShuffleReader" | "AQEShuffleRead" =>
             CustomShuffleReaderExecParser(node, checker, sqlID).parse
           case "Exchange" =>
@@ -546,8 +545,8 @@ object SQLPlanParser extends Logging {
             // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as
             // supported but with shouldRemove flag set to True.
             // Setting the "shouldRemove" is handled at the end of the function.
-            ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
-              isSupported = reuseExecs.contains(node.name), None)
+            ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
+              isSupported = reuseExecs.contains(normalizedNodeName), None)
         }
       } catch {
         // Error parsing expression could trigger an exception. If the exception is not handled,
@@ -558,9 +557,9 @@ object SQLPlanParser extends Logging {
         // - No need to add the SQL to the failed SQLs, because this will cause the app to be
         //   labeled as "Not Applicable" which is not preferred at this point.
         case NonFatal(e) =>
-          logWarning(s"Unexpected error parsing plan node ${node.name}. " +
+          logWarning(s"Unexpected error parsing plan node ${normalizedNodeName}. " +
            s" sqlID = ${sqlID}", e)
-          ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
+          ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
            isSupported = false, None)
       }
       val stagesInNode = getStagesInSQLNode(node, app)
@@ -760,15 +759,17 @@ object SQLPlanParser extends Logging {
       trim.replaceAll("""^\[+""", "").replaceAll("""\]+$""", "").
       replaceAll("cast\\(", "").split("windowspecdefinition").map(_.trim)
-    // Get functionname from each array element except the last one as it doesn't contain
+    // Get function name from each array element except the last one as it doesn't contain
     // any window function
-    for ( i <- 0 to windowExprs.size - 1 ) {
-      val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList
-      val expr = windowFunc.last
-      val functionName = getFunctionName(windowFunctionPattern, expr)
-      functionName match {
-        case Some(func) => parsedExpressions += func
-        case _ => // NO OP
+    if (windowExprs.nonEmpty) {
+      windowExprs.dropRight(1).foreach { windowExprString =>
+        val windowFunc = windowFunctionPattern.findAllIn(windowExprString).toList
+        val expr = windowFunc.lastOption.getOrElse("")
+        val functionName = getFunctionName(windowFunctionPattern, expr)
+        functionName match {
+          case Some(func) => parsedExpressions += func
+          case _ => // NO OP
+        }
       }
     }
     parsedExpressions.distinct.toArray
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index 49d33c08b..279d7b04a 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -415,6 +415,8 @@ object ExecHelper {
   private val ExecuteRefreshTable = "Execute RefreshTable"
   private val ExecuteRepairTableCommand = "Execute RepairTableCommand"
   private val ExecuteShowPartitionsCommand = "Execute ShowPartitionsCommand"
+  private val ExecuteClearCacheCommand = "Execute ClearCacheCommand"
+  private val ExecuteOptimizeTableCommandEdge = "Execute OptimizeTableCommandEdge"
   // DeltaLakeOperations
   private val ExecUpdateCommandEdge = "Execute UpdateCommandEdge"
   private val ExecDeleteCommandEdge = "Execute DeleteCommandEdge"
@@ -444,6 +446,8 @@ object ExecHelper {
     ExecuteRefreshTable,
     ExecuteRepairTableCommand,
     ExecuteShowPartitionsCommand,
+    ExecuteClearCacheCommand,
+    ExecuteOptimizeTableCommandEdge,
     SubqueryExecParser.execName
   )