[TASK] Optimize the storage of accumulables in core tools #1263
Conversation
Some minor nits. LGTM.
just minor nits, otherwise looks good
 * @param taskId the taskId pulled from the TaskEnd event
 * @param accumulableInfo the accumulableInfo from the TaskEnd event
 */
def addAccToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
nit: rename to addAccumToTask
fixed
 * triggered by a taskEnd event and the map between stage-Acc has not been
 * established yet.
 */
def addAccToStage(stageId: Int,
nit: rename to addAccumToStage
fixed
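For illustration, the sketch below shows one way an AccumInfo entry could track per-stage and per-task accumulable updates through the renamed methods. The internal maps, field names, and method bodies are assumptions for this sketch and are not taken from the PR; only the method names and parameters come from the quoted diff.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.AccumulableInfo

// Hypothetical sketch: one AccumInfo per accumulator id, tracking which stages
// reference the accumulator and the per-task updates reported by TaskEnd events.
class AccumInfo(val accumId: Long) {
  // stageId -> last value reported for this accumulator at the stage level.
  private val stageValuesMap = new mutable.HashMap[Int, Long]()
  // taskId -> update reported by that task's TaskEnd event.
  private val taskUpdatesMap = new mutable.HashMap[Long, Long]()

  // Register the stage for this accumulator, e.g. when a TaskEnd event arrives
  // before the stage-to-accumulable mapping has been established.
  def addAccumToStage(stageId: Int, accumulableInfo: AccumulableInfo): Unit = {
    val stageValue = accumulableInfo.value.collect { case l: Long => l }.getOrElse(0L)
    stageValuesMap.put(stageId, stageValue)
  }

  // Record the per-task update pulled from the TaskEnd event and make sure the
  // owning stage is tracked as well.
  def addAccumToTask(stageId: Int, taskId: Long, accumulableInfo: AccumulableInfo): Unit = {
    accumulableInfo.update.collect { case l: Long => l }.foreach { upd =>
      taskUpdatesMap.put(taskId, upd)
    }
    if (!stageValuesMap.contains(stageId)) {
      addAccumToStage(stageId, accumulableInfo)
    }
  }
}
```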
Thanks @tgravescs and @cindyyuanjiang for the prompt review.
I addressed the comments and renamed class names "Acc*" to "Accum*" to be consistent.
Thanks! LGTM.
Thanks @bilalbari and @amahussein for the design and implementation of this optimization. The numbers look promising.
NAMES_TABLE.computeIfAbsent(nameKey, AccumNameRef.fromString)
}

// Intern the accumulator name if it is not already present in the table.
nit: Should this be insert?
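For context, the interning pattern being discussed might look roughly like the sketch below. Only NAMES_TABLE, fromString, and the computeIfAbsent call appear in the quoted snippet; the class body, the getOrCreate helper, and everything else are assumptions for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap

// Interned reference to an accumulator name: one instance (and one String) per
// distinct name, shared by every stage/task entry that uses that accumulator.
case class AccumNameRef(value: String)

object AccumNameRef {
  // Global intern table mapping a name string to its single AccumNameRef instance.
  private val NAMES_TABLE = new ConcurrentHashMap[String, AccumNameRef]()

  // Factory handed to computeIfAbsent; only invoked when the name is missing.
  def fromString(name: String): AccumNameRef = new AccumNameRef(name)

  // Intern the accumulator name if it is not already present in the table.
  def getOrCreate(nameKey: String): AccumNameRef =
    NAMES_TABLE.computeIfAbsent(nameKey, fromString)
}
```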
Fixes #1202
While investigating bottlenecks, it was found that most of the objects being allocated represent metrics.
In the original code the accumulables were stored in a huge map, AppBase.taskStageAccumMap. There is another hashMap that stored AccumulableIDs to a set of StageIDs in order to build the Exec-to-Stage map.
The definition of the TaskStageAccumCase class is sketched below. A new TaskStageAccumCase is created for each accumulable when the task/stage is completed. Let's say there is a stage with 100 tasks: an accumulator ID will have an ArrayBuffer of size 101, and all of those entries repeat the same common fields and differ only in the TaskID/update values, if any.
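The original definition is not included in this excerpt; the sketch below is a plausible reconstruction based on the fields mentioned above, with exact field names and types as assumptions rather than code copied from the repository.

```scala
// Hypothetical reconstruction of the original per-entry case class; the exact
// field names and types are guesses based on the fields mentioned above.
case class TaskStageAccumCase(
    stageId: Int,            // repeated common field
    attemptId: Int,          // repeated common field
    taskId: Option[Long],    // None for the stage-level entry
    accumulatorId: Long,     // repeated common field
    name: Option[String],    // repeated common field: the accumulator name string
    value: Option[Long],     // total accumulated value, if reported
    update: Option[Long],    // per-task update, differs per entry
    isInternal: Boolean)     // repeated common field
```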
Changes
This PR revisits the way the accumulables are stored:
- Accumulator names are interned as AccumNameRef. This implies that we create only one string and use it across all the threads to represent the same accumulable.
- AccumMetaRef holds <accumId, AccumNameRef>: this encapsulation is very important to propagate the same optimization while dumping the data.
- AccumMetaRef instances are stored in a per-app hashMap because they should not be shared across the different threads/applications. Once analysis is done, the map is collected.
- AccumProfileResult is changed to use AccumMetaRef to optimize the memory consumption. This reduces the number of allocations since the AccumMetaRef already exists in memory.
- Finally, the CSV format conversion is also part of AccumNameRef because we should create only one value for each accumulator instead of reformatting a new string for each row (multiplied by the number of stages). A sketch of these classes follows this list.
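A minimal sketch of how AccumMetaRef, the cached CSV form, and AccumProfileResult might fit together, assuming the AccumNameRef interning shown earlier; names such as csvValue and the statistics fields are illustrative assumptions, not code from the PR.

```scala
// Interned accumulator name (see the interning sketch above); the CSV-ready
// string is built once per name rather than once per output row.
case class AccumNameRef(value: String) {
  val csvValue: String = "\"" + value + "\""
}

// Pairs an accumulator id with its interned name; one instance per accumulator,
// kept in a per-application hashMap so it is not shared across applications.
case class AccumMetaRef(id: Long, name: AccumNameRef)

// Profile result rows hold the shared AccumMetaRef instead of copying the id
// and name into every row (the statistics fields here are illustrative).
case class AccumProfileResult(
    accumMetaRef: AccumMetaRef,
    stageId: Int,
    min: Long,
    median: Long,
    max: Long,
    total: Long)
```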
Unit tests affected
Future work and Followups
See the list of tasks in #815
Performance Optimizations
@bilalbari please share some performance numbers in this PR description:
- Heap Usage Before Changes
- Heap Usage Post Changes
- Example of Failed Run that OOM
  - Setup
  - OOM baselines
- BenchmarkSuite output without changes
- BenchmarkSuite output with changes