[train] Sort workers by node ID rather than by node IP #46163

Merged · 7 commits · Jul 24, 2024
48 changes: 26 additions & 22 deletions python/ray/train/_internal/backend_executor.py
@@ -152,8 +152,10 @@ def start(
# for more context.
# TODO remove passing in trial_driver_ip.

trial_driver_ip = self._trial_info.driver_ip if self._trial_info else None
self.worker_group.sort_workers_by_ip_and_gpu_id(trial_driver_ip)
trial_driver_node_id = (
self._trial_info.driver_node_id if self._trial_info else None
)
self.worker_group.sort_workers_by_node_id_and_gpu_id(trial_driver_node_id)

try:
if initialization_hook:
@@ -366,14 +368,14 @@ def _create_rank_world_size_mappings(self) -> List[Dict]:
- node_rank_map, which maps from world rank to node rank

Example:
Worker 0: 0.0.0.0
Worker 1: 0.0.0.0
Worker 2: 0.0.0.1
Worker 3: 0.0.0.0
Worker 4: 0.0.0.1
Worker 0: node 0
Worker 1: node 0
Worker 2: node 1
Worker 3: node 0
Worker 4: node 1

Workers 0, 1, 3 are on 0.0.0.0.
Workers 2, 4 are on 0.0.0.1.
Workers 0, 1, 3 are on node 0.
Workers 2, 4 are on node 1.

Expected local_rank_map:
{
@@ -406,31 +408,33 @@ def _create_rank_world_size_mappings(self) -> List[Dict]:
local_rank_map = {} # map from world rank to local rank
local_world_size_map = {} # map from world rank to local world size
node_rank_map = {} # map from world rank to node rank
node_ips = {} # map from node ip to node index
node_ids = {} # map from node id to node index
node_cnt = 0 # count the number of nodes

ip_dict = defaultdict(int) # map from node ip to the number of workers on it.
node_id_dict = defaultdict(
int
) # map from node id to the number of workers on it.
for world_rank in range(len(self.worker_group)):
worker = self.worker_group.workers[world_rank]
node_ip = worker.metadata.node_ip
local_rank_map[world_rank] = ip_dict[node_ip]
ip_dict[node_ip] += 1
node_id = worker.metadata.node_id
local_rank_map[world_rank] = node_id_dict[node_id]
node_id_dict[node_id] += 1

if node_ip not in node_ips:
node_ips[node_ip] = node_cnt
if node_id not in node_ids:
node_ids[node_id] = node_cnt
node_cnt += 1
node_rank_map[world_rank] = node_ips[node_ip]
node_rank_map[world_rank] = node_ids[node_id]

for world_rank in range(len(self.worker_group)):
worker = self.worker_group.workers[world_rank]
node_ip = worker.metadata.node_ip
local_world_size_map[world_rank] = ip_dict[node_ip]
node_id = worker.metadata.node_id
local_world_size_map[world_rank] = node_id_dict[node_id]

workers_info = "\n".join(
[
f"- (ip={w.metadata.node_ip}, pid={w.metadata.pid}) "
f"world_rank={i}, local_rank={local_rank_map[i]}, "
f"node_rank={node_rank_map[i]}"
f"- (node_id={w.metadata.node_id}, ip={w.metadata.node_ip}, "
f"pid={w.metadata.pid}) world_rank={i}, "
f"local_rank={local_rank_map[i]}, node_rank={node_rank_map[i]}"
for i, w in enumerate(self.worker_group.workers)
]
)
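For reference, here is a minimal standalone sketch of the rank-mapping logic in the hunk above, keyed by node ID. The `worker_node_ids` list is a hypothetical stand-in for the real `WorkerGroup` metadata and mirrors the docstring example (workers 0, 1, 3 on node 0; workers 2, 4 on node 1):

```python
from collections import defaultdict

# Hypothetical stand-in for worker metadata: the node ID of each world rank.
worker_node_ids = ["node-0", "node-0", "node-1", "node-0", "node-1"]

local_rank_map = {}        # world rank -> local rank
node_rank_map = {}         # world rank -> node rank
local_world_size_map = {}  # world rank -> local world size
node_ids = {}              # node id -> node rank (order of first appearance)
node_id_dict = defaultdict(int)  # node id -> number of workers counted so far

for world_rank, node_id in enumerate(worker_node_ids):
    local_rank_map[world_rank] = node_id_dict[node_id]
    node_id_dict[node_id] += 1
    if node_id not in node_ids:
        node_ids[node_id] = len(node_ids)
    node_rank_map[world_rank] = node_ids[node_id]

# Second pass: each worker's local world size is the total count on its node.
for world_rank, node_id in enumerate(worker_node_ids):
    local_world_size_map[world_rank] = node_id_dict[node_id]

assert local_rank_map == {0: 0, 1: 1, 2: 0, 3: 2, 4: 1}
assert node_rank_map == {0: 0, 1: 0, 2: 1, 3: 0, 4: 1}
assert local_world_size_map == {0: 3, 1: 3, 2: 2, 3: 3, 4: 2}
```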
1 change: 1 addition & 0 deletions python/ray/train/_internal/session.py
@@ -60,6 +60,7 @@ class TrialInfo:
resources: Dict[str, float]
logdir: str
driver_ip: str
driver_node_id: str
experiment_name: Optional[str] = None
run_id: Optional[str] = None

40 changes: 20 additions & 20 deletions python/ray/train/_internal/worker_group.py
@@ -360,43 +360,43 @@ def add_workers(self, num_workers: int):
for i in range(len(new_actors)):
self.workers.append(Worker(actor=new_actors[i], metadata=metadata[i]))

def sort_workers_by_ip_and_gpu_id(self, _first_ip: Optional[str] = None):
"""Reorder the workers by their node ip and the lowest GPU id.
def sort_workers_by_node_id_and_gpu_id(self, _first_node_id: Optional[str] = None):
"""Reorder the workers by their node id and the lowest GPU id.

This is useful for collocating workers on the same node.

Example:
Given workers with the following attributes:
worker_0: ip=1, gpu_ids=[1]
worker_1: ip=0, gpu_ids=[0]
worker_2: ip=1, gpu_ids=[0]
worker_3: ip=0, gpu_ids=[1]
worker_0: node_id=1, gpu_ids=[1]
worker_1: node_id=0, gpu_ids=[0]
worker_2: node_id=1, gpu_ids=[0]
worker_3: node_id=0, gpu_ids=[1]

The function will perform the following steps:
1. Group by node IP:
ip=0: worker_1, worker_3
ip=1: worker_0, worker_2
1. Group by node ID:
node_id=0: worker_1, worker_3
node_id=1: worker_0, worker_2

2. Sort each group by GPU ID:
ip=0: worker_1 (gpu_id=0), worker_3 (gpu_id=1)
ip=1: worker_2 (gpu_id=0), worker_0 (gpu_id=1)
node_id=0: worker_1 (gpu_id=0), worker_3 (gpu_id=1)
node_id=1: worker_2 (gpu_id=0), worker_0 (gpu_id=1)

Resulting in the order: [worker_1, worker_3, worker_2, worker_0]

Args:
_first_ip: The first IP to group by.
Set this to the node IP of the trainer coordinator to ensure that the
_first_node_id: The first ID to group by.
Set this to the node ID of the trainer coordinator to ensure that the
rank 0 worker is on the same node, allowing additional resources to
be specified for rank 0 workers via
`ScalingConfig(trainer_resources=)`.
"""
ip_to_workers = defaultdict(list)
node_id_to_workers = defaultdict(list)

if _first_ip is not None:
ip_to_workers[_first_ip] = []
if _first_node_id is not None:
node_id_to_workers[_first_node_id] = []

for worker in self.workers:
ip_to_workers[worker.metadata.node_ip].append(worker)
node_id_to_workers[worker.metadata.node_id].append(worker)

# Sort workers on the same node by the lowest GPU id
# More details: https://github.com/ray-project/ray/issues/40803
@@ -413,11 +413,11 @@ def get_lowest_gpu_id(worker) -> int:
except ValueError:
return min(gpu_ids)

for node_ip in ip_to_workers:
ip_to_workers[node_ip].sort(key=get_lowest_gpu_id)
for node_id in node_id_to_workers:
node_id_to_workers[node_id].sort(key=get_lowest_gpu_id)

sorted_workers = []
for workers in ip_to_workers.values():
for workers in node_id_to_workers.values():
sorted_workers.extend(workers)

self.workers = sorted_workers
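As a sanity check on the docstring example above, a hedged, self-contained sketch of the same grouping-and-sorting behavior, using plain dicts instead of real `Worker` objects (field names here are illustrative only). Passing `_first_node_id="0"`, an illustrative choice standing in for the coordinator's node, pulls node 0's group to the front and reproduces the docstring's resulting order; without it, group order follows first appearance in the worker list:

```python
from collections import defaultdict

# Illustrative workers matching the docstring example above.
workers = [
    {"name": "worker_0", "node_id": "1", "gpu_ids": [1]},
    {"name": "worker_1", "node_id": "0", "gpu_ids": [0]},
    {"name": "worker_2", "node_id": "1", "gpu_ids": [0]},
    {"name": "worker_3", "node_id": "0", "gpu_ids": [1]},
]

def sort_workers_by_node_id_and_gpu_id(workers, _first_node_id=None):
    node_id_to_workers = defaultdict(list)
    # Seeding the dict with the coordinator's node ID makes that group come
    # out first (dicts preserve insertion order), so rank 0 lands on it.
    if _first_node_id is not None:
        node_id_to_workers[_first_node_id] = []
    for w in workers:
        node_id_to_workers[w["node_id"]].append(w)
    # Within each node, order by the lowest GPU id.
    for node_id in node_id_to_workers:
        node_id_to_workers[node_id].sort(key=lambda w: min(w["gpu_ids"]))
    return [w for group in node_id_to_workers.values() for w in group]

sorted_names = [
    w["name"]
    for w in sort_workers_by_node_id_and_gpu_id(workers, _first_node_id="0")
]
assert sorted_names == ["worker_1", "worker_3", "worker_2", "worker_0"]
```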
1 change: 1 addition & 0 deletions python/ray/train/data_parallel_trainer.py
@@ -442,6 +442,7 @@ def training_loop(self) -> None:
resources=session.get_trial_resources(),
logdir=session.get_trial_dir(),
driver_ip=ray.util.get_node_ip_address(),
driver_node_id=ray.get_runtime_context().get_node_id(),
experiment_name=session.get_experiment_name(),
run_id=uuid.uuid4().hex,
)
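For context on the new `driver_node_id` field (a sketch, not part of the diff): the driver's node IP and node ID come from different Ray APIs, and only the node ID stays unique when several Ray nodes share one IP, e.g. in a local multi-node test cluster:

```python
import ray

ray.init()

# Multiple Ray nodes started on the same machine report the same IP...
print("driver ip:", ray.util.get_node_ip_address())
# ...but each node has a unique hex node ID.
print("driver node id:", ray.get_runtime_context().get_node_id())
```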
57 changes: 56 additions & 1 deletion python/ray/train/tests/test_backend.py
@@ -96,7 +96,7 @@ def mock_add_workers(self, num_workers):
original_add_workers(self, num_workers)
for i, worker in enumerate(self.workers):
metadata = WorkerMetadata(
node_id=0,
node_id=str(i % 2),
node_ip=str(i % 2),
hostname=0,
resource_ids={"GPU": ["0"]},
@@ -105,6 +105,19 @@ def mock_add_workers(self, num_workers):
worker.metadata = metadata


def mock_add_workers_to_nodes_with_same_ip(self, num_workers):
original_add_workers(self, num_workers)
for i, worker in enumerate(self.workers):
metadata = WorkerMetadata(
node_id=str(i % 2),
node_ip=0,
hostname=0,
resource_ids={"GPU": ["0"]},
pid=0,
)
worker.metadata = metadata


def test_start(ray_start_2_cpus):
config = TestConfig()
e = BackendExecutor(config, num_workers=2)
@@ -165,6 +178,18 @@ def train_func():
assert set(e.finish_training()) == {0, 1}


def test_local_ranks_with_same_ip_nodes(ray_2_node_2_cpu):
config = TestConfig()
e = BackendExecutor(config, num_workers=4)
e.start()

def train_func():
return train.get_context().get_local_rank()

_start_training(e, train_func)
assert list(e.finish_training()) == [0, 1, 0, 1]


def test_local_world_size(ray_2_node_2_cpu):
config = TestConfig()
with patch.object(WorkerGroup, "add_workers", mock_add_workers):
@@ -178,6 +203,21 @@ def train_func():
assert list(e.finish_training()) == [2, 2, 1]


def test_local_world_size_with_same_ip_nodes(ray_2_node_2_cpu):
config = TestConfig()
with patch.object(
WorkerGroup, "add_workers", mock_add_workers_to_nodes_with_same_ip
Review comment (Member): The 2 virtual nodes will have the same node_ip. Can we also have a test without patching the add_worker method?
Reply (Contributor Author): Done
):
e = BackendExecutor(config, num_workers=3)
e.start()

def train_func():
return train.get_context().get_local_world_size()

_start_training(e, train_func)
assert list(e.finish_training()) == [2, 2, 1]


def test_node_ranks(ray_2_node_2_cpu):
config = TestConfig()
with patch.object(WorkerGroup, "add_workers", mock_add_workers):
@@ -191,6 +231,21 @@ def train_func():
assert list(e.finish_training()) == [0, 0, 1]


def test_node_ranks_with_same_ip_nodes(ray_2_node_2_cpu):
config = TestConfig()
with patch.object(
WorkerGroup, "add_workers", mock_add_workers_to_nodes_with_same_ip
):
e = BackendExecutor(config, num_workers=3)
e.start()

def train_func():
return train.get_context().get_node_rank()

_start_training(e, train_func)
assert list(e.finish_training()) == [0, 0, 1]


def test_train_failure(ray_start_2_cpus):
config = TestConfig()
e = BackendExecutor(config, num_workers=2)