Skip to content

Commit

Permalink
[Train] Fixed HorovodBackend to automatically detect network interfac…
Browse files Browse the repository at this point in the history
…es (#19533)

* Moved Horovod into package

* Move in Ludwig fix

* Undo git mv

* Cleanup

* Cleanup

* flake8

* Update python/ray/train/backends/horovod.py

Co-authored-by: matthewdeng <[email protected]>

* Whitespace

Co-authored-by: matthewdeng <[email protected]>
  • Loading branch information
tgaddair and matthewdeng committed Oct 21, 2021
1 parent 8792714 commit 702315b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
1 change: 1 addition & 0 deletions python/ray/setup-dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def do_link(package, force=False, local_path=None):
do_link("rllib", force=args.yes, local_path="../../../rllib")
do_link("tune", force=args.yes)
do_link("sgd", force=args.yes)
do_link("train", force=args.yes)
do_link("autoscaler", force=args.yes)
do_link("ray_operator", force=args.yes)
do_link("cloudpickle", force=args.yes)
Expand Down
60 changes: 58 additions & 2 deletions python/ray/train/backends/horovod.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import ray
from ray.train.backends.backend import BackendConfig, Backend
from ray.train.utils import update_env_vars
from ray.train.worker_group import WorkerGroup
from ray.train.worker_group import WorkerGroup, Worker

try:
from horovod.ray.runner import Coordinator
from horovod.ray.utils import detect_nics, nics_to_env_var
from horovod.runner.common.util import secret, timeout
except ImportError:
Coordinator = None
detect_nics = None
Expand All @@ -29,16 +30,48 @@ class HorovodConfig(BackendConfig):
nics (Optional[Set[str]): Network interfaces that can be used for
communication.
verbose (int): Horovod logging verbosity.
key (Optional[str]): Secret used for communication between workers.
ssh_port (Optional[int]): Port for SSH server running on worker nodes.
ssh_identity_file (Optional[str]): Path to the identity file to
ssh into different hosts on the cluster.
ssh_str (Optional[str]): CAUTION WHEN USING THIS. Private key
file contents. Writes the private key to ssh_identity_file.
timeout_s (int): Timeout parameter for Gloo rendezvous.
placement_group_timeout_s (int): Timeout parameter for Ray
Placement Group creation. Currently unused.
"""
nics: Optional[Set[str]] = None
verbose: int = 1
key: Optional[str] = None
ssh_port: Optional[int] = None
ssh_identity_file: Optional[str] = None
ssh_str: Optional[str] = None
timeout_s: int = 300
placement_group_timeout_s: int = 100

@property
def start_timeout(self):
return timeout.Timeout(
self.timeout_s,
message="Timed out waiting for {activity}. Please "
"check connectivity between servers. You "
"may need to increase the --start-timeout "
"parameter if you have too many servers.")

def __post_init__(self):
if Coordinator is None:
raise ValueError(
"`horovod[ray]` is not installed. "
"Please install 'horovod[ray]' to use this backend.")

if self.ssh_str and not os.path.exists(self.ssh_identity_file):
with open(self.ssh_identity_file, "w") as f:
os.chmod(self.ssh_identity_file, 0o600)
f.write(self.ssh_str)

if self.key is None:
self.key = secret.make_secret_key()

@property
def backend_cls(self):
return HorovodBackend
Expand All @@ -51,6 +84,28 @@ def init_env_vars(world_rank: int, world_size: int, node_id: str):
os.environ["HOROVOD_SIZE"] = str(world_size)


# TODO(tgaddair): temporary workaround for Horovod's worker discovery logic,
# which requires passing in an extra parameter as part of the RayExecutor
# API. This will be removed in the future as we migrate more of the
# RayExecutor utils into Ray Train.
# See: https://github.com/horovod/horovod/blob/v0.23.0/horovod/ray/driver_service.py#L9 # noqa: E501
@dataclass
class HorovodWorkerWrapper:
w: Worker

@property
def execute(self):
w = self.w

class ExecuteHandle:
def remote(self, func, *args, **kwargs):
_ = None
return w.actor._BaseWorkerMixin__execute.remote(
func, _, *args, **kwargs)

return ExecuteHandle()


class HorovodBackend(Backend):
share_cuda_visible_devices: bool = True

Expand Down Expand Up @@ -90,11 +145,12 @@ def on_start(self, worker_group: WorkerGroup,
ray.get(setup_futures)

coordinator_envs = self.coordinator.establish_rendezvous()
node_workers = [HorovodWorkerWrapper(w) for w in worker_group.workers]

nics = detect_nics(
backend_config,
all_host_names=list(self.coordinator.hostnames),
node_workers=worker_group.workers)
node_workers=node_workers)
coordinator_envs.update(nics_to_env_var(nics))

worker_group.execute(update_env_vars, coordinator_envs)

0 comments on commit 702315b

Please sign in to comment.