diff --git a/python/docs/source/reference/pyspark.ml.rst b/python/docs/source/reference/pyspark.ml.rst
index 6c80751468745..965cbe7eb5a57 100644
--- a/python/docs/source/reference/pyspark.ml.rst
+++ b/python/docs/source/reference/pyspark.ml.rst
@@ -352,6 +352,13 @@ Distributor
 
     TorchDistributor
 
+.. currentmodule:: pyspark.ml.deepspeed.deepspeed_distributor
+.. autosummary::
+    :template: autosummary/class_with_docs.rst
+    :toctree: api/
+
+    DeepspeedTorchDistributor
+
 Utilities
 ---------
 
diff --git a/python/pyspark/ml/deepspeed/deepspeed_distributor.py b/python/pyspark/ml/deepspeed/deepspeed_distributor.py
index 46c430824c696..df1aac21e1f89 100644
--- a/python/pyspark/ml/deepspeed/deepspeed_distributor.py
+++ b/python/pyspark/ml/deepspeed/deepspeed_distributor.py
@@ -52,16 +52,12 @@ def __init__(
         ----------
         num_gpus: int
             The number of GPUs to use per node (analagous to num_gpus in deepspeed command).
-
         nnodes: int
             The number of nodes that should be used for the run.
-
         local_mode: bool
             Whether or not to run the training in a distributed fashion or just locally.
-
         use_gpu: bool
             Boolean flag to determine whether to utilize gpus.
-
         deepspeed_config: Union[Dict[str,Any], str] or None:
             The configuration file to be used for launching the deepspeed application.
             If it's a dictionary containing the parameters, then we will create the file.
@@ -111,7 +107,7 @@ def _create_torchrun_command(
             f"--nproc_per_node={processes_per_node}",
             train_path,
             *args_string,
-            "-deepspeed",
+            "--deepspeed",
         ]
 
         # Don't have the deepspeed_config argument if no path is provided or no parameters set
diff --git a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
index 149062f0b3ce5..4c4606699a37e 100644
--- a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
+++ b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
@@ -31,137 +31,137 @@ def _get_env_var(self, var_name: str, default_value: Any) -> Any:
         return default_value
 
     def _get_env_variables_distributed(self) -> Tuple[Any, Any, Any]:
-        MASTER_ADDR = self._get_env_var("MASTER_ADDR", "127.0.0.1")
-        MASTER_PORT = self._get_env_var("MASTER_PORT", 2000)
-        RANK = self._get_env_var("RANK", 0)
-        return MASTER_ADDR, MASTER_PORT, RANK
+        master_addr = self._get_env_var("MASTER_ADDR", "127.0.0.1")
+        master_port = self._get_env_var("MASTER_PORT", 2000)
+        rank = self._get_env_var("RANK", 0)
+        return master_addr, master_port, rank
 
     def test_get_torchrun_args_local(self) -> None:
         number_of_processes = 5
-        EXPECTED_TORCHRUN_ARGS_LOCAL = ["--standalone", "--nnodes=1"]
-        EXPECTED_PROCESSES_PER_NODE_LOCAL = number_of_processes
+        expected_torchrun_args_local = ["--standalone", "--nnodes=1"]
+        expected_processes_per_node_local = number_of_processes
         (
             get_local_mode_torchrun_args,
             process_per_node,
         ) = DeepspeedTorchDistributor._get_torchrun_args(True, number_of_processes)
-        self.assertEqual(get_local_mode_torchrun_args, EXPECTED_TORCHRUN_ARGS_LOCAL)
-        self.assertEqual(EXPECTED_PROCESSES_PER_NODE_LOCAL, process_per_node)
+        self.assertEqual(get_local_mode_torchrun_args, expected_torchrun_args_local)
+        self.assertEqual(expected_processes_per_node_local, process_per_node)
 
     def test_get_torchrun_args_distributed(self) -> None:
         number_of_processes = 5
-        MASTER_ADDR, MASTER_PORT, RANK = self._get_env_variables_distributed()
-        EXPECTED_TORCHRUN_ARGS_DISTRIBUTED = [
+        master_addr, master_port, rank = self._get_env_variables_distributed()
+        expected_torchrun_args_distributed = [
             f"--nnodes={number_of_processes}",
-            f"--node_rank={RANK}",
-            f"--rdzv_endpoint={MASTER_ADDR}:{MASTER_PORT}",
+            f"--node_rank={rank}",
+            f"--rdzv_endpoint={master_addr}:{master_port}",
             "--rdzv_id=0",
         ]
         torchrun_args_distributed, process_per_node = DeepspeedTorchDistributor._get_torchrun_args(
             False, number_of_processes
         )
-        self.assertEqual(torchrun_args_distributed, EXPECTED_TORCHRUN_ARGS_DISTRIBUTED)
+        self.assertEqual(torchrun_args_distributed, expected_torchrun_args_distributed)
         self.assertEqual(process_per_node, 1)
 
     def test_create_torchrun_command_local(self) -> None:
-        DEEPSPEED_CONF = "path/to/deepspeed"
-        TRAIN_FILE_PATH = "path/to/exec"
-        NUM_PROCS = 10
+        deepspeed_conf = "path/to/deepspeed"
+        train_file_path = "path/to/exec"
+        num_procs = 10
         input_params: Dict[str, Any] = {}
         input_params["local_mode"] = True
-        input_params["num_processes"] = NUM_PROCS
-        input_params["deepspeed_config"] = DEEPSPEED_CONF
+        input_params["num_processes"] = num_procs
+        input_params["deepspeed_config"] = deepspeed_conf
         torchrun_local_args_expected = ["--standalone", "--nnodes=1"]
         with self.subTest(msg="Testing local training with no extra args"):
-            LOCAL_CMD_NO_ARGS_EXPECTED = [
+            local_cmd_no_args_expected = [
                 sys.executable,
                 "-m",
                 "torch.distributed.run",
                 *torchrun_local_args_expected,
-                f"--nproc_per_node={NUM_PROCS}",
-                TRAIN_FILE_PATH,
-                "-deepspeed",
+                f"--nproc_per_node={num_procs}",
+                train_file_path,
+                "--deepspeed",
                 "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
             ]
             local_cmd = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH
+                input_params, train_file_path
             )
-            self.assertEqual(local_cmd, LOCAL_CMD_NO_ARGS_EXPECTED)
+            self.assertEqual(local_cmd, local_cmd_no_args_expected)
         with self.subTest(msg="Testing local training with extra args for the training script"):
             local_mode_version_args = ["--arg1", "--arg2"]
-            LOCAL_CMD_ARGS_EXPECTED = [
+            local_cmd_args_expected = [
                 sys.executable,
                 "-m",
                 "torch.distributed.run",
                 *torchrun_local_args_expected,
-                f"--nproc_per_node={NUM_PROCS}",
-                TRAIN_FILE_PATH,
+                f"--nproc_per_node={num_procs}",
+                train_file_path,
                 *local_mode_version_args,
-                "-deepspeed",
+                "--deepspeed",
                 "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
             ]
             local_cmd_with_args = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH, *local_mode_version_args
+                input_params, train_file_path, *local_mode_version_args
             )
-            self.assertEqual(local_cmd_with_args, LOCAL_CMD_ARGS_EXPECTED)
+            self.assertEqual(local_cmd_with_args, local_cmd_args_expected)
 
     def test_create_torchrun_command_distributed(self) -> None:
-        DEEPSPEED_CONF = "path/to/deepspeed"
-        TRAIN_FILE_PATH = "path/to/exec"
-        NUM_PROCS = 10
+        deepspeed_conf = "path/to/deepspeed"
+        train_file_path = "path/to/exec"
+        num_procs = 10
         input_params: Dict[str, Any] = {}
         input_params["local_mode"] = True
-        input_params["num_processes"] = NUM_PROCS
-        input_params["deepspeed_config"] = DEEPSPEED_CONF
+        input_params["num_processes"] = num_procs
+        input_params["deepspeed_config"] = deepspeed_conf
         (
             distributed_master_address,
             distributed_master_port,
             distributed_rank,
         ) = self._get_env_variables_distributed()
         distributed_torchrun_args = [
-            f"--nnodes={NUM_PROCS}",
+            f"--nnodes={num_procs}",
             f"--node_rank={distributed_rank}",
             f"--rdzv_endpoint={distributed_master_address}:{distributed_master_port}",
             "--rdzv_id=0",
         ]
         with self.subTest(msg="Distributed training command verification with no extra args"):
-            DISTRIBUTED_CMD_NO_ARGS_EXPECTED = [
+            distributed_cmd_no_args_expected = [
                 sys.executable,
                 "-m",
                 "torch.distributed.run",
                 *distributed_torchrun_args,
                 "--nproc_per_node=1",
-                TRAIN_FILE_PATH,
-                "-deepspeed",
+                train_file_path,
+                "--deepspeed",
                 "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
             ]
             input_params["local_mode"] = False
             distributed_command = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH
+                input_params, train_file_path
             )
-            self.assertEqual(DISTRIBUTED_CMD_NO_ARGS_EXPECTED, distributed_command)
+            self.assertEqual(distributed_cmd_no_args_expected, distributed_command)
         with self.subTest(msg="Distributed training command verification with extra arguments"):
             distributed_extra_args = ["-args1", "--args2"]
-            DISTRIBUTED_CMD_ARGS_EXPECTED = [
+            distributed_cmd_args_expected = [
                 sys.executable,
                 "-m",
                 "torch.distributed.run",
                 *distributed_torchrun_args,
                 "--nproc_per_node=1",
-                TRAIN_FILE_PATH,
+                train_file_path,
                 *distributed_extra_args,
-                "-deepspeed",
+                "--deepspeed",
                 "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
             ]
             distributed_command_with_args = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH, *distributed_extra_args
+                input_params, train_file_path, *distributed_extra_args
             )
-            self.assertEqual(DISTRIBUTED_CMD_ARGS_EXPECTED, distributed_command_with_args)
+            self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
 
 if __name__ == "__main__":