[SGD] v2 prototype: BackendExecutor and TorchBackend implementation #17357
Conversation
"This Trainer is not active. It is either shutdown already or " | ||
"never started in the first place. Either create a new Trainer " | ||
"or start this one.") |
Trainer -> BackendExecutor
The user isn't aware of BackendExecutor, right?
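For illustration, a minimal sketch of keeping the user-facing wording in Trainer terms even though the check lives in BackendExecutor; the exception class and helper below are hypothetical placeholders, not code from this PR:

# Hypothetical sketch only: phrase the error for the Trainer user, since
# Trainer is the only class the user interacts with directly.
class InactiveWorkerGroupError(RuntimeError):
    """Raised when work is submitted before start() or after shutdown()."""


def raise_not_active() -> None:
    # Hypothetical helper that BackendExecutor could call internally.
    raise InactiveWorkerGroupError(
        "This Trainer is not active. It is either shutdown already or "
        "never started in the first place. Either create a new Trainer "
        "or start this one.")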
There's a lot of state that gets tossed around between Backend, BackendExecutor, and TorchConfig. Can we instead reduce the places where we're keeping track of state? Here's one attempt. The tl;dr is that the torch-specific logic becomes a stateless TorchBackend with on_start/on_shutdown hooks that receive the worker group and config as arguments, and BackendExecutor just calls into it:

diff --git a/python/ray/util/sgd/v2/backends/backend.py b/python/ray/util/sgd/v2/backends/backend.py
index 99ea5a20b..734425c0c 100644
--- a/python/ray/util/sgd/v2/backends/backend.py
+++ b/python/ray/util/sgd/v2/backends/backend.py
@@ -59,12 +59,14 @@ class BackendExecutor:
self._num_gpus_per_worker = num_gpus_per_worker
self.worker_group = DeactivatedWorkerGroup()
+ self._backend = get_backend(self._backend_config)
def start(self):
"""Starts the worker group."""
self.worker_group = WorkerGroup(self._num_workers,
self._num_cpus_per_worker,
self._num_gpus_per_worker)
+ self._backend.on_start(self.worker_group, self._backend_config)
def execute(self, train_func: Callable[[], T]) -> Iterator[Any]:
"""Executes training function on all workers and yield results.
@@ -156,6 +158,7 @@ class BackendExecutor:
def shutdown(self):
"""Shuts down the workers in the worker group."""
+ self._backend.on_shutdown(self.worker_group)
self.worker_group.shutdown()
self.worker_group = DeactivatedWorkerGroup()
diff --git a/python/ray/util/sgd/v2/backends/torch.py b/python/ray/util/sgd/v2/backends/torch.py
index 53d72e098..6170f2b94 100644
--- a/python/ray/util/sgd/v2/backends/torch.py
+++ b/python/ray/util/sgd/v2/backends/torch.py
@@ -91,57 +91,57 @@ def shutdown_torch():
torch.cuda.empty_cache()
-class TorchExecutor(BackendExecutor):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self._backend_config.validate(name="torch")
-
- if self._backend_config.backend is None:
- if self._num_gpus_per_worker > 0:
- self.backend = "nccl"
- else:
- self.backend = "gloo"
-
- def start(self):
- super().start()
- if self._num_workers > 1:
-
+class TorchBackend:
+ # def __init__(self, backend_config, use_gpu=False):
+ # self._backend_config = backend_config
+
+ # # Can we actually just resolve this in config dataclass?
+ # self._backend_config.validate(name="torch")
+
+ # # Can we actually just resolve this too in config dataclass?
+ # if self._backend_config.backend is None:
+ # if use_gpu:
+ # self.backend = "nccl"
+ # else:
+ # self.backend = "gloo"
+
+ def on_start(worker_group, backend_config):
+ if len(worker_group) > 1:
def get_address():
addr = ray.util.get_node_ip_address()
port = find_free_port()
return addr, port
- master_addr, master_port = self.worker_group.execute_single(
+ master_addr, master_port = worker_group.execute_single(
0, get_address)
- if self._backend_config.init_method == "env":
+ if backend_config.init_method == "env":
def set_env_vars(addr, port):
os.environ["MASTER_ADDR"] = addr
os.environ["MASTER_PORT"] = str(port)
- self.worker_group.execute(
+ worker_group.execute(
set_env_vars, addr=master_addr, port=master_port)
url = "env://"
- elif self._backend_config == "tcp":
+ elif backend_config.init_method == "tcp":
url = f"tcp://{master_addr}:{master_port}"
else:
raise ValueError(
f"The provided init_method ("
- f"{self._backend_config.init_method} is not supported.")
+ f"{backend_config.init_method} is not supported.")
- for i in range(len(self.worker_group)):
- self.worker_group.execute_single(
+ for i in range(len(worker_group)):
+ worker_group.execute_single(
i,
setup_torch_process_group,
- backend=self.backend,
+ backend=backend_config.backend,
world_rank=i,
- world_size=len(self.worker_group),
+ world_size=len(worker_group),
init_method=url,
- timeout_s=self._backend_config.timeout_s)
+ timeout_s=backend_config.timeout_s)
- def shutdown(self):
- self.worker_group.execute_single(
+ def on_shutdown(worker_group):
+ worker_group.execute_single(
0, torch.distributed.destroy_process_group)
- self.worker_group.execute(shutdown_torch)
- super().shutdown()
+ worker_group.execute(shutdown_torch)
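To make the sketch above concrete, here is a minimal, hypothetical version of the get_backend factory that the diff calls but does not define, together with the stateless Backend interface the on_start/on_shutdown hooks would satisfy. The names, import paths, and base class are assumptions, not code from this PR:

# Hypothetical sketch only -- get_backend() is referenced in the diff above
# but never defined there. It dispatches on the config type so that
# BackendExecutor never needs to know which concrete backend it is driving.
class Backend:
    """Stateless interface for backend-specific setup/teardown of workers."""

    def on_start(self, worker_group, backend_config):
        # Default: no backend-specific setup.
        pass

    def on_shutdown(self, worker_group):
        # Default: no backend-specific teardown.
        pass


def get_backend(backend_config) -> Backend:
    # Import here to avoid a circular import between backend.py and torch.py
    # (assumed module layout, matching the file paths in the diff above).
    from ray.util.sgd.v2.backends.torch import TorchBackend, TorchConfig
    if isinstance(backend_config, TorchConfig):
        return TorchBackend()
    raise ValueError(f"Unsupported backend config: {backend_config!r}")

With this shape, BackendExecutor only holds the worker group and the config, and all torch-specific state stays inside the hooks' arguments, which is the state reduction the comment above is asking for.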
Ok I addressed all the comments and removed all the reporting functionality. It would be great if you guys could take another look.
@pytest.mark.parametrize("init_method", ["env", "tcp"])
def test_torch_start_shutdown(ray_start_2_cpus, init_method):
    torch_config = TorchConfig(backend="gloo", init_method=init_method)
    e = TorchExecutor(torch_config, num_workers=2)

    def check_process_group():
        import torch
        return (torch.distributed.is_initialized()
                and torch.distributed.get_world_size() == 2)

    assert all(e.run(check_process_group))

    e._backend.on_shutdown(e.worker_group, e._backend_config)

    assert not any(e.run(check_process_group))
Can we move tests for individual backends into their own test files?
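As a rough sketch of that split, a dedicated test file for the torch backend could look something like the following; the file name, fixture, import paths, and the start/run/shutdown method names are assumptions based on the snippets in this PR rather than its actual contents:

# test_torch.py -- hypothetical standalone test file for the torch backend.
import pytest

import ray
from ray.util.sgd.v2.backends.backend import BackendExecutor
from ray.util.sgd.v2.backends.torch import TorchConfig


@pytest.fixture
def ray_start_2_cpus():
    # Bring up a small local cluster per test and tear it down afterwards.
    address_info = ray.init(num_cpus=2)
    yield address_info
    ray.shutdown()


@pytest.mark.parametrize("init_method", ["env", "tcp"])
def test_torch_start_shutdown(ray_start_2_cpus, init_method):
    torch_config = TorchConfig(backend="gloo", init_method=init_method)
    e = BackendExecutor(torch_config, num_workers=2)
    e.start()

    def check_process_group():
        import torch
        return (torch.distributed.is_initialized()
                and torch.distributed.get_world_size() == 2)

    assert all(e.run(check_process_group))

    e.shutdown()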
looks good! merge when tests pass.
Changed the title: BackendExecutor and TorchExecutor implementation -> BackendExecutor and TorchBackend implementation
…on (ray-project#17357)

* wip
* formatting
* increase timeouts
* wip
* address comments
* comments
* fix
* address comments
* Update python/ray/util/sgd/v2/worker_group.py (Co-authored-by: Richard Liaw <[email protected]>)
* Update python/ray/util/sgd/v2/worker_group.py (Co-authored-by: Richard Liaw <[email protected]>)
* address comments
* formatting
* fix
* wip
* finish
* fix
* formatting
* remove reporting
* split TorchBackend
* fix tests
* address comments
* add file
* more fixes
* remove default value
* update run method doc
* add comment
* minor doc fixes
* lint
* add args to BaseWorker.execute
* address comments
* remove extra parentheses
* properly instantiate backend
* fix some of the tests
* fix torch setup
* fix type hint

Co-authored-by: Richard Liaw <[email protected]>
Co-authored-by: matthewdeng <[email protected]>
Why are these changes needed?
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.