[CELEBORN-1679] Estimated ApplicationDiskUsage in cluster should be multiplied by worker count. #2865
base: main
Conversation
cc @FMX
Thanks for this PR, but the assumption is not solid. Every worker reports its disk usage metrics to the master node via the worker heartbeat.
This change is incorrect for the following reasons:
- The master node collects the total disk usage from all workers, so this value should not be multiplied by the number of workers. Multiplying the usage by the worker count would produce an inflated and inaccurate total, significantly exceeding the actual usage (a small numeric sketch follows below).
- Additionally, the shuffle data may not be evenly distributed among the workers, particularly when Celeborn workers use the 'LOADAWARE' slot assignment policy.
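For illustration only, here is a minimal sketch of the over-counting concern, assuming a hypothetical three-worker cluster whose heartbeat-reported usages are already aggregated on the master; the figures are invented and not taken from this PR:

object OverCountSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-worker disk usage for one application, in bytes.
    val perWorkerUsage = Seq(10L << 30, 12L << 30, 8L << 30) // 10 GiB, 12 GiB, 8 GiB
    val workerCount = perWorkerUsage.size

    // If the master already sums the per-worker reports, that sum is the real
    // cluster-wide usage; multiplying it again by the worker count inflates it.
    val actual = perWorkerUsage.sum     // 30 GiB, the actual usage
    val inflated = actual * workerCount // 90 GiB, three times the actual usage

    println(s"actual=$actual bytes, inflated=$inflated bytes")
  }
}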
Thanks for your review; the two issues you mentioned are reasonable. But if I've understood correctly, in the current implementation an application's disk usage on a single worker is treated as that application's usage across the entire cluster, as shown in the code below:

// org.apache.celeborn.common.meta.AppDiskUsageSnapShot#updateAppDiskUsage
// param usage: the application's disk usage on one worker, e.g. worker A
def updateAppDiskUsage(appId: String, usage: Long): Unit = {
  // Drop the old entry for this application from topNItems, if present.
  val dropIndex = topNItems.indexWhere(item => item != null && item.appId == appId)
  if (dropIndex != -1) {
    drop(dropIndex)
  }
  // Find the position to insert at so that the sorted order is preserved.
  val insertIndex = findInsertPosition(usage)
  // The usage on worker A is stored in topNItems as the application's cluster-wide usage.
  if (insertIndex != -1) {
    shift(insertIndex)
    topNItems(insertIndex) = AppDiskUsage(appId, usage)
  }
}

Due to the issue mentioned above, this approach causes the reported Application Disk Usage to be significantly lower than the application's actual usage across the cluster.
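As a small worked example of that under-reporting (the figures are invented, and an even shuffle distribution is assumed purely for illustration):

object UnderReportSketch {
  def main(args: Array[String]): Unit = {
    val workerCount = 10
    val usageOnOneWorker = 5L << 30                  // 5 GiB on a single worker
    val actualUsage = usageOnOneWorker * workerCount // about 50 GiB across the cluster

    // updateAppDiskUsage above records only usageOnOneWorker, so the reported
    // value is roughly workerCount times smaller than the real cluster-wide usage.
    println(s"reported=$usageOnOneWorker bytes, actual=$actualUsage bytes")
  }
}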
@Z1Wu Thank you for your enthusiasm! The feature you're interested in has been addressed in this pull request. I recommend removing the …
What changes were proposed in this pull request?
Assumption: for an application, its shuffle data will be distributed evenly across all workers, so the application's disk usage on one worker can be used to estimate its disk usage across the whole cluster.
Logic for estimating application disk usage:
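The description does not spell the logic out in detail here, so the following is only a minimal sketch of the stated idea (multiply the single-worker usage by the worker count); the helper name estimateClusterUsage is hypothetical and not part of the actual patch:

// Hypothetical helper illustrating the estimation logic described above; the
// real change lives in the Celeborn master-side metadata code, which is not
// reproduced in this description.
def estimateClusterUsage(usageOnOneWorker: Long, workerCount: Int): Long = {
  require(workerCount > 0, "cluster must contain at least one worker")
  // Under the even-distribution assumption, cluster-wide usage is roughly the
  // usage observed on one worker times the number of workers.
  usageOnOneWorker * workerCount
}

// Example: 5 GiB reported by one worker in a 10-worker cluster yields an
// estimate of about 50 GiB for the whole cluster:
// estimateClusterUsage(5L << 30, 10) == 50L << 30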
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?