diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py
index ecd1f522657c..ee6041a24248 100644
--- a/python/ray/train/_internal/backend_executor.py
+++ b/python/ray/train/_internal/backend_executor.py
@@ -152,8 +152,10 @@ def start(
         # for more context.
         # TODO remove passing in trial_driver_ip.
-        trial_driver_ip = self._trial_info.driver_ip if self._trial_info else None
-        self.worker_group.sort_workers_by_ip_and_gpu_id(trial_driver_ip)
+        trial_driver_node_id = (
+            self._trial_info.driver_node_id if self._trial_info else None
+        )
+        self.worker_group.sort_workers_by_node_id_and_gpu_id(trial_driver_node_id)

         try:
             if initialization_hook:
@@ -366,14 +368,14 @@ def _create_rank_world_size_mappings(self) -> List[Dict]:
             - node_rank_map, which maps from world rank to node rank

         Example:
-            Worker 0: 0.0.0.0
-            Worker 1: 0.0.0.0
-            Worker 2: 0.0.0.1
-            Worker 3: 0.0.0.0
-            Worker 4: 0.0.0.1
+            Worker 0: node 0
+            Worker 1: node 0
+            Worker 2: node 1
+            Worker 3: node 0
+            Worker 4: node 1

-            Workers 0, 1, 3 are on 0.0.0.0.
-            Workers 2, 4 are on 0.0.0.1.
+            Workers 0, 1, 3 are on node 0.
+            Workers 2, 4 are on node 1.

         Expected local_rank_map:
             {
@@ -406,31 +408,33 @@ def _create_rank_world_size_mappings(self) -> List[Dict]:
         local_rank_map = {}  # map from world rank to local rank
         local_world_size_map = {}  # map from world rank to local world size
         node_rank_map = {}  # map from world rank to node rank
-        node_ips = {}  # map from node ip to node index
+        node_ids = {}  # map from node id to node index
         node_cnt = 0  # count the number of nodes
-        ip_dict = defaultdict(int)  # map from node ip to the number of workers on it.
+        node_id_dict = defaultdict(
+            int
+        )  # map from node id to the number of workers on it.

         for world_rank in range(len(self.worker_group)):
             worker = self.worker_group.workers[world_rank]
-            node_ip = worker.metadata.node_ip
-            local_rank_map[world_rank] = ip_dict[node_ip]
-            ip_dict[node_ip] += 1
+            node_id = worker.metadata.node_id
+            local_rank_map[world_rank] = node_id_dict[node_id]
+            node_id_dict[node_id] += 1

-            if node_ip not in node_ips:
-                node_ips[node_ip] = node_cnt
+            if node_id not in node_ids:
+                node_ids[node_id] = node_cnt
                 node_cnt += 1
-            node_rank_map[world_rank] = node_ips[node_ip]
+            node_rank_map[world_rank] = node_ids[node_id]

         for world_rank in range(len(self.worker_group)):
             worker = self.worker_group.workers[world_rank]
-            node_ip = worker.metadata.node_ip
-            local_world_size_map[world_rank] = ip_dict[node_ip]
+            node_id = worker.metadata.node_id
+            local_world_size_map[world_rank] = node_id_dict[node_id]

         workers_info = "\n".join(
             [
-                f"- (ip={w.metadata.node_ip}, pid={w.metadata.pid}) "
-                f"world_rank={i}, local_rank={local_rank_map[i]}, "
-                f"node_rank={node_rank_map[i]}"
+                f"- (node_id={w.metadata.node_id}, ip={w.metadata.node_ip}, "
+                f"pid={w.metadata.pid}) world_rank={i}, "
+                f"local_rank={local_rank_map[i]}, node_rank={node_rank_map[i]}"
                 for i, w in enumerate(self.worker_group.workers)
             ]
         )
diff --git a/python/ray/train/_internal/session.py b/python/ray/train/_internal/session.py
index 8ad87c65314d..7918675a22ea 100644
--- a/python/ray/train/_internal/session.py
+++ b/python/ray/train/_internal/session.py
@@ -60,6 +60,7 @@ class TrialInfo:
     resources: Dict[str, float]
     logdir: str
     driver_ip: str
+    driver_node_id: str
     experiment_name: Optional[str] = None
     run_id: Optional[str] = None
diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py
index e72e55f0fa2c..72da84e3c105 100644
--- a/python/ray/train/_internal/worker_group.py
+++ b/python/ray/train/_internal/worker_group.py
@@ -360,43 +360,43 @@ def add_workers(self, num_workers: int):
         for i in range(len(new_actors)):
             self.workers.append(Worker(actor=new_actors[i], metadata=metadata[i]))

-    def sort_workers_by_ip_and_gpu_id(self, _first_ip: Optional[str] = None):
-        """Reorder the workers by their node ip and the lowest GPU id.
+    def sort_workers_by_node_id_and_gpu_id(self, _first_node_id: Optional[str] = None):
+        """Reorder the workers by their node id and the lowest GPU id.

         This is useful for collocating workers on the same node.

         Example:
             Given workers with the following attributes:
-                worker_0: ip=1, gpu_ids=[1]
-                worker_1: ip=0, gpu_ids=[0]
-                worker_2: ip=1, gpu_ids=[0]
-                worker_3: ip=0, gpu_ids=[1]
+                worker_0: node_id=1, gpu_ids=[1]
+                worker_1: node_id=0, gpu_ids=[0]
+                worker_2: node_id=1, gpu_ids=[0]
+                worker_3: node_id=0, gpu_ids=[1]

             The function will perform the following steps:
-                1. Group by node IP:
-                    ip=0: worker_1, worker_3
-                    ip=1: worker_0, worker_2
+                1. Group by node ID:
+                    node_id=0: worker_1, worker_3
+                    node_id=1: worker_0, worker_2

                 2. Sort each group by GPU ID:
-                    ip=0: worker_1 (gpu_id=0), worker_3 (gpu_id=1)
-                    ip=1: worker_2 (gpu_id=0), worker_0 (gpu_id=1)
+                    node_id=0: worker_1 (gpu_id=0), worker_3 (gpu_id=1)
+                    node_id=1: worker_2 (gpu_id=0), worker_0 (gpu_id=1)

             Resulting in the order: [worker_1, worker_3, worker_2, worker_0]

         Args:
-            _first_ip: The first IP to group by.
-                Set this to the node IP of the trainer coordinator to ensure that the
+            _first_node_id: The first ID to group by.
+                Set this to the node ID of the trainer coordinator to ensure that the
                 rank 0 worker is on the same node, allowing additional resources to
                 be specified for rank 0 workers via `ScalingConfig(trainer_resources=)`.
         """
-        ip_to_workers = defaultdict(list)
+        node_id_to_workers = defaultdict(list)

-        if _first_ip is not None:
-            ip_to_workers[_first_ip] = []
+        if _first_node_id is not None:
+            node_id_to_workers[_first_node_id] = []

         for worker in self.workers:
-            ip_to_workers[worker.metadata.node_ip].append(worker)
+            node_id_to_workers[worker.metadata.node_id].append(worker)

         # Sort workers on the same node by the lowest GPU id
         # More details: https://github.com/ray-project/ray/issues/40803
@@ -413,11 +413,11 @@ def get_lowest_gpu_id(worker) -> int:
             except ValueError:
                 return min(gpu_ids)

-        for node_ip in ip_to_workers:
-            ip_to_workers[node_ip].sort(key=get_lowest_gpu_id)
+        for node_id in node_id_to_workers:
+            node_id_to_workers[node_id].sort(key=get_lowest_gpu_id)

         sorted_workers = []
-        for workers in ip_to_workers.values():
+        for workers in node_id_to_workers.values():
             sorted_workers.extend(workers)

         self.workers = sorted_workers
diff --git a/python/ray/train/data_parallel_trainer.py b/python/ray/train/data_parallel_trainer.py
index 0c47e4409553..a14dc47d36dd 100644
--- a/python/ray/train/data_parallel_trainer.py
+++ b/python/ray/train/data_parallel_trainer.py
@@ -442,6 +442,7 @@ def training_loop(self) -> None:
             resources=session.get_trial_resources(),
             logdir=session.get_trial_dir(),
             driver_ip=ray.util.get_node_ip_address(),
+            driver_node_id=ray.get_runtime_context().get_node_id(),
             experiment_name=session.get_experiment_name(),
             run_id=uuid.uuid4().hex,
         )
diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py
index 881f14b2d367..08099808142f 100644
--- a/python/ray/train/tests/test_backend.py
+++ b/python/ray/train/tests/test_backend.py
@@ -96,7 +96,7 @@ def mock_add_workers(self, num_workers):
     original_add_workers(self, num_workers)
     for i, worker in enumerate(self.workers):
         metadata = WorkerMetadata(
-            node_id=0,
+            node_id=str(i % 2),
             node_ip=str(i % 2),
             hostname=0,
             resource_ids={"GPU": ["0"]},
@@ -105,6 +105,19 @@ def mock_add_workers(self, num_workers):
         worker.metadata = metadata


+def mock_add_workers_to_nodes_with_same_ip(self, num_workers):
+    original_add_workers(self, num_workers)
+    for i, worker in enumerate(self.workers):
+        metadata = WorkerMetadata(
+            node_id=str(i % 2),
+            node_ip=0,
+            hostname=0,
+            resource_ids={"GPU": ["0"]},
+            pid=0,
+        )
+        worker.metadata = metadata
+
+
 def test_start(ray_start_2_cpus):
     config = TestConfig()
     e = BackendExecutor(config, num_workers=2)
@@ -165,6 +178,18 @@ def train_func():
     assert set(e.finish_training()) == {0, 1}


+def test_local_ranks_with_same_ip_nodes(ray_2_node_2_cpu):
+    config = TestConfig()
+    e = BackendExecutor(config, num_workers=4)
+    e.start()
+
+    def train_func():
+        return train.get_context().get_local_rank()
+
+    _start_training(e, train_func)
+    assert list(e.finish_training()) == [0, 1, 0, 1]
+
+
 def test_local_world_size(ray_2_node_2_cpu):
     config = TestConfig()
     with patch.object(WorkerGroup, "add_workers", mock_add_workers):
@@ -178,6 +203,21 @@ def train_func():
     assert list(e.finish_training()) == [2, 2, 1]


+def test_local_world_size_with_same_ip_nodes(ray_2_node_2_cpu):
+    config = TestConfig()
+    with patch.object(
+        WorkerGroup, "add_workers", mock_add_workers_to_nodes_with_same_ip
+    ):
+        e = BackendExecutor(config, num_workers=3)
+        e.start()
+
+        def train_func():
+            return train.get_context().get_local_world_size()
+
+        _start_training(e, train_func)
+        assert list(e.finish_training()) == [2, 2, 1]
+
+
 def test_node_ranks(ray_2_node_2_cpu):
     config = TestConfig()
     with patch.object(WorkerGroup, "add_workers", mock_add_workers):
@@ -191,6 +231,21 @@ def train_func():
     assert list(e.finish_training()) == [0, 0, 1]


+def test_node_ranks_with_same_ip_nodes(ray_2_node_2_cpu):
+    config = TestConfig()
+    with patch.object(
+        WorkerGroup, "add_workers", mock_add_workers_to_nodes_with_same_ip
+    ):
+        e = BackendExecutor(config, num_workers=3)
+        e.start()
+
+        def train_func():
+            return train.get_context().get_node_rank()
+
+        _start_training(e, train_func)
+        assert list(e.finish_training()) == [0, 0, 1]
+
+
 def test_train_failure(ray_start_2_cpus):
     config = TestConfig()
     e = BackendExecutor(config, num_workers=2)
diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py
index e124aff61e20..597f5781a515 100644
--- a/python/ray/train/tests/test_worker_group.py
+++ b/python/ray/train/tests/test_worker_group.py
@@ -180,81 +180,82 @@ def test_execute_args(ray_start_2_cpus):
     assert all(o == 1 for o in outputs)


-def test_group_workers_by_ip(ray_start_2_cpus):
-    def create_worker_group(ips):
+def test_group_workers_by_node_id(ray_start_2_cpus):
+    def create_worker_group(node_ids):
         wg = WorkerGroup(num_workers=2)
         wg.workers = [
             Worker(
                 actor=None,
                 metadata=WorkerMetadata(
-                    node_id="dummy",
-                    node_ip=ip,
+                    node_id=node_id,
+                    node_ip="dummy",
                     hostname="dummy",
                     resource_ids={},
                     pid=0,
                 ),
             )
-            for ip in ips
+            for node_id in node_ids
         ]
         return wg

     wg = create_worker_group(["2", "3", "1", "4", "2", "1", "3", "3", "4", "2"])
-    wg.sort_workers_by_ip_and_gpu_id()
+    wg.sort_workers_by_node_id_and_gpu_id()
     expected = ["2", "2", "2", "3", "3", "3", "1", "1", "4", "4"]
-    ips = [w.metadata.node_ip for w in wg.workers]
-    assert ips == expected, (
-        "Workers should be grouped by IP "
-        "and follow the same original order of IPs encountered (2, 3, 1, 4)."
+    node_ids = [w.metadata.node_id for w in wg.workers]
+    assert node_ids == expected, (
+        "Workers should be grouped by Node ID "
+        "and follow the same original order of IDs encountered (2, 3, 1, 4)."
     )

     wg = create_worker_group(["2", "3", "1", "4", "2", "1", "3", "3", "4", "2"])
-    wg.sort_workers_by_ip_and_gpu_id(_first_ip="1")
+    wg.sort_workers_by_node_id_and_gpu_id(_first_node_id="1")
     expected = ["1", "1", "2", "2", "2", "3", "3", "3", "4", "4"]
-    ips = [w.metadata.node_ip for w in wg.workers]
+    node_ids = [w.metadata.node_id for w in wg.workers]
     assert (
-        ips == expected
-    ), "Workers should be grouped by IP, with the first IP being 1."
+        node_ids == expected
+    ), "Workers should be grouped by Node ID, with the first ID being 1."


 def test_sort_local_workers_by_gpu_id(ray_start_2_cpus):
-    def create_worker_group(pids, ips, gpu_ids):
+    def create_worker_group(pids, node_ids, gpu_ids):
         wg = WorkerGroup(num_workers=2)
         wg.workers = [
             Worker(
                 actor=None,
                 metadata=WorkerMetadata(
-                    node_id="dummy",
-                    node_ip=ip,
+                    node_id=node_id,
+                    node_ip="dummy",
                     hostname="dummy",
                     resource_ids={"GPU": gpu_id.split() if gpu_id else []},
                     pid=pid,
                 ),
             )
-            for pid, ip, gpu_id in zip(pids, ips, gpu_ids)
+            for pid, node_id, gpu_id in zip(pids, node_ids, gpu_ids)
         ]
         return wg

-    def setup_and_check_worker_group(pids, ips, gpu_ids, expected_local_ranks):
+    def setup_and_check_worker_group(pids, node_ids, gpu_ids, expected_local_ranks):
         """
-        Create a worker group, group workers by IP, and check local ranks assignment.
+        Create a worker group, group workers by Node ID,
+        and check local ranks assignment.

         Args:
             pids: List of unique process IDs.
-            ips: List of IP addresses corresponding to each PID.
+            node_ids: List of Node IDs corresponding to each PID.
             gpu_ids: List of GPU IDs or None for each PID.
             expected_local_ranks: Dictionary mapping PID to the expected local rank.
         """
-        wg = create_worker_group(pids=pids, ips=ips, gpu_ids=gpu_ids)
-        wg.sort_workers_by_ip_and_gpu_id()
+        wg = create_worker_group(pids=pids, node_ids=node_ids, gpu_ids=gpu_ids)
+        wg.sort_workers_by_node_id_and_gpu_id()

         # Build local ranks according to the logics in
         # `BackendExecutor._create_rank_world_size_mappings()`
-        ip_dict = defaultdict(int)
+        node_id_dict = defaultdict(int)
         local_ranks_map = defaultdict(int)
         for w in wg.workers:
-            local_ranks_map[w.metadata.pid] = ip_dict[w.metadata.node_ip]
-            ip_dict[w.metadata.node_ip] += 1
+            local_ranks_map[w.metadata.pid] = node_id_dict[w.metadata.node_id]
+            node_id_dict[w.metadata.node_id] += 1

         local_ranks = [local_ranks_map[pid] for pid in pids]
@@ -267,14 +268,14 @@ def setup_and_check_worker_group(pids, ips, gpu_ids, expected_local_ranks):
     # For workers without GPU resources, their original order will be preserved
     cpu_workers_config = {
         "pids": [0, 1, 2, 3, 4, 5, 6, 7],
-        "ips": ["2", "2", "1", "1", "2", "1", "1", "2"],
+        "node_ids": ["2", "2", "1", "1", "2", "1", "1", "2"],
         "gpu_ids": [None] * 8,
         "expected_local_ranks": [0, 1, 0, 1, 2, 2, 3, 3],
     }

     gpu_workers_single_gpu_config = {
         "pids": [0, 1, 2, 3, 4, 5, 6, 7],
-        "ips": ["2", "2", "1", "1", "2", "1", "1", "2"],
+        "node_ids": ["2", "2", "1", "1", "2", "1", "1", "2"],
         "gpu_ids": ["1", "0", "3", "2", "2", "0", "1", "3"],
         "expected_local_ranks": [1, 0, 3, 2, 2, 0, 1, 3],
     }
@@ -282,7 +283,7 @@ def setup_and_check_worker_group(pids, ips, gpu_ids, expected_local_ranks):
     # For workers with multiple gpus, sort by their lowest gpu id
     gpu_workers_multiple_gpus_config = {
         "pids": [0, 1, 2, 3],
-        "ips": ["2", "1", "1", "2"],
+        "node_ids": ["2", "1", "1", "2"],
         "gpu_ids": ["1,3", "2,1", "0,3", "0,2"],
         "expected_local_ranks": [1, 1, 0, 0],
     }
diff --git a/python/ray/tune/trainable/function_trainable.py b/python/ray/tune/trainable/function_trainable.py
index 7c0344989d41..bde0a556d769 100644
--- a/python/ray/tune/trainable/function_trainable.py
+++ b/python/ray/tune/trainable/function_trainable.py
@@ -49,6 +49,7 @@ def setup(self, config):
                 resources=self.trial_resources,
                 logdir=self._storage.trial_driver_staging_path,
                 driver_ip=None,
+                driver_node_id=None,
                 experiment_name=self._storage.experiment_dir_name,
             ),
             storage=self._storage,
@@ -178,6 +179,7 @@ def reset_config(self, new_config):
                 resources=self.trial_resources,
                 logdir=self._storage.trial_working_directory,
                 driver_ip=None,
+                driver_node_id=None,
                 experiment_name=self._storage.experiment_dir_name,
             ),
             storage=self._storage,
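For reviewers who want to sanity-check the behavioral change outside the test suite, below is a minimal, dependency-free sketch (not part of the patch) of the node-ID-based grouping and local-rank assignment that `sort_workers_by_node_id_and_gpu_id` and `_create_rank_world_size_mappings` perform above. The `FakeWorker` class and both helper functions are illustrative stand-ins for the real `Worker`/`WorkerMetadata` objects, and the GPU-id sort is simplified to integer IDs (the real code falls back to string comparison on `ValueError`).

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeWorker:
    # Illustrative stand-in for Worker/WorkerMetadata: only the fields
    # relevant to the sorting and rank-assignment logic.
    node_id: str
    gpu_ids: List[str]


def sort_by_node_id_and_gpu_id(
    workers: List[FakeWorker], first_node_id: Optional[str] = None
) -> List[FakeWorker]:
    """Group workers by node id (optionally placing one node's group first),
    then sort each group by its lowest GPU id."""
    node_id_to_workers = defaultdict(list)
    if first_node_id is not None:
        # Pre-seeding the dict pins this node's group to the front,
        # relying on dict insertion order (Python 3.7+).
        node_id_to_workers[first_node_id] = []
    for w in workers:
        node_id_to_workers[w.node_id].append(w)
    for node_id in node_id_to_workers:
        # Simplified: assumes numeric GPU ids; workers without GPUs keep order.
        node_id_to_workers[node_id].sort(
            key=lambda w: min(map(int, w.gpu_ids)) if w.gpu_ids else 0
        )
    return [w for group in node_id_to_workers.values() for w in group]


def local_ranks(workers: List[FakeWorker]) -> List[int]:
    """Assign local ranks by counting how many workers were already seen
    on each node id, in the same spirit as _create_rank_world_size_mappings."""
    node_id_dict = defaultdict(int)
    ranks = []
    for w in workers:
        ranks.append(node_id_dict[w.node_id])
        node_id_dict[w.node_id] += 1
    return ranks


# Two Ray nodes that happen to share the same IP (e.g. containers on one host):
workers = [
    FakeWorker(node_id="node-A", gpu_ids=["1"]),
    FakeWorker(node_id="node-B", gpu_ids=["0"]),
    FakeWorker(node_id="node-A", gpu_ids=["0"]),
    FakeWorker(node_id="node-B", gpu_ids=["1"]),
]
ordered = sort_by_node_id_and_gpu_id(workers, first_node_id="node-A")
assert [w.node_id for w in ordered] == ["node-A", "node-A", "node-B", "node-B"]
assert local_ranks(ordered) == [0, 1, 0, 1]
```

The final assertions mirror the spirit of `test_local_ranks_with_same_ip_nodes` above: keying on node ID instead of node IP keeps two co-located nodes distinct, so their workers receive local ranks `[0, 1, 0, 1]` rather than being collapsed onto a single node.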