
[refactor cluster-task-manage 4/n] refactor cluster_task_manager into distributed and local part #21660

Merged: 12 commits from unify-scheduler into ray-project:master on Feb 17, 2022

Conversation

@scv119 (Contributor) commented Jan 18, 2022

Why are these changes needed?

This is a work-in-progress PR that splits cluster_task_manager into local and distributed parts.

For the distributed scheduler (cluster_task_manager_):
/// Schedules a task onto one node of the cluster. The logic is as follows:
/// 1. Queue tasks for scheduling.
/// 2. Pick a node in the cluster that has the resources available to run the
/// task.
/// * Step 2 should occur any time the state of the cluster changes, or a
/// new task is queued.
/// 3. For tasks that are infeasible, put them into the infeasible queue and
/// report them to the GCS, where the autoscaler will be notified and start
/// a new node to accommodate the requirement.
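
To make the control flow above concrete, here is a minimal C++ sketch of the scheduling loop. The types and functions (Work, DistributedScheduler, PickBestNode, ReportInfeasibleToGcs, ScheduleOnNode) are simplified placeholders, not the actual raylet classes:

#include <deque>
#include <iostream>
#include <optional>
#include <string>

struct Work {
  std::string task_id;
};

class DistributedScheduler {
 public:
  // Step 1: queue tasks for scheduling.
  void QueueTask(Work w) { to_schedule_.push_back(std::move(w)); }

  // Called whenever cluster state changes or a new task is queued.
  void ScheduleAndDispatchTasks() {
    std::deque<Work> retry_later;
    while (!to_schedule_.empty()) {
      Work w = std::move(to_schedule_.front());
      to_schedule_.pop_front();
      bool infeasible = false;
      // Step 2: pick a node with enough available resources.
      if (auto node = PickBestNode(w, &infeasible)) {
        ScheduleOnNode(*node, w);  // dispatch locally or spill to a remote node
      } else if (infeasible) {
        // Step 3: report to the GCS so the autoscaler can start a suitable node.
        infeasible_.push_back(std::move(w));
        ReportInfeasibleToGcs(infeasible_.back());
      } else {
        retry_later.push_back(std::move(w));  // feasible, but no capacity yet
      }
    }
    to_schedule_ = std::move(retry_later);
  }

 private:
  // Stubs standing in for resource-aware node selection and the real RPCs.
  std::optional<std::string> PickBestNode(const Work &, bool *infeasible) {
    *infeasible = false;
    return "local_node";
  }
  void ScheduleOnNode(const std::string &node, const Work &w) {
    std::cout << "schedule " << w.task_id << " on " << node << "\n";
  }
  void ReportInfeasibleToGcs(const Work &w) {
    std::cout << w.task_id << " is infeasible\n";
  }

  std::deque<Work> to_schedule_;
  std::deque<Work> infeasible_;
};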

For the local task manager:

/// Manages the lifetime of a task on the local node. It receives requests from
/// the cluster_task_manager (the distributed scheduler) and performs the
/// following steps:
/// 1. Pull the task's dependencies and add the task to the to_dispatch queue.
/// 2. Once the task's dependencies are all pulled locally, the task becomes
/// ready to dispatch.
/// 3. For all dispatch-ready tasks, schedule them by acquiring local resources
/// (including pinning the argument objects in memory and deducting CPU/GPU
/// and other resources from the local resource manager), then start a worker.
/// 4. If a task fails to acquire resources in step 3, try to spill it to a
/// different remote node.
/// 5. When a worker finishes executing its task(s), the requester returns it
/// and we release the resources in our view of the node's state.
/// 6. If a task has been waiting for its arguments for too long, it will also
/// be spilled back to a different node.
///
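
A correspondingly rough, self-contained sketch of the local side, again using placeholder names (Task, LocalTaskManagerSketch, kMaxWaitTicks) rather than the real LocalTaskManager interface:

#include <deque>
#include <string>

struct Task {
  std::string id;
  bool args_pulled = false;  // set once the dependency puller has all arguments
  int waiting_ticks = 0;     // how long the task has waited for its arguments
};

class LocalTaskManagerSketch {
 public:
  // Step 1: start pulling dependencies and park the task in the dispatch queue.
  void QueueAndPullTask(Task t) { to_dispatch_.push_back(std::move(t)); }

  // Steps 2-4 and 6: dispatch ready tasks, spill on failure or long waits.
  void DispatchTasks() {
    std::deque<Task> still_waiting;
    while (!to_dispatch_.empty()) {
      Task t = std::move(to_dispatch_.front());
      to_dispatch_.pop_front();
      if (!t.args_pulled) {
        if (++t.waiting_ticks > kMaxWaitTicks) {
          SpillToRemoteNode(t);                   // step 6: waited too long
        } else {
          still_waiting.push_back(std::move(t));  // step 2: not ready yet
        }
        continue;
      }
      // Step 3: pin argument objects and deduct CPU/GPU from the local view,
      // then start a worker for the task.
      if (AcquireLocalResources(t)) {
        StartWorker(t);
      } else {
        SpillToRemoteNode(t);                     // step 4: no local resources
      }
    }
    to_dispatch_ = std::move(still_waiting);
  }

  // Step 5: the requester returns the worker; release resources in our view.
  void OnWorkerReturned(const Task &t) { ReleaseLocalResources(t); }

 private:
  static constexpr int kMaxWaitTicks = 100;
  bool AcquireLocalResources(const Task &) { return true; }  // stub
  void ReleaseLocalResources(const Task &) {}                // stub
  void StartWorker(const Task &) {}                          // stub
  void SpillToRemoteNode(const Task &) {}                    // stub

  std::deque<Task> to_dispatch_;
};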

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@scv119 scv119 force-pushed the unify-scheduler branch 3 times, most recently from f4667be to aac0d2c Compare January 20, 2022 08:46
@bveeramani (Member) commented

‼️ ACTION REQUIRED ‼️

We've switched our code formatter from YAPF to Black (see #21311).

To prevent issues with merging your code, here's what you'll need to do:

  1. Install Black:
pip install -I black==21.12b0
  2. Format changed files with Black:
curl -o format-changed.sh https://gist.githubusercontent.com/bveeramani/42ef0e9e387b755a8a735b084af976f2/raw/7631276790765d555c423b8db2b679fd957b984a/format-changed.sh
chmod +x ./format-changed.sh
./format-changed.sh
rm format-changed.sh
  3. Commit your changes:
git add --all
git commit -m "Format Python code with Black"
  4. Merge master into your branch:
git pull upstream master
  5. Resolve merge conflicts (if necessary).

After running these steps, you'll have the updated format.sh.

@scv119 scv119 force-pushed the unify-scheduler branch 2 times, most recently from 5b3f9d7 to d64fbf9 Compare February 4, 2022 08:46
@scv119 scv119 changed the title [prototype] Unify scheduler refactor cluster_task_manager into distributed and local part Feb 4, 2022
@scv119 scv119 marked this pull request as ready for review February 4, 2022 08:47
@scv119 scv119 changed the title refactor cluster_task_manager into distributed and local part [WIP] refactor cluster_task_manager into distributed and local part Feb 4, 2022
@scv119 scv119 changed the title [WIP] refactor cluster_task_manager into distributed and local part [refactor cluster-task-manage 1/n][WIP] refactor cluster_task_manager into distributed and local part Feb 7, 2022
@scv119 scv119 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 7, 2022
@jjyao (Collaborator) commented Feb 8, 2022

Any plan to make the PR smaller?

@scv119 (Contributor, Author) commented Feb 8, 2022

Yeah, I think once I've fixed all the tests I'll try to see how to make it easier to review!

@scv119 scv119 force-pushed the unify-scheduler branch 3 times, most recently from e435fa8 to 59e5aa3 Compare February 11, 2022 06:58
@fishbone (Contributor) commented

I'm good with this PR since most of it is just moving things around. But given that the PR is so big, the reviewer probably won't be able to review it in detail. Could you point out what we should pay the most attention to when reviewing this PR?

@scv119 scv119 changed the title [refactor cluster-task-manage 1/n][WIP] refactor cluster_task_manager into distributed and local part [refactor cluster-task-manage 4/n] refactor cluster_task_manager into distributed and local part Feb 14, 2022
info_by_sched_cls_.erase(scheduling_class);
}
if (is_infeasible) {
// TODO(scv119): fail the request.
Review comment from @scv119 (Contributor, Author):

@iycheng @rkooo567
This is one major change. As I discussed with Sang, this (a request that was previously feasible later becoming infeasible) can only happen when a placement group's resources are deallocated, so we should just go ahead and fail the request.
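
A hedged sketch of what "fail the request" could look like here, using hypothetical names (Work, reply_failure, CancelInfeasibleWork) rather than the actual raylet types:

#include <functional>
#include <string>

struct Work {
  std::string task_id;
  // Callback that replies to the task's owner with a scheduling failure.
  std::function<void(const std::string &reason)> reply_failure;
};

// Instead of the TODO above: when a queued request becomes infeasible (e.g. its
// placement group's resources were deallocated), fail it back to the caller
// promptly rather than leaving it parked in the infeasible queue.
inline void CancelInfeasibleWork(Work &work) {
  work.reply_failure("Task " + work.task_id +
                     " became infeasible after its placement group was removed.");
}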

Spillback(node_id, work);
}
NodeID node_id = NodeID::FromBinary(node_id_string);
ScheduleOnNode(node_id, work);
Review comment from @scv119 (Contributor, Author):

This is another major change: we hide the choice of scheduling on the local node vs. a remote node inside this ScheduleOnNode function.
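
A minimal sketch of that idea, assuming simplified stand-ins (Work, SchedulerSketch) for the raylet types:

#include <iostream>
#include <string>

struct Work {
  std::string task_id;
};

class SchedulerSketch {
 public:
  explicit SchedulerSketch(std::string self_node_id)
      : self_node_id_(std::move(self_node_id)) {}

  // Callers never branch on local vs. remote; that decision lives here.
  void ScheduleOnNode(const std::string &node_id, const Work &work) {
    if (node_id == self_node_id_) {
      // Hand the task to the local task manager for dependency pulling,
      // resource acquisition, and worker startup.
      QueueAndScheduleLocally(work);
    } else {
      // Spill back: tell the requester to retry against the remote raylet.
      Spillback(node_id, work);
    }
  }

 private:
  void QueueAndScheduleLocally(const Work &w) {
    std::cout << w.task_id << " -> local task manager\n";
  }
  void Spillback(const std::string &node, const Work &w) {
    std::cout << w.task_id << " -> spillback to " << node << "\n";
  }
  std::string self_node_id_;
};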

@scv119 scv119 removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 14, 2022
@jjyao (Collaborator) commented Feb 14, 2022

Are you going to split this into more PRs, or is this the PR for review?

@scv119 scv119 force-pushed the unify-scheduler branch 2 times, most recently from 57f3c6c to 2bb01fc Compare February 15, 2022 22:30
@scv119 (Contributor, Author) commented Feb 15, 2022

@jjyao I think there's no more low-hanging fruit left to break this refactor down further at the moment.

info_by_sched_cls_.erase(scheduling_class);
}
if (is_infeasible) {
// TODO(scv119): fail the request.
Review comment from a Contributor:

Synced with Chen. It seems that this should never happen. Maybe let's just RAY_CHECK here in case of any failure.

Review comment from a Contributor:

Just curious, but why is it a bug? Is this code path not reachable when pg is removed?
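
For reference, the RAY_CHECK suggestion above would look roughly like the fragment below; the include path and surrounding function are assumptions, with only the is_infeasible flag coming from the diff:

#include "ray/util/logging.h"  // RAY_CHECK (header path is an assumption)

void AssertStillFeasible(bool is_infeasible) {
  // Crash loudly if a previously feasible request ever becomes infeasible,
  // instead of silently handling a state the reviewers believe is unreachable.
  RAY_CHECK(!is_infeasible) << "A queued request unexpectedly became infeasible.";
}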

@fishbone (Contributor) left a review:

LGTM and thanks for the work. The architecture is much better than before.

@rkooo567 (Contributor) left a review:

Also went over it with Chen. I think it is good to go. The current form seems to be the best for a single PR.


[this](const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
return GetObjectsFromPlasma(object_ids, results);
},
max_task_args_memory);
cluster_task_manager_ = std::make_shared<ClusterTaskManager>(
self_node_id_,
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
Review comment from a Contributor:

The std::dynamic_pointer_cast could be removed.
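
To illustrate the point with hypothetical, stripped-down types: if the member already has the derived static type, it can be passed straight through and the cast is unnecessary.

#include <memory>

struct ClusterResourceScheduler {};

struct ClusterTaskManager {
  explicit ClusterTaskManager(std::shared_ptr<ClusterResourceScheduler> scheduler)
      : scheduler_(std::move(scheduler)) {}
  std::shared_ptr<ClusterResourceScheduler> scheduler_;
};

int main() {
  auto scheduler = std::make_shared<ClusterResourceScheduler>();
  // No std::dynamic_pointer_cast needed when the static types already match.
  ClusterTaskManager manager(scheduler);
  return 0;
}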

/// \param readyIds: The tasks which are now ready to be dispatched.
virtual void TasksUnblocked(const std::vector<TaskID> &ready_ids) = 0;

;
Review comment from a Contributor:

Is it a redundant ;?

@scv119 scv119 merged commit ab53848 into ray-project:master Feb 17, 2022
@scv119 scv119 added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Feb 17, 2022
@scv119 (Contributor, Author) commented Feb 17, 2022

Oops, will address those comments in follow-up PRs!

simonsays1980 pushed a commit to simonsays1980/ray that referenced this pull request Feb 27, 2022
… distributed and local part (ray-project#21660)

Labels: tests-ok (The tagger certifies test failures are unrelated and assumes personal liability.)

6 participants