Refactor Stage info code between Q/P tools #971
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Contributes to NVIDIA#477

This code change aims at bringing the Q/P tools' handling of stages and their accumulators to common ground. There are a couple of fixes in this change, including:

- Capture the accumulator IDs of a stage during the stage-completion event.
- Fix the construction of `MLFunctions`.
- Fix the implementation of `jobAndStageMetricsAggregation`, which was inefficient because it iterated multiple times over the tasks list.
- Remove a redundant data structure that maps between accumulators and stages.
@@ -88,7 +88,7 @@ object GenerateDot {
     val accumSummary = accums.map { a =>
       Seq(a.sqlID, a.accumulatorId, a.total)
     }
-    val accumIdToStageId = app.accumIdToStageId
+    val accumIdToStageId = app.stageManager.reduceAccumMapping()
This is a hack to get `GenerateDot` to work with the 1-to-M map. `GenerateDot` is rarely used, and fixing the implementation to be 1-to-M is going to bloat the PR.
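For context, a minimal sketch of what such a reduction might look like, assuming the manager keeps a 1-to-M map from accumulator ID to stage IDs; the names and the pick-one policy here are assumptions, not the PR's actual code:

```scala
import scala.collection.mutable

// Hypothetical sketch: collapse a 1-to-M accumulator->stages mapping into
// the 1-to-1 shape that the old GenerateDot code path still expects.
class StageModelManagerSketch {
  // 1-to-M: one accumulator ID can be referenced by multiple stages.
  private val accumToStages = new mutable.HashMap[Long, Set[Int]]()

  def addAccumToStage(accumId: Long, stageId: Int): Unit = {
    accumToStages(accumId) = accumToStages.getOrElse(accumId, Set.empty) + stageId
  }

  // "Reduce" each set of stages to a single representative stage ID.
  // Picking the smallest ID is an arbitrary policy for this sketch.
  def reduceAccumMapping(): mutable.HashMap[Long, Int] = {
    val reduced = new mutable.HashMap[Long, Int]()
    accumToStages.foreach { case (accumId, stageIds) =>
      if (stageIds.nonEmpty) reduced(accumId) = stageIds.min
    }
    reduced
  }
}
```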
@@ -32,7 +32,8 @@ class CompareApplications(apps: Seq[ApplicationInfo]) extends Logging {
   def findMatchingStages(): (Seq[CompareProfileResults], Seq[CompareProfileResults]) = {
     val normalizedByAppId = apps.map { app =>
       val normalized = app.sqlPlans.mapValues { plan =>
-        SparkPlanInfoWithStage(plan, app.accumIdToStageId).normalizeForStageComparison
+        SparkPlanInfoWithStage(plan,
+          app.stageManager.reduceAccumMapping()).normalizeForStageComparison
`reduceAccumMapping()` is a hack to get `GenerateDot` to work with the 1-to-M map. `GenerateDot` is rarely used, and fixing the implementation to be 1-to-M is going to bloat the PR.
        tasksInJob.map(_.sw_recordsWritten).sum,
        tasksInJob.map(_.sw_writeTime).sum
      ))
    // first get all stage aggregated levels
The old code iterated over all the jobs to get their stages, then iterated over all the tasks within each stage to aggregate; this produced all the job rows. Then it repeated the same sequence to produce all the stage rows. This is clearly very time consuming. Instead, the new code does the following (see the sketch after this list):
- Loop over all the stages and aggregate them.
- Cache the results in a hashMap.
- Loop over all the jobs and aggregate the values cached in the hashMap.
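A minimal sketch of that single-pass flow, using illustrative types and names rather than the PR's actual classes:

```scala
import scala.collection.mutable

// Illustrative stand-in for the tools' stage-level aggregation row.
case class StageAgg(stageId: Int, taskDuration: Long)

def aggregateJobsAndStages(
    stageToTaskDurations: Map[Int, Seq[Long]],
    jobToStages: Map[Int, Seq[Int]]): Unit = {
  // 1. Aggregate every stage exactly once and cache the result.
  val stageCache = new mutable.HashMap[Int, StageAgg]()
  stageToTaskDurations.foreach { case (stageId, durations) =>
    stageCache(stageId) = StageAgg(stageId, durations.sum)
  }
  // 2. Build the job rows from the cached stage aggregates instead of
  //    re-walking the task list once per job.
  jobToStages.foreach { case (jobId, stageIds) =>
    val jobDuration = stageIds.flatMap(stageCache.get).map(_.taskDuration).sum
    println(s"job=$jobId aggregatedTaskDuration=$jobDuration")
  }
}
```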
@@ -1,2 +1,2 @@
 App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
-"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.08,4479.65,392.34,1306,14353,4872,558,62.67,"","","JSON","","","",1306,4477,8214,6139,3.36,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30
+"Rapids Spark Profiling Tool Unit Tests","local-1622043423018","Not Recommended",1.06,4564.93,307.06,1306,14353,4872,472,62.67,"","","JSON","","","",1306,4477,9164,5189,2.86,true,"SerializeFromObject;Execute InsertIntoHadoopFsRelationCommand json;DeserializeToObject;Filter;MapElements;Scan","",30
I manually compared the qualification output before and after. This PR fixed a bug, resulting in more Execs being linked to their Stage. That caused the transitions and the supported duration to differ from the previous code.
@@ -208,7 +208,6 @@ class ApplicationInfo(
   var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
   var sqlPlanMetricsAdaptive: ArrayBuffer[SQLPlanMetricsCase] = ArrayBuffer[SQLPlanMetricsCase]()

-  val accumIdToStageId: mutable.HashMap[Long, Int] = new mutable.HashMap[Long, Int]()
Redundant hashMap, because `AppBase` already defines such a 1-to-M mapping.
      val existingStages = app.accumulatorToStages.getOrElse(accumId, Set.empty)
      app.accumulatorToStages.put(accumId, existingStages + event.stageInfo.stageId)
    }
    app.getOrCreateStage(event.stageInfo)
Encapsulate the initialization and the calculation of duration within the retrieval of the stage object.
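A hedged sketch of that encapsulation; `getOrCreateStage` appears in the diff above, but the class and field names below are stand-ins, not the PR's actual code:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.StageInfo

// Sketch: the retrieval path both creates the model on first sight of the
// stage and refreshes the duration from Spark's StageInfo on every event.
class StageStoreSketch {
  private val stages = new mutable.HashMap[(Int, Int), StageModelSketch]()

  def getOrCreateStage(info: StageInfo): StageModelSketch = {
    val stage = stages.getOrElseUpdate(
      (info.stageId, info.attemptNumber()), new StageModelSketch(info))
    stage.updateInfo(info)
    stage
  }
}

class StageModelSketch(var info: StageInfo) {
  var duration: Option[Long] = None

  def updateInfo(newInfo: StageInfo): Unit = {
    info = newInfo
    // Duration is derived, not read: completionTime - submissionTime.
    duration = for {
      start <- info.submissionTime
      end <- info.completionTime
    } yield end - start
  }
}
```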
@@ -226,8 +225,8 @@ abstract class AppBase(
   }

     if (mlOps.nonEmpty) {
-      Some(MLFunctions(Some(appId.toString), stageInfo.info.stageId, mlOps,
-        stageInfo.duration.getOrElse(0)))
+      Some(MLFunctions(appId, stageModel.sId, mlOps,
This was a bug: the stageId was used instead of the appId.
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
 * Spark-information.
 */
@param @field @getter @setter @beanGetter @beanSetter
class Calculated(desc: String = "") extends scala.annotation.Annotation
I have my reservations about how useful this is going to be, or at least how useful it should be. Ideally, most things read from Spark are named appropriately enough that they should be fairly obvious. Otherwise, everything is calculated, so you would just end up with annotations everywhere.
I agree. My thought is that this annotation could help, at least temporarily, since we won't have to rename the fields everywhere. I had a hard time understanding where each field comes from and whether we need to revisit how it is deduced.
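For instance, a hypothetical usage of the `Calculated` annotation from the diff above; the field and description here are made up for illustration:

```scala
// Hypothetical usage: mark a field that the tools derive themselves
// rather than read verbatim from Spark's event log.
class StageDurations {
  @Calculated("computed as completionTime - submissionTime")
  var duration: Option[Long] = None
}
```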
    stage.duration = ProfileUtils.optionLongMinusOptionLong(stage.completionTime,
      stage.info.submissionTime)
    val stageAccumulatorIds = event.stageInfo.accumulables.values.map { m => m.id }.toSeq
    stageAccumulatorIds.foreach { accumId =>
I'm a bit confused here, as I thought part of the change was to fix the tracking of accumulators on stage completed, but this looks like we already were? Were we just not using them, or was it the taskEnd one we weren't adding?
We were not adding them in the taskEnd. The code is removed from `EventProcessorBase` and moved to the `StageManager` during the creation of the `StageModel` instance. `stage.completionTime` and `stage.failureReason` are redundant because we were already capturing Spark's stageInfo. In the new version, we will read them from Spark's `StageInfo`.
  var duration: Option[Long] = None

  // Whenever an event is triggered, the object should update the Stage info.
  private def updatedInfo(newSInfo: StageInfo): Unit = {
The function should be `updateInfo` (without the "d") if it's actively doing something.
Fixed
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
  // We keep track of the attemptId to allow improvement down the road if we decide to handle
  // different Attempts.
  // - 1st level maps between [Int: StageId -> 2nd Level]
  // - 2nd level maps between [Int:, StageModel]
Missing attemptId in the description.
fixed
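For reference, the structure described in that comment is a two-level lookup. A rough sketch, with a stand-in `StageModel` class:

```scala
import scala.collection.mutable

object StageLookupSketch {
  // Stand-in for the real StageModel class.
  class StageModel

  // 1st level: stageId -> 2nd level; 2nd level: attemptId -> StageModel.
  val stageIdToInfo: mutable.HashMap[Int, mutable.HashMap[Int, StageModel]] =
    new mutable.HashMap[Int, mutable.HashMap[Int, StageModel]]()

  // Lookup/creation for a given (stageId, attemptId) pair.
  def getOrCreate(stageId: Int, attemptId: Int): StageModel =
    stageIdToInfo
      .getOrElseUpdate(stageId, new mutable.HashMap[Int, StageModel]())
      .getOrElseUpdate(attemptId, new StageModel)
}
```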
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
Thanks @amahussein!
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Contributes to #980

This code change aims at bringing the Q/P tools' handling of stages and their accumulators to common ground. There are a couple of fixes in this change, including the implementation of `jobAndStageMetricsAggregation`, which was not efficient in iterating multiple times over the tasks list.

Changes:

- Handle `StageCompleted`/`StageSubmitted` events to capture the accumulator IDs.
- Fix `Analysis.jobAndStageMetricsAggregation()`, because it was unnecessarily iterating several times over all the tasks. The new implementation iterates only once over the tasks, then uses the cached values for each stage to calculate the job-aggregated metrics.
- Add new annotations (a sketch of their assumed shape follows below):
  - `Calculated`: to distinguish between fields that are loaded from Spark vs. the ones that are calculated by the Tools.
  - `WallClock`: to be used to distinguish fields that represent wallClock vs. DF durations (aggregations on tasks).
  - `Since`: similar to Spark, to make it easy to know when a specific piece of logic was added to the tools.
- Fix a bug in the construction of `MLFunctions`: the constructor mistakenly used the stageId instead of the appID.
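The `Calculated` definition appears in a diff earlier in this conversation; the `WallClock` and `Since` annotations presumably follow the same shape. The following is a sketch of that assumption, not the merged code:

```scala
import scala.annotation.meta._

// Sketches modeled on the Calculated annotation shown in the diff above;
// the merged definitions of WallClock and Since may differ.
@param @field @getter @setter @beanGetter @beanSetter
class WallClock(desc: String = "") extends scala.annotation.Annotation

@param @field @getter @setter @beanGetter @beanSetter
class Since(version: String) extends scala.annotation.Annotation
```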