From d9773a198462c488eacc6df1ef87a35723e06ab7 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Fri, 21 Oct 2022 14:18:47 -0700 Subject: [PATCH 1/6] remove Signed-off-by: Amog Kamsetty --- python/ray/train/torch/config.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/python/ray/train/torch/config.py b/python/ray/train/torch/config.py index 346de18d5815..267c71f01800 100644 --- a/python/ray/train/torch/config.py +++ b/python/ray/train/torch/config.py @@ -95,12 +95,17 @@ def _setup_torch_process_group( ) logger.debug(f"using {backend}") - if backend == "nccl" and "NCCL_BLOCKING_WAIT" not in os.environ: + # See the `timeout` arg in https://pytorch.org/docs/master/ + # distributed.html#torch.distributed.init_process_group for description of + # NCCL_ASYNC_ERROR_HANDLING. We do not use NCCL_BLOCKING_WAIT due to performance + # overhead. + if backend == "nccl" and "NCCL_ASYNC_ERROR_HANDLING" not in os.environ: logger.debug( - "Setting NCCL_BLOCKING_WAIT for detecting node failure. " - "To override this behavior, you can set NCCL_BLOCKING_WAIT=0." + "Setting NCCL_ASYNC_ERROR_HANDLING to fail if NCCL collective " + "communication operations are timing out. " + "To override this behavior, you can set NCCL_ASYNC_ERROR_HANDLING=0." ) - os.environ["NCCL_BLOCKING_WAIT"] = "1" + os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1" dist.init_process_group( backend=backend, From c1cf70b844d525d0a649e64f74a8479d333636c9 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Fri, 21 Oct 2022 14:52:31 -0700 Subject: [PATCH 2/6] update Signed-off-by: Amog Kamsetty --- python/ray/train/torch/config.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/train/torch/config.py b/python/ray/train/torch/config.py index 267c71f01800..6b540927e02a 100644 --- a/python/ray/train/torch/config.py +++ b/python/ray/train/torch/config.py @@ -99,7 +99,11 @@ def _setup_torch_process_group( # distributed.html#torch.distributed.init_process_group for description of # NCCL_ASYNC_ERROR_HANDLING. We do not use NCCL_BLOCKING_WAIT due to performance # overhead. - if backend == "nccl" and "NCCL_ASYNC_ERROR_HANDLING" not in os.environ: + if ( + backend == "nccl" + and "NCCL_ASYNC_ERROR_HANDLING" not in os.environ + and "NCCL_BLOCKING_WAIT" not in os.environ + ): logger.debug( "Setting NCCL_ASYNC_ERROR_HANDLING to fail if NCCL collective " "communication operations are timing out. " From 8eccb641c04952507371aa0d777e6762eb6c3be4 Mon Sep 17 00:00:00 2001 From: amogkam Date: Tue, 15 Nov 2022 16:13:44 -0800 Subject: [PATCH 3/6] add test Signed-off-by: amogkam --- python/ray/train/tests/test_gpu.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py index 1b6ec2c699d3..7462f0aeab30 100644 --- a/python/ray/train/tests/test_gpu.py +++ b/python/ray/train/tests/test_gpu.py @@ -307,6 +307,32 @@ def assert_env_var_set(): worker_group.execute(assert_env_var_set) +def test_torch_fail_on_nccl_timeout(ray_start_4_cpus_2_gpus): + """Tests that TorchTrainer raises exception on NCCL timeouts.""" + + def train_fn(): + model = torch.nn.Linear(1, 1) + model = train.torch.prepare_model(model) + + # Rank 0 worker will never reach the collective operation. + # NCCL should timeout. 
+        if session.get_world_rank() == 0:
+            while True:
+                pass
+
+        torch.distributed.barrier()
+
+    trainer = TorchTrainer(
+        train_fn,
+        scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
+        torch_config=TorchConfig(timeout_s=5),
+    )
+
+    # Training should fail and not hang.
+    with pytest.raises(RuntimeError):
+        trainer.fit()
+
+
 if __name__ == "__main__":
     import sys

From 9b2d047a1a1fd5f3b1ea2c7c0811099f8a9a14b8 Mon Sep 17 00:00:00 2001
From: amogkam
Date: Tue, 15 Nov 2022 16:36:09 -0800
Subject: [PATCH 4/6] time sleep

Signed-off-by: amogkam
---
 python/ray/train/tests/test_gpu.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py
index 7462f0aeab30..d87407c0175b 100644
--- a/python/ray/train/tests/test_gpu.py
+++ b/python/ray/train/tests/test_gpu.py
@@ -1,5 +1,6 @@
 import os
 from collections import Counter
+import time
 from unittest.mock import patch

 import pytest
@@ -318,7 +319,7 @@ def train_fn():
         # NCCL should timeout.
         if session.get_world_rank() == 0:
             while True:
-                pass
+                time.sleep(100)

         torch.distributed.barrier()

From d59c0e6c3f52cf17c7cf52e21ebd89af48c975b0 Mon Sep 17 00:00:00 2001
From: Amog Kamsetty
Date: Tue, 15 Nov 2022 18:02:55 -0800
Subject: [PATCH 5/6] update exception type

---
 python/ray/train/tests/test_gpu.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py
index d87407c0175b..d74c58208465 100644
--- a/python/ray/train/tests/test_gpu.py
+++ b/python/ray/train/tests/test_gpu.py
@@ -10,6 +10,7 @@
 from torch.utils.data import DataLoader, DistributedSampler

 import ray
+from ray.exceptions import RayActorError
 from ray.air import session
 from ray import tune

@@ -330,7 +331,7 @@ def train_fn():
     )

     # Training should fail and not hang.
-    with pytest.raises(RuntimeError):
+    with pytest.raises(RayActorError):
         trainer.fit()

From 2e46de2e9d040d9dfe37463baa6de2f2f44f696c Mon Sep 17 00:00:00 2001
From: amogkam
Date: Wed, 16 Nov 2022 14:02:41 -0800
Subject: [PATCH 6/6] update

Signed-off-by: amogkam
---
 python/ray/train/tests/test_gpu.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/ray/train/tests/test_gpu.py b/python/ray/train/tests/test_gpu.py
index d74c58208465..9d096cf66a3e 100644
--- a/python/ray/train/tests/test_gpu.py
+++ b/python/ray/train/tests/test_gpu.py
@@ -10,7 +10,7 @@
 from torch.utils.data import DataLoader, DistributedSampler

 import ray
-from ray.exceptions import RayActorError
+from ray.exceptions import RayTaskError
 from ray.air import session
 from ray import tune

@@ -331,7 +331,7 @@ def train_fn():
     )

     # Training should fail and not hang.
-    with pytest.raises(RayActorError):
+    with pytest.raises(RayTaskError):
         trainer.fit()
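
The behavior these patches rely on can be reproduced outside of Ray with a small standalone PyTorch script. The sketch below is illustrative only and is not part of the patches: it assumes a single machine with two CUDA GPUs, and the master address/port, 5-second timeout, and sleep duration are made-up values. With NCCL_ASYNC_ERROR_HANDLING set to "1", the rank that reaches the barrier should fail with an error once the process-group timeout elapses instead of hanging, which is the same failure mode the new test_torch_fail_on_nccl_timeout test exercises through TorchTrainer.

import datetime
import os
import time

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def run(rank: int, world_size: int) -> None:
    # Hypothetical rendezvous settings for a single-machine, two-GPU run.
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")

    # Same guard as the patched _setup_torch_process_group: only opt into
    # async error handling if the user has not already configured NCCL's
    # failure handling via either environment variable.
    if (
        "NCCL_ASYNC_ERROR_HANDLING" not in os.environ
        and "NCCL_BLOCKING_WAIT" not in os.environ
    ):
        os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"

    # Analogous to TorchConfig(timeout_s=5) in the new test.
    dist.init_process_group(
        backend="nccl",
        rank=rank,
        world_size=world_size,
        timeout=datetime.timedelta(seconds=5),
    )
    torch.cuda.set_device(rank)

    if rank == 0:
        # Rank 0 never reaches the collective, mirroring the test's train_fn.
        time.sleep(120)
    else:
        # With async error handling enabled, this barrier should raise once
        # the 5-second timeout elapses instead of blocking forever.
        dist.barrier()

    dist.destroy_process_group()


if __name__ == "__main__":
    mp.spawn(run, args=(2,), nprocs=2)

If both NCCL_ASYNC_ERROR_HANDLING and NCCL_BLOCKING_WAIT are left unset, a script like this will typically hang at the barrier indefinitely, which is why the config change only sets the variable when the user has not already chosen either option themselves.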