From d85f484096ec6c2f308a60708b509562c0434a7f Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Mon, 5 Feb 2024 16:55:17 -0800 Subject: [PATCH 01/10] [train] support memory per worker Signed-off-by: Matthew Deng --- python/ray/train/_internal/worker_group.py | 5 ++++- python/ray/train/tests/test_worker_group.py | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index c17429b4cbb2..987a2a2a3a70 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -169,11 +169,13 @@ def __init__( "`actor_cls_args` or `actor_class_kwargs` are " "passed in but no `actor_cls` is passed in." ) - + self.num_workers = num_workers self.num_cpus_per_worker = num_cpus_per_worker self.num_gpus_per_worker = num_gpus_per_worker self.additional_resources_per_worker = additional_resources_per_worker + self.memory_per_worker = self.additional_resources_per_worker.pop("memory", None) + self.workers = [] self._base_cls = create_executable_class(actor_cls) assert issubclass(self._base_cls, RayTrainWorker) @@ -188,6 +190,7 @@ def __init__( self._remote_cls = ray.remote( num_cpus=self.num_cpus_per_worker, num_gpus=self.num_gpus_per_worker, + memory=self.memory_per_worker, resources=self.additional_resources_per_worker, )(self._base_cls) self.start() diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index 5b0959c2d32b..eb988d17d1cd 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -32,6 +32,14 @@ def ray_start_2_cpus_and_neuron_core_accelerator(): ray.shutdown() +@pytest.fixture +def ray_start_2_cpus_and_10kb_memory(): + address_info = ray.init(num_cpus=2, _memory=10_000) + yield address_info + # The code after the yield will run as teardown code. + ray.shutdown() + + def test_worker_creation(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2 wg = WorkerGroup(num_workers=2) @@ -51,6 +59,9 @@ def test_worker_creation_num_cpus(ray_start_2_cpus): assert "CPU" not in ray.available_resources() wg.shutdown() +def test_worker_creation_with_memory(ray_start_2_cpus_and_10kb_memory): + wg = WorkerGroup(num_workers=2, additional_resources_per_worker={"memory": 1_000}) + assert len(wg.workers) == 2 def test_worker_shutdown(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2 From 12c35bf0d6d27a0c44fc6ce6a6f9b3aff36aa371 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Mon, 5 Feb 2024 16:56:04 -0800 Subject: [PATCH 02/10] lint Signed-off-by: Matthew Deng --- python/ray/train/_internal/worker_group.py | 6 ++++-- python/ray/train/tests/test_worker_group.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 987a2a2a3a70..00c33560ad61 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -169,12 +169,14 @@ def __init__( "`actor_cls_args` or `actor_class_kwargs` are " "passed in but no `actor_cls` is passed in." 
) - + self.num_workers = num_workers self.num_cpus_per_worker = num_cpus_per_worker self.num_gpus_per_worker = num_gpus_per_worker self.additional_resources_per_worker = additional_resources_per_worker - self.memory_per_worker = self.additional_resources_per_worker.pop("memory", None) + self.memory_per_worker = self.additional_resources_per_worker.pop( + "memory", None + ) self.workers = [] self._base_cls = create_executable_class(actor_cls) diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index eb988d17d1cd..c64c4b54eb4e 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -59,10 +59,12 @@ def test_worker_creation_num_cpus(ray_start_2_cpus): assert "CPU" not in ray.available_resources() wg.shutdown() + def test_worker_creation_with_memory(ray_start_2_cpus_and_10kb_memory): wg = WorkerGroup(num_workers=2, additional_resources_per_worker={"memory": 1_000}) assert len(wg.workers) == 2 + def test_worker_shutdown(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2 wg = WorkerGroup(num_workers=2) From 88aa83850f2176306ef31ff295c242f438ebf179 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Mon, 5 Feb 2024 18:40:28 -0800 Subject: [PATCH 03/10] handle None Signed-off-by: Matthew Deng --- python/ray/train/_internal/worker_group.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 00c33560ad61..fa73ccf49fa1 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -174,9 +174,12 @@ def __init__( self.num_cpus_per_worker = num_cpus_per_worker self.num_gpus_per_worker = num_gpus_per_worker self.additional_resources_per_worker = additional_resources_per_worker - self.memory_per_worker = self.additional_resources_per_worker.pop( - "memory", None - ) + + self.memory_per_worker = None + if self.additional_resources_per_worker is not None: + self.memory_per_worker = self.additional_resources_per_worker.pop( + "memory", None + ) self.workers = [] self._base_cls = create_executable_class(actor_cls) From d8057fa35a574dcaec0eff40bf871abd1dbdc489 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Mon, 5 Feb 2024 19:34:48 -0800 Subject: [PATCH 04/10] lint Signed-off-by: Matthew Deng --- python/ray/train/_internal/worker_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index fa73ccf49fa1..81143423b130 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -174,7 +174,7 @@ def __init__( self.num_cpus_per_worker = num_cpus_per_worker self.num_gpus_per_worker = num_gpus_per_worker self.additional_resources_per_worker = additional_resources_per_worker - + self.memory_per_worker = None if self.additional_resources_per_worker is not None: self.memory_per_worker = self.additional_resources_per_worker.pop( From fddecb48bc3faedb09b09c8049e8d2d52a176f09 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Tue, 6 Feb 2024 14:57:38 -0800 Subject: [PATCH 05/10] unify interface on resources_per_worker Signed-off-by: Matthew Deng --- .../ray/train/_internal/backend_executor.py | 64 ++++++++----------- python/ray/train/_internal/worker_group.py | 42 ++++-------- python/ray/train/data_parallel_trainer.py | 6 +- rllib/core/learner/learner_group.py | 27 +++++--- 4 files changed, 59 
insertions(+), 80 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index dd6add6a32a5..22971e49909a 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -73,12 +73,9 @@ class BackendExecutor: backend_config: The configurations for this specific backend. num_workers: Number of workers to use for training. - num_cpus_per_worker: Number of CPUs to use per worker. - num_gpus_per_worker: Number of GPUs to use per worker. - additional_resources_per_worker (Optional[Dict[str, float]]): - Dictionary specifying the extra resources that will be - requested for each worker in addition to ``num_cpus_per_worker`` - and ``num_gpus_per_worker``. + resources_per_worker (Optional[Dict[str, float]]): + Dictionary specifying the resources that will be + requested for each worker. Defaults to {"CPU": 1}. max_retries: Number of retries when Ray actors fail. Defaults to 3. Set to -1 for unlimited retries. """ @@ -89,17 +86,18 @@ def __init__( # TODO(xwjiang): Legacy Ray Train trainer clean up! trial_info: Optional[TrialInfo] = None, num_workers: int = 1, - num_cpus_per_worker: float = 1, - num_gpus_per_worker: float = 0, - additional_resources_per_worker: Optional[Dict[str, float]] = None, + resources_per_worker: Optional[Dict[str, float]] = None, max_retries: int = 3, ): + if resources_per_worker is None: + resources_per_worker = {"CPU": 1} + else: + resources_per_worker = resources_per_worker.copy() + self._backend_config = backend_config self._backend = backend_config.backend_cls() self._num_workers = num_workers - self._num_cpus_per_worker = num_cpus_per_worker - self._num_gpus_per_worker = num_gpus_per_worker - self._additional_resources_per_worker = additional_resources_per_worker + self._resources_per_worker = resources_per_worker self._max_failures = max_retries if self._max_failures < 0: self._max_failures = float("inf") @@ -133,9 +131,7 @@ def start( placement_group = self._placement_group or "default" self.worker_group = WorkerGroup( num_workers=self._num_workers, - num_cpus_per_worker=self._num_cpus_per_worker, - num_gpus_per_worker=self._num_gpus_per_worker, - additional_resources_per_worker=self._additional_resources_per_worker, + resources_per_worker=self._resources_per_worker, actor_cls=train_cls, actor_cls_args=train_cls_args, actor_cls_kwargs=train_cls_kwargs, @@ -175,18 +171,20 @@ def _set_driver_dataset_context(ctx: DataContext): ) ) - if self._num_gpus_per_worker > 0 and share_cuda_visible_devices_enabled: + if ( + self._resources_per_worker.get("GPU", 0) > 0 + and share_cuda_visible_devices_enabled + ): self._share_cuda_visible_devices() - elif self._additional_resources_per_worker: - for resource_config in self._resource_configs: - if self._is_share_resources_enabled( + for resource_config in self._resource_configs: + if self._is_share_resources_enabled( + resource_config.resource_name, + resource_config.resource_enable_sharing_env_var, + ): + self._share_resource_ids( resource_config.resource_name, - resource_config.resource_enable_sharing_env_var, - ): - self._share_resource_ids( - resource_config.resource_name, - resource_config.share_resource_ids_env_var, - ) + resource_config.share_resource_ids_env_var, + ) self._backend.on_start(self.worker_group, self._backend_config) except RayActorError as exc: logger.exception(str(exc)) @@ -221,15 +219,9 @@ def _create_placement_group(self): ) if should_create_placement_group: - additional_resources_per_worker = 
( - self._additional_resources_per_worker or {} - ) - bundle = { - "CPU": self._num_cpus_per_worker, - "GPU": self._num_gpus_per_worker, - **additional_resources_per_worker, - } - bundles = [bundle.copy() for _ in range(self._num_workers)] + bundles = [ + self._resources_per_worker.copy() for _ in range(self._num_workers) + ] use_spread = bool(env_integer(TRAIN_ENABLE_WORKER_SPREAD_ENV, 0)) strategy = "SPREAD" if use_spread else "PACK" @@ -348,9 +340,7 @@ def _is_share_resources_enabled(self, resource_name: str, enable_sharing_env: st enable_sharing_env: The name of the environment variable to check. """ - has_resource_requested = ( - self._additional_resources_per_worker.get(resource_name, 0) > 0 - ) + has_resource_requested = self._resources_per_worker.get(resource_name, 0) > 0 return has_resource_requested and ray_constants.env_bool( enable_sharing_env, True ) diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 81143423b130..41464970732c 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -112,14 +112,9 @@ class WorkerGroup: Args: num_workers: The number of workers (Ray actors) to launch. Defaults to 1. - num_cpus_per_worker: The number of CPUs to reserve for each - worker. Fractional values are allowed. Defaults to 1. - num_gpus_per_worker: The number of GPUs to reserve for each - worker. Fractional values are allowed. Defaults to 0. - additional_resources_per_worker (Optional[Dict[str, float]]): - Dictionary specifying the extra resources that will be - requested for each worker in addition to ``num_cpus_per_worker`` - and ``num_gpus_per_worker``. + resources_per_worker (Optional[Dict[str, float]]): + Dictionary specifying the resources that will be + requested for each worker. Defaults to {"CPU": 1}. actor_cls (Optional[Type]): If specified use this class as the remote actors. remote_cls_args, remote_cls_kwargs: If ``remote_cls`` is provided, @@ -142,27 +137,23 @@ class WorkerGroup: def __init__( self, num_workers: int = 1, - num_cpus_per_worker: float = 1, - num_gpus_per_worker: float = 0, - additional_resources_per_worker: Optional[Dict[str, float]] = None, + resources_per_worker: Optional[Dict[str, float]] = None, actor_cls: Type = None, actor_cls_args: Optional[Tuple] = None, actor_cls_kwargs: Optional[Dict] = None, placement_group: Union[PlacementGroup, str] = "default", ): + if resources_per_worker is None: + resources_per_worker = {"CPU": 1} + else: + resources_per_worker = resources_per_worker.copy() + if num_workers <= 0: raise ValueError( "The provided `num_workers` must be greater " f"than 0. Received num_workers={num_workers} " f"instead." ) - if num_cpus_per_worker < 0 or num_gpus_per_worker < 0: - raise ValueError( - "The number of CPUs and GPUs per worker must " - "not be negative. Received " - f"num_cpus_per_worker={num_cpus_per_worker} and " - f"num_gpus_per_worker={num_gpus_per_worker}." 
- ) if (actor_cls_args or actor_cls_kwargs) and not actor_cls: raise ValueError( @@ -171,16 +162,9 @@ def __init__( ) self.num_workers = num_workers - self.num_cpus_per_worker = num_cpus_per_worker - self.num_gpus_per_worker = num_gpus_per_worker - self.additional_resources_per_worker = additional_resources_per_worker - - self.memory_per_worker = None - if self.additional_resources_per_worker is not None: - self.memory_per_worker = self.additional_resources_per_worker.pop( - "memory", None - ) - + self.num_cpus_per_worker = resources_per_worker.pop("CPU", 0) + self.num_gpus_per_worker = resources_per_worker.pop("GPU", 0) + self.memory_per_worker = resources_per_worker.pop("memory", 0) self.workers = [] self._base_cls = create_executable_class(actor_cls) assert issubclass(self._base_cls, RayTrainWorker) @@ -196,7 +180,7 @@ def __init__( num_cpus=self.num_cpus_per_worker, num_gpus=self.num_gpus_per_worker, memory=self.memory_per_worker, - resources=self.additional_resources_per_worker, + resources=resources_per_worker, )(self._base_cls) self.start() diff --git a/python/ray/train/data_parallel_trainer.py b/python/ray/train/data_parallel_trainer.py index 11223d6557a8..fe070684b757 100644 --- a/python/ray/train/data_parallel_trainer.py +++ b/python/ray/train/data_parallel_trainer.py @@ -439,8 +439,6 @@ def training_loop(self) -> None: discard_returns=True, ) - additional_resources_per_worker = scaling_config.additional_resources_per_worker - trial_info = TrialInfo( name=session.get_trial_name(), id=session.get_trial_id(), @@ -454,9 +452,7 @@ def training_loop(self) -> None: backend_config=self._backend_config, trial_info=trial_info, num_workers=scaling_config.num_workers, - num_cpus_per_worker=scaling_config.num_cpus_per_worker, - num_gpus_per_worker=scaling_config.num_gpus_per_worker, - additional_resources_per_worker=additional_resources_per_worker, + resources_per_worker=scaling_config._resources_per_worker_not_none, max_retries=0, ) diff --git a/rllib/core/learner/learner_group.py b/rllib/core/learner/learner_group.py index a812f3334963..075393afc924 100644 --- a/rllib/core/learner/learner_group.py +++ b/rllib/core/learner/learner_group.py @@ -145,19 +145,28 @@ def __init__( # N remote Learner workers. else: backend_config = _get_backend_config(learner_class) - backend_executor = BackendExecutor( - backend_config=backend_config, - num_workers=self.config.num_learner_workers, - # TODO (sven): Cannot set both `num_cpus_per_learner_worker`>1 and - # `num_gpus_per_learner_worker`>0! Users must set one or the other due - # to issues with placement group fragmentation. See - # https://github.com/ray-project/ray/issues/35409 for more details. - num_cpus_per_worker=( + + # TODO (sven): Cannot set both `num_cpus_per_learner_worker`>1 and + # `num_gpus_per_learner_worker`>0! Users must set one or the other due + # to issues with placement group fragmentation. See + # https://github.com/ray-project/ray/issues/35409 for more details. 
+ num_cpus_per_worker = ( + ( self.config.num_cpus_per_learner_worker if not self.config.num_gpus_per_learner_worker else 0 ), - num_gpus_per_worker=self.config.num_gpus_per_learner_worker, + ) + num_gpus_per_worker = (self.config.num_gpus_per_learner_worker,) + resources_per_worker = { + "CPU": num_cpus_per_worker, + "GPU": num_gpus_per_worker, + } + + backend_executor = BackendExecutor( + backend_config=backend_config, + num_workers=self.config.num_learner_workers, + resources_per_worker=resources_per_worker, max_retries=0, ) backend_executor.start( From be7c4412c2b28702179853980f95a92d504b2cd9 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Tue, 6 Feb 2024 15:51:13 -0800 Subject: [PATCH 06/10] fix tests Signed-off-by: Matthew Deng --- python/ray/train/_internal/backend_executor.py | 5 ++--- python/ray/train/_internal/worker_group.py | 6 ++++++ python/ray/train/tests/test_backend.py | 12 +++++------- python/ray/train/tests/test_worker_group.py | 15 +++++++++------ 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/python/ray/train/_internal/backend_executor.py b/python/ray/train/_internal/backend_executor.py index 22971e49909a..cf6210c1bf4d 100644 --- a/python/ray/train/_internal/backend_executor.py +++ b/python/ray/train/_internal/backend_executor.py @@ -90,14 +90,13 @@ def __init__( max_retries: int = 3, ): if resources_per_worker is None: - resources_per_worker = {"CPU": 1} + self._resources_per_worker = {"CPU": 1} else: - resources_per_worker = resources_per_worker.copy() + self._resources_per_worker = resources_per_worker.copy() self._backend_config = backend_config self._backend = backend_config.backend_cls() self._num_workers = num_workers - self._resources_per_worker = resources_per_worker self._max_failures = max_retries if self._max_failures < 0: self._max_failures = float("inf") diff --git a/python/ray/train/_internal/worker_group.py b/python/ray/train/_internal/worker_group.py index 41464970732c..748b18dd8ebf 100644 --- a/python/ray/train/_internal/worker_group.py +++ b/python/ray/train/_internal/worker_group.py @@ -155,6 +155,12 @@ def __init__( f"instead." ) + if any(v < 0 for v in resources_per_worker.values()): + raise ValueError( + "The number of resources per worker must not be negative. " + f"Received resources_per_worker={resources_per_worker}." 
+ ) + if (actor_cls_args or actor_cls_kwargs) and not actor_cls: raise ValueError( "`actor_cls_args` or `actor_class_kwargs` are " diff --git a/python/ray/train/tests/test_backend.py b/python/ray/train/tests/test_backend.py index 27c1c7c01f92..8d76397f0de7 100644 --- a/python/ray/train/tests/test_backend.py +++ b/python/ray/train/tests/test_backend.py @@ -323,7 +323,7 @@ def get_resources(): os.environ[ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV] = "1" e = BackendExecutor( - config, num_workers=num_workers, num_cpus_per_worker=0, num_gpus_per_worker=1 + config, num_workers=num_workers, resources_per_worker={"GPU": 1} ) e.start() _start_training(e, get_resources) @@ -369,7 +369,7 @@ def get_resources(): os.environ[ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV] = "1" e = BackendExecutor( - config, num_workers=num_workers, num_cpus_per_worker=0, num_gpus_per_worker=0.5 + config, num_workers=num_workers, resources_per_worker={"GPU": 0.5} ) e.start() _start_training(e, get_resources) @@ -408,7 +408,7 @@ def get_resources(): os.environ[ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV] = "1" e = BackendExecutor( - config, num_workers=num_workers, num_cpus_per_worker=0, num_gpus_per_worker=2 + config, num_workers=num_workers, resources_per_worker={"GPU": 2} ) e.start() _start_training(e, get_resources) @@ -443,8 +443,7 @@ def get_resources(): e = BackendExecutor( config, num_workers=num_workers, - num_cpus_per_worker=0, - additional_resources_per_worker={"neuron_cores": 1}, + resources_per_worker={"neuron_cores": 1}, ) e.start() _start_training(e, get_resources) @@ -481,8 +480,7 @@ def get_resources(): e = BackendExecutor( config, num_workers=num_workers, - num_cpus_per_worker=0, - additional_resources_per_worker={"neuron_cores": 1}, + resources_per_worker={"neuron_cores": 1}, ) e.start() _start_training(e, get_resources) diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index c64c4b54eb4e..c5f0ced15951 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -52,7 +52,7 @@ def test_worker_creation(ray_start_2_cpus): def test_worker_creation_num_cpus(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2 - wg = WorkerGroup(num_cpus_per_worker=2) + wg = WorkerGroup(resources_per_worker={"CPU": 2}) time.sleep(1) assert len(wg.workers) == 1 # Make sure both CPUs are being used by the actor. 
@@ -61,7 +61,7 @@ def test_worker_creation_num_cpus(ray_start_2_cpus): def test_worker_creation_with_memory(ray_start_2_cpus_and_10kb_memory): - wg = WorkerGroup(num_workers=2, additional_resources_per_worker={"memory": 1_000}) + wg = WorkerGroup(num_workers=2, resources_per_worker={"memory": 1_000}) assert len(wg.workers) == 2 @@ -92,7 +92,7 @@ def test_worker_restart(ray_start_2_cpus): def test_worker_with_gpu_ids(ray_start_2_cpus_and_gpus): num_gpus = 2 - wg = WorkerGroup(num_workers=2, num_gpus_per_worker=1) + wg = WorkerGroup(num_workers=2, resources_per_worker={"GPU": 1}) assert len(wg.workers) == 2 time.sleep(1) assert ray_constants.GPU not in ray.available_resources() @@ -111,7 +111,7 @@ def test_worker_with_neuron_core_accelerator_ids( ): num_nc = 2 wg = WorkerGroup( - num_workers=2, additional_resources_per_worker={ray_constants.NEURON_CORES: 1} + num_workers=2, resources_per_worker={ray_constants.NEURON_CORES: 1} ) assert len(wg.workers) == 2 time.sleep(1) @@ -284,10 +284,13 @@ def test_bad_resources(ray_start_2_cpus): WorkerGroup(num_workers=-1) with pytest.raises(ValueError): - WorkerGroup(num_cpus_per_worker=-1) + WorkerGroup(resources_per_worker={"CPU": -1}) with pytest.raises(ValueError): - WorkerGroup(num_gpus_per_worker=-1) + WorkerGroup(resources_per_worker={"GPU": -1}) + + with pytest.raises(ValueError): + WorkerGroup(resources_per_worker={"memory": -1}) def test_placement_group(ray_start_2_cpus): From 2199de6d65e9b835b4d99f17dba991f7496c0320 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Tue, 6 Feb 2024 15:57:37 -0800 Subject: [PATCH 07/10] fix Signed-off-by: Matthew Deng --- rllib/core/learner/learner_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/core/learner/learner_group.py b/rllib/core/learner/learner_group.py index 075393afc924..680103308f9b 100644 --- a/rllib/core/learner/learner_group.py +++ b/rllib/core/learner/learner_group.py @@ -157,7 +157,7 @@ def __init__( else 0 ), ) - num_gpus_per_worker = (self.config.num_gpus_per_learner_worker,) + num_gpus_per_worker = self.config.num_gpus_per_learner_worker resources_per_worker = { "CPU": num_cpus_per_worker, "GPU": num_gpus_per_worker, From 972dc762c90373239c15b7de115257139f6e6174 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Tue, 6 Feb 2024 16:56:31 -0800 Subject: [PATCH 08/10] fix rllib Signed-off-by: Matthew Deng --- rllib/core/learner/learner_group.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/rllib/core/learner/learner_group.py b/rllib/core/learner/learner_group.py index 680103308f9b..43415f12eab3 100644 --- a/rllib/core/learner/learner_group.py +++ b/rllib/core/learner/learner_group.py @@ -151,16 +151,14 @@ def __init__( # to issues with placement group fragmentation. See # https://github.com/ray-project/ray/issues/35409 for more details. 
num_cpus_per_worker = ( - ( - self.config.num_cpus_per_learner_worker - if not self.config.num_gpus_per_learner_worker - else 0 - ), + self.config.num_cpus_per_learner_worker + if not self.config.num_gpus_per_learner_worker + else 0 ) num_gpus_per_worker = self.config.num_gpus_per_learner_worker resources_per_worker = { "CPU": num_cpus_per_worker, - "GPU": num_gpus_per_worker, + "GPU": num_gpus_per_worker } backend_executor = BackendExecutor( From 259f3790e5a2d42e2020f4ab7323175934398f87 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Tue, 6 Feb 2024 16:56:40 -0800 Subject: [PATCH 09/10] fix rllib Signed-off-by: Matthew Deng --- rllib/core/learner/learner_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/core/learner/learner_group.py b/rllib/core/learner/learner_group.py index 43415f12eab3..d508cb3ace21 100644 --- a/rllib/core/learner/learner_group.py +++ b/rllib/core/learner/learner_group.py @@ -158,7 +158,7 @@ def __init__( num_gpus_per_worker = self.config.num_gpus_per_learner_worker resources_per_worker = { "CPU": num_cpus_per_worker, - "GPU": num_gpus_per_worker + "GPU": num_gpus_per_worker, } backend_executor = BackendExecutor( From d67801fa33d3889964ec08320749955d697ccbe7 Mon Sep 17 00:00:00 2001 From: Matthew Deng Date: Tue, 6 Feb 2024 17:35:50 -0800 Subject: [PATCH 10/10] update test Signed-off-by: Matthew Deng --- python/ray/train/tests/test_worker_group.py | 36 +++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/python/ray/train/tests/test_worker_group.py b/python/ray/train/tests/test_worker_group.py index c5f0ced15951..e124aff61e20 100644 --- a/python/ray/train/tests/test_worker_group.py +++ b/python/ray/train/tests/test_worker_group.py @@ -5,6 +5,7 @@ import ray import ray._private.ray_constants as ray_constants +from ray.cluster_utils import Cluster from ray.train._internal.worker_group import Worker, WorkerGroup, WorkerMetadata @@ -40,6 +41,21 @@ def ray_start_2_cpus_and_10kb_memory(): ray.shutdown() +@pytest.fixture +def ray_start_5_nodes_with_memory(): + cluster = Cluster() + for _ in range(4): + cluster.add_node(num_cpus=4, memory=500) + cluster.add_node(num_cpus=4, memory=2_000) + + ray.init(address=cluster.address) + + yield + + ray.shutdown() + cluster.shutdown() + + def test_worker_creation(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2 wg = WorkerGroup(num_workers=2) @@ -60,10 +76,26 @@ def test_worker_creation_num_cpus(ray_start_2_cpus): wg.shutdown() -def test_worker_creation_with_memory(ray_start_2_cpus_and_10kb_memory): - wg = WorkerGroup(num_workers=2, resources_per_worker={"memory": 1_000}) +def test_worker_creation_with_memory(ray_start_5_nodes_with_memory): + resources_per_worker = {"memory": 1_000} + wg = WorkerGroup(num_workers=2, resources_per_worker=resources_per_worker) assert len(wg.workers) == 2 + nodes = ray.nodes() + large_node = [node for node in nodes if node["Resources"]["memory"] == 2_000][0] + large_node_id = large_node["NodeID"] + + def validate_scheduling(): + resources = ray.get_runtime_context().get_assigned_resources() + assert resources == resources_per_worker, "Resources should include memory." + + node_id = ray.get_runtime_context().get_node_id() + assert ( + node_id == large_node_id + ), "Workers should be scheduled on the large node." + + wg.execute(validate_scheduling) + def test_worker_shutdown(ray_start_2_cpus): assert ray.available_resources()["CPU"] == 2
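The commits above replace the separate `num_cpus_per_worker`, `num_gpus_per_worker`, and `additional_resources_per_worker` arguments with a single `resources_per_worker` dict on both `WorkerGroup` and `BackendExecutor`. The sketch below is illustrative only — it is not part of the patch series — and assumes a local Ray cluster started with the declared resource totals. It shows how the unified interface is exercised: "CPU", "GPU", and "memory" are popped out of the dict and forwarded to the corresponding `ray.remote` options, while any remaining keys (for example "neuron_cores") are passed through as custom resources.

    import ray
    from ray.train._internal.worker_group import WorkerGroup

    # Assumption: a local cluster started with explicit resource totals so the
    # per-worker requests below are schedulable.
    ray.init(num_cpus=4, num_gpus=2, _memory=2_000_000_000)

    # "CPU", "GPU", and "memory" are mapped by WorkerGroup onto
    # ray.remote(num_cpus=..., num_gpus=..., memory=...); any other keys would
    # be passed through as custom resources.
    wg = WorkerGroup(
        num_workers=2,
        resources_per_worker={"CPU": 1, "GPU": 1, "memory": 500_000_000},
    )
    assert len(wg.workers) == 2

    # Each worker actor can report the resources it was actually assigned.
    print(wg.execute(lambda: ray.get_runtime_context().get_assigned_resources()))

    wg.shutdown()
    ray.shutdown()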