Skip to content

Commit

Permalink
Add operators to ignore list and update WindowExpr parser (#890)
Browse files Browse the repository at this point in the history
* Add operators to ignore list and update WindowExpr parser

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

Signed-off-by: Niranjan Artal <[email protected]>
  • Loading branch information
nartal1 authored Apr 1, 2024
1 parent 12423b5 commit b15659c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,18 +451,22 @@ object SQLPlanParser extends Logging {
// Avoid counting duplicate nodes. We mark them as shouldRemove to neutralize their impact on
// speedups.
val isDupNode = reusedNodeIds.contains(node.id)
// Normalize the execName by removing the trailing '$' character, if present.
// This is necessary because in Scala, the '$' character is often appended to the names of
// generated classes or objects, and we want to match the base name regardless of this suffix.
val normalizedNodeName = node.name.stripSuffix("$")
if (isDupNode) {
// log that information. This should not cause significant increase in log size.
logDebug(s"Marking [sqlID = ${sqlID}, node = ${node.name}] as shouldRemove. " +
logDebug(s"Marking [sqlID = ${sqlID}, node = ${normalizedNodeName}] as shouldRemove. " +
s"Reason: duplicate - ancestor of ReusedExchange")
}
if (node.name.contains("WholeStageCodegen")) {
if (normalizedNodeName.contains("WholeStageCodegen")) {
// this is special because it is a SparkPlanGraphCluster vs SparkPlanGraphNode
WholeStageExecParser(node.asInstanceOf[SparkPlanGraphCluster], checker, sqlID, app,
reusedNodeIds).parse
} else {
val execInfos = try {
node.name match {
normalizedNodeName match {
case "AggregateInPandas" =>
AggregateInPandasExecParser(node, checker, sqlID).parse
case "ArrowEvalPython" =>
Expand All @@ -481,11 +485,6 @@ object SQLPlanParser extends Logging {
CoalesceExecParser(node, checker, sqlID).parse
case "CollectLimit" =>
CollectLimitExecParser(node, checker, sqlID).parse
case c if c.contains("CreateDataSourceTableAsSelectCommand") =>
// create data source table doesn't show the format so we can't determine
// if we support it
ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
isSupported = false, None)
case "CustomShuffleReader" | "AQEShuffleRead" =>
CustomShuffleReaderExecParser(node, checker, sqlID).parse
case "Exchange" =>
Expand Down Expand Up @@ -546,8 +545,8 @@ object SQLPlanParser extends Logging {
// Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as
// supported but with shouldRemove flag set to True.
// Setting the "shouldRemove" is handled at the end of the function.
ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
isSupported = reuseExecs.contains(node.name), None)
ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
isSupported = reuseExecs.contains(normalizedNodeName), None)
}
} catch {
// Error parsing expression could trigger an exception. If the exception is not handled,
Expand All @@ -558,9 +557,9 @@ object SQLPlanParser extends Logging {
// - No need to add the SQL to the failed SQLs, because this will cause the app to be
// labeled as "Not Applicable" which is not preferred at this point.
case NonFatal(e) =>
logWarning(s"Unexpected error parsing plan node ${node.name}. " +
logWarning(s"Unexpected error parsing plan node ${normalizedNodeName}. " +
s" sqlID = ${sqlID}", e)
ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
isSupported = false, None)
}
val stagesInNode = getStagesInSQLNode(node, app)
Expand Down Expand Up @@ -760,15 +759,17 @@ object SQLPlanParser extends Logging {
trim.replaceAll("""^\[+""", "").replaceAll("""\]+$""", "").
replaceAll("cast\\(", "").split("windowspecdefinition").map(_.trim)

// Get functionname from each array element except the last one as it doesn't contain
// Get function name from each array element except the last one as it doesn't contain
// any window function
for ( i <- 0 to windowExprs.size - 1 ) {
val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList
val expr = windowFunc.last
val functionName = getFunctionName(windowFunctionPattern, expr)
functionName match {
case Some(func) => parsedExpressions += func
case _ => // NO OP
if (windowExprs.nonEmpty) {
windowExprs.dropRight(1).foreach { windowExprString =>
val windowFunc = windowFunctionPattern.findAllIn(windowExprString).toList
val expr = windowFunc.lastOption.getOrElse("")
val functionName = getFunctionName(windowFunctionPattern, expr)
functionName match {
case Some(func) => parsedExpressions += func
case _ => // NO OP
}
}
}
parsedExpressions.distinct.toArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ object ExecHelper {
private val ExecuteRefreshTable = "Execute RefreshTable"
private val ExecuteRepairTableCommand = "Execute RepairTableCommand"
private val ExecuteShowPartitionsCommand = "Execute ShowPartitionsCommand"
private val ExecuteClearCacheCommand = "Execute ClearCacheCommand"
private val ExecuteOptimizeTableCommandEdge = "Execute OptimizeTableCommandEdge"
// DeltaLakeOperations
private val ExecUpdateCommandEdge = "Execute UpdateCommandEdge"
private val ExecDeleteCommandEdge = "Execute DeleteCommandEdge"
Expand Down Expand Up @@ -444,6 +446,8 @@ object ExecHelper {
ExecuteRefreshTable,
ExecuteRepairTableCommand,
ExecuteShowPartitionsCommand,
ExecuteClearCacheCommand,
ExecuteOptimizeTableCommandEdge,
SubqueryExecParser.execName
)

Expand Down

0 comments on commit b15659c

Please sign in to comment.