-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP][CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle. #2819
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls resolve the conficts~
@@ -671,8 +671,14 @@ private[celeborn] class Worker( | |||
val resourceConsumptionSnapshot = storageManager.userResourceConsumptionSnapshot() | |||
val userResourceConsumptions = | |||
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava) | |||
resourceConsumptionSnapshot.foreach { case (userIdentifier, _) => | |||
resourceConsumptionSnapshot.foreach { case (userIdentifier, userResourceConsumption) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better not persist resourceConsumption into ratis When worker heartbeat subResourceConsumption
masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) { | ||
val clusterQuota = getClusterStorageQuota | ||
var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0) | ||
val userResourceConsumption = statusSystem.workerSnapshot.asScala.flatMap { workerInfo => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lack of tenant-level quota checks.
@@ -504,4 +501,10 @@ public boolean isWorkerAvailable(WorkerInfo workerInfo) { | |||
public void updateApplicationMeta(ApplicationMeta applicationMeta) { | |||
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); | |||
} | |||
|
|||
public List<WorkerInfo> workerSnapshot() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not directly use workers for computing resource consumption?
quotaChecker.scheduleWithFixedDelay( | ||
() => { | ||
try { | ||
updateResourceConsumption() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method not ly update source consumption but also check quota for user-level, app-level, so may use another name or separate the logic
What changes were proposed in this pull request?
2.1 When the tenant's resourceConsumption exceeds the tenant's quota, select the app with a larger consumption to mark interrupted.
2.2 When the resourceConsumption of the cluster exceeds the cluster quota, select the app with larger consumption to mark interrupted.
Why are the changes needed?
The current storage quota logic can only limit new shuffles, and cannot limit the writing of existing shuffles. In our production environment, there is such an scenario: the cluster is small, but the user's app single shuffle is large which occupied disk resources, we want to interrupt those shuffle.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UTs.