[Train][Observability] Track Train Run Info with TrainStateActor #44585
Conversation
python/ray/train/_internal/stats.py
Outdated
def __init__(self) -> None:
    self.stats_actor = get_or_launch_stats_actor()

def register_train_run(
Trying to make sure all the pydantic-related code is encapsulated in TrainRunStatsManager, so that OSS users who are not using ray[default] will not get an error.
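A minimal sketch of that encapsulation idea (the schema import path is an assumption for illustration, not necessarily this PR's layout): the pydantic-backed schema is only imported when state tracking is actually used, so importing the trainer itself never pulls in pydantic.

class TrainRunStatsManager:
    """Sketch: keep all pydantic usage behind this class."""

    def __init__(self) -> None:
        self.stats_actor = get_or_launch_stats_actor()  # helper defined earlier in this file

    def register_train_run(self, **run_info) -> None:
        # Deferred import: pydantic (shipped via ray[default]) is only needed here.
        try:
            from ray.train._internal.state.schema import TrainRunInfo  # assumed path
        except ImportError:
            return  # ray[default] / pydantic not installed; skip state tracking
        self.stats_actor.register_train_run.remote(TrainRunInfo(**run_info))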
@@ -58,6 +59,7 @@ def setup(self, config):
    logdir=self._storage.trial_driver_staging_path,
    driver_ip=None,
    experiment_name=self._storage.experiment_dir_name,
    run_id=uuid.uuid4().hex,
Create a unique ID that differentiates each run (a small illustration follows this list):
- Cannot use the trial ID because trainer.restore will reuse it.
- Cannot use the job ID because there could be multiple train runs in one job.
- Cannot use trial ID + job ID because one can restore a run multiple times in one job.
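Purely as an illustration (not code from this PR): two attempts of the same restored trial share the trial ID and possibly the job ID, but each attempt gets a distinct run_id.

import uuid

run_id_first_attempt = uuid.uuid4().hex
run_id_after_restore = uuid.uuid4().hex  # restoring generates a fresh ID
assert run_id_first_attempt != run_id_after_restore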
Thanks, looking good!
python/ray/train/_internal/stats.py
Outdated
    trial_name: str,
    trainer_actor_id: str,
    datasets: Dict[str, Dataset],
    worker_group: WorkerGroup,
I do think it feels a bit weird to pass the WorkerGroup here, but I'm not sure if there is another cleaner way to organize it.
The consideration here is: when we do elastic/fault-tolerant training, we can avoid using an old WorkerGroup if we pass it through the function arguments.
    This manager class is created on the train controller layer for each run.
    """

    def __init__(self, worker_group: WorkerGroup) -> None:
I think it is actually better to pass it in register_train_run as before, so we don't keep unnecessary state.
What about having an API to update the worker group? We can re-register the worker group in the TrainRunStateManager when it's updated (e.g. for elastic training).
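A rough sketch of that idea (method names here are hypothetical, not part of this PR): the manager keeps no long-lived worker-group state and simply re-reports whenever the controller hands it a new group.

class TrainRunStateManager:
    def __init__(self, state_actor) -> None:
        self.state_actor = state_actor

    def register_train_run(self, run_id: str, worker_group) -> None:
        # Initial registration: snapshot the worker metadata at this point in time.
        self._report_worker_group(run_id, worker_group)

    def update_worker_group(self, run_id: str, worker_group) -> None:
        # Hypothetical hook for elastic training: re-register whenever the
        # controller rebuilds the worker group.
        self._report_worker_group(run_id, worker_group)

    def _report_worker_group(self, run_id: str, worker_group) -> None:
        # Placeholder: build the run/worker info from worker_group and push it
        # to the detached state actor.
        ...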
description="The key of the dataset dict specified in Ray Train Trainer." | ||
) | ||
plan_name: str = Field(description="The name of the internal dataset plan.") | ||
plan_uuid: str = Field(description="The uuid of the internal dataset plan.") |
- Let's name the above two "dataset_name" and "dataset_uuid"; we don't want to expose the concept of a plan.
- Also, I think we can prefix the train-level dataset name to the data-level dataset name, so it's easier to identify them on the data dashboard. For example:
dataset_key = "train"
dataset._set_name(dataset_key + "_" + dataset._name)
OK, I'll update it with dataset_name and dataset_uuid. For setting the prefix, we should probably do it when we initialize the trainer; I'll post another PR to do this.
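For reference, a minimal sketch of the renamed schema fields (other fields of TrainDatasetInfo are omitted; the descriptions are paraphrased):

from pydantic import BaseModel, Field

class TrainDatasetInfo(BaseModel):
    # Renamed from plan_name / plan_uuid so the internal "plan" concept
    # is not exposed to users.
    dataset_name: str = Field(description="The name of the dataset.")
    dataset_uuid: str = Field(description="The uuid of the dataset.")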
    namespace=TRAIN_STATE_ACTOR_NAMESPACE,
    get_if_exists=True,
    lifetime="detached",
    resources={"node:__internal_head__": 0.001},
Instead of forcing it onto the head node, it'd be better to force it onto the current node, because we may launch a job driver on a worker node. For example:
scheduling_strategy = NodeAffinitySchedulingStrategy(
    ray.get_runtime_context().get_node_id(),
    soft=False,
)
Oh, actually the StateActor tracks all Train runs; we don't want the dashboard to break because of a worker node crash. For example, suppose we have two train runs:
- run A on node 1
- run B on node 2
If we launch the StateActor on node 1 and node 1 dies, we will not be able to track run B.
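For comparison, a sketch of the two placements discussed here (the actor body is a stand-in; the head-node resource matches the diff above):

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

@ray.remote(num_cpus=0)
class TrainStateActor:  # stand-in for the real actor in this PR
    pass

# Suggested above: pin to whichever node runs the driver. The actor then
# shares the fate of that (possibly worker) node.
driver_node_actor = TrainStateActor.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        ray.get_runtime_context().get_node_id(), soft=False
    )
).remote()

# Chosen here: keep the actor on the head node so that losing a worker node
# (e.g. node 1 hosting run A) does not also take down tracking for run B.
head_node_actor = TrainStateActor.options(
    resources={"node:__internal_head__": 0.001}
).remote()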
Good work! I just left some nits.
python/ray/train/tests/test_state.py
Outdated
os.environ["RAY_TRAIN_ENABLE_STATE_TRACKING"] = "0" | ||
e = BackendExecutor( | ||
backend_config=TestConfig(), num_workers=4, resources_per_worker={"GPU": 1} | ||
) | ||
e.start() |
Nit: we can just create a WorkerGroup directly instead of using BackendExecutor.
Changed to WorkerGroup. I was originally trying to also test the session initialization in this test, but since we already covered that in the end-to-end test, let's go with WorkerGroup instead.
    return state_actor


def get_state_actor():
def get_state_actor() -> Optional[ActorHandle[TrainStateActor]]:
Nice catch. But it seems that ActorHandle is not subscriptable since it's not a generic class. I've changed it to def get_state_actor() -> Optional[ActorHandle]:
python/ray/train/tests/test_state.py
Outdated
def test_state_manager(ray_start_gpu_cluster):
    os.environ["RAY_TRAIN_ENABLE_STATE_TRACKING"] = "0"
Nit: we don't need this one after removing BackendExecutor, but if we do set an env var we should use the monkeypatch.setenv(...) fixture so that it is only set within the test; otherwise it will spill over to the next one. We should also change the next test's env var setting to use monkeypatch.
Ah, got it! Good to see that after removing BackendExecutor, we don't need to set the env var anymore.
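For reference, a small sketch of the monkeypatch pattern mentioned above (the test name and body are placeholders):

def test_state_tracking_e2e(monkeypatch, ray_start_gpu_cluster):
    # monkeypatch restores the environment after the test, so the setting
    # cannot spill over into the next test.
    monkeypatch.setenv("RAY_TRAIN_ENABLE_STATE_TRACKING", "1")
    ...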
Why are these changes needed?
This PR adds a StateActor to collect Train run metadata for the Train Dashboard. The main components include:
- StatsActor: a detached actor that tracks the state of all train runs.
- TrainRunStatsManager: a manager class created on the train controller layer (BackendExecutor here) of each train run.
- Data schemas (TrainRunInfo, TrainWorkerInfo, TrainDatasetInfo): the pydantic models describing a run, its workers, and its datasets.

How to launch the StateActor?
We decided to launch the StateActor in the driver instead of the trainable (controller). Below are the reasons behind it:
Resource Limitation
Ray Tune sets placement_group_capture_child_tasks=True, which restricts the total resources a trainable can use (see ray/python/ray/air/execution/resources/placement_group.py, line 35 at 3606da8).
In Ray Train, the workers and the controller already use up all the resources specified in the ScalingConfig, so no additional actors can be launched inside TrainTrainable. Therefore, we cannot launch the TrainStateActor in the controller, since it requires "node:__internal_head__": 0.001.
Force Cleanup
Ray Tune will force-clean all the actors and sub-actors launched inside the trainable. Therefore, if we launched the detached actor in the trainable, it would be purged after trainer.fit finished.
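Putting this together, a sketch of the driver-side launch (the actor body and constant values are stand-ins for illustration; the options mirror the diff earlier in this thread):

import ray

TRAIN_STATE_ACTOR_NAME = "train_state_actor"         # assumed value
TRAIN_STATE_ACTOR_NAMESPACE = "_train_state_actor"   # assumed value

@ray.remote(num_cpus=0)
class TrainStateActor:  # stand-in for the real actor
    def __init__(self):
        self._runs = {}

    def register_train_run(self, run_info: dict):
        self._runs[run_info["run_id"]] = run_info

# Created from the driver, outside any Tune placement group, so it is neither
# constrained by placement_group_capture_child_tasks nor purged when the
# trainable is cleaned up after trainer.fit().
state_actor = TrainStateActor.options(
    name=TRAIN_STATE_ACTOR_NAME,
    namespace=TRAIN_STATE_ACTOR_NAMESPACE,
    get_if_exists=True,
    lifetime="detached",
    resources={"node:__internal_head__": 0.001},
).remote()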
Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.