[SPARK-44264][PYTHON][ML][FOLLOW-UP] Clean Up Deepspeed Code
Follow-up on HyukjinKwon's comments on [this PR](apache#41770).
### Checklist
- [x] Add docs on DeepspeedTorchDistributor to pyspark.ml.rst
- [x] Make all variable names lowercase in test_deepspeed_distributor per PEP 8
- [ ] Make docstrings adhere to the NumPy standard
- [ ] Add an example of how to use the Deepspeed distributor in the examples section (same as TorchDistributor, basically); see the usage sketch after this list
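
Until that example lands, the sketch below mirrors the TorchDistributor pattern. It is illustrative only: the constructor keywords follow the parameter docstring shown later in this commit's diff, and the training function, configuration path, and argument values are hypothetical.

```python
# Hypothetical usage sketch of DeepspeedTorchDistributor, mirroring TorchDistributor.
# Keyword names follow the docstring in this diff; the released API may differ.
from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor


def train(learning_rate: float) -> None:
    # A real training function would build the model, call deepspeed.initialize(...),
    # and run the training loop; that is outside the scope of this commit.
    ...


distributor = DeepspeedTorchDistributor(
    num_gpus=2,  # GPUs to use per node
    use_gpu=True,
    deepspeed_config="path/to/ds_config.json",  # or a dict of DeepSpeed parameters
)
distributor.run(train, 1e-3)  # same run(train_fn, *args) pattern as TorchDistributor
```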

Closes apache#41968 from mathewjacob1002/deepspeed_fine_touches.

Authored-by: Mathew Jacob <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
mathewjacob1002 authored and HyukjinKwon committed Jul 16, 2023
1 parent a00a32a commit 001e93d
Showing 3 changed files with 58 additions and 55 deletions.
7 changes: 7 additions & 0 deletions python/docs/source/reference/pyspark.ml.rst
@@ -352,6 +352,13 @@ Distributor

    TorchDistributor

+.. currentmodule:: pyspark.ml.deepspeed.deepspeed_distributor
+.. autosummary::
+    :template: autosummary/class_with_docs.rst
+    :toctree: api/
+
+    DeepspeedTorchDistributor


Utilities
---------
6 changes: 1 addition & 5 deletions python/pyspark/ml/deepspeed/deepspeed_distributor.py
@@ -52,16 +52,12 @@ def __init__(
        ----------
        num_gpus: int
            The number of GPUs to use per node (analagous to num_gpus in deepspeed command).
-        nnodes: int
-            The number of nodes that should be used for the run.
-        local_mode: bool
-            Whether or not to run the training in a distributed fashion or just locally.
        use_gpu: bool
            Boolean flag to determine whether to utilize gpus.
        deepspeed_config: Union[Dict[str,Any], str] or None:
            The configuration file to be used for launching the deepspeed application.
            If it's a dictionary containing the parameters, then we will create the file.
@@ -111,7 +107,7 @@ def _create_torchrun_command(
            f"--nproc_per_node={processes_per_node}",
            train_path,
            *args_string,
-            "-deepspeed",
+            "--deepspeed",
        ]

        # Don't have the deepspeed_config argument if no path is provided or no parameters set
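
To make the corrected flag concrete, the launcher command that `_create_torchrun_command` assembles in local mode has roughly the shape below. This is an illustrative sketch with made-up paths and process counts, matching the expectations in the updated unit tests rather than any authoritative CLI contract.

```python
# Illustrative shape of the local-mode launcher command after the fix;
# the paths and process count are example values only.
import sys

example_command = [
    sys.executable,
    "-m",
    "torch.distributed.run",
    "--standalone",  # local-mode torchrun args from _get_torchrun_args
    "--nnodes=1",
    "--nproc_per_node=8",
    "path/to/train.py",
    "--deepspeed",  # previously emitted as "-deepspeed"
    "--deepspeed_config",
    "path/to/ds_config.json",
]
print(" ".join(example_command))
```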
100 changes: 50 additions & 50 deletions python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py
@@ -31,137 +31,137 @@ def _get_env_var(self, var_name: str, default_value: Any) -> Any:
        return default_value

    def _get_env_variables_distributed(self) -> Tuple[Any, Any, Any]:
-        MASTER_ADDR = self._get_env_var("MASTER_ADDR", "127.0.0.1")
-        MASTER_PORT = self._get_env_var("MASTER_PORT", 2000)
-        RANK = self._get_env_var("RANK", 0)
-        return MASTER_ADDR, MASTER_PORT, RANK
+        master_addr = self._get_env_var("MASTER_ADDR", "127.0.0.1")
+        master_port = self._get_env_var("MASTER_PORT", 2000)
+        rank = self._get_env_var("RANK", 0)
+        return master_addr, master_port, rank

    def test_get_torchrun_args_local(self) -> None:
        number_of_processes = 5
-        EXPECTED_TORCHRUN_ARGS_LOCAL = ["--standalone", "--nnodes=1"]
-        EXPECTED_PROCESSES_PER_NODE_LOCAL = number_of_processes
+        expected_torchrun_args_local = ["--standalone", "--nnodes=1"]
+        expected_processes_per_node_local = number_of_processes
        (
            get_local_mode_torchrun_args,
            process_per_node,
        ) = DeepspeedTorchDistributor._get_torchrun_args(True, number_of_processes)
-        self.assertEqual(get_local_mode_torchrun_args, EXPECTED_TORCHRUN_ARGS_LOCAL)
-        self.assertEqual(EXPECTED_PROCESSES_PER_NODE_LOCAL, process_per_node)
+        self.assertEqual(get_local_mode_torchrun_args, expected_torchrun_args_local)
+        self.assertEqual(expected_processes_per_node_local, process_per_node)

    def test_get_torchrun_args_distributed(self) -> None:
        number_of_processes = 5
-        MASTER_ADDR, MASTER_PORT, RANK = self._get_env_variables_distributed()
-        EXPECTED_TORCHRUN_ARGS_DISTRIBUTED = [
+        master_addr, master_port, rank = self._get_env_variables_distributed()
+        expected_torchrun_args_distributed = [
            f"--nnodes={number_of_processes}",
-            f"--node_rank={RANK}",
-            f"--rdzv_endpoint={MASTER_ADDR}:{MASTER_PORT}",
+            f"--node_rank={rank}",
+            f"--rdzv_endpoint={master_addr}:{master_port}",
            "--rdzv_id=0",
        ]
        torchrun_args_distributed, process_per_node = DeepspeedTorchDistributor._get_torchrun_args(
            False, number_of_processes
        )
-        self.assertEqual(torchrun_args_distributed, EXPECTED_TORCHRUN_ARGS_DISTRIBUTED)
+        self.assertEqual(torchrun_args_distributed, expected_torchrun_args_distributed)
        self.assertEqual(process_per_node, 1)

    def test_create_torchrun_command_local(self) -> None:
-        DEEPSPEED_CONF = "path/to/deepspeed"
-        TRAIN_FILE_PATH = "path/to/exec"
-        NUM_PROCS = 10
+        deepspeed_conf = "path/to/deepspeed"
+        train_file_path = "path/to/exec"
+        num_procs = 10
        input_params: Dict[str, Any] = {}
        input_params["local_mode"] = True
-        input_params["num_processes"] = NUM_PROCS
-        input_params["deepspeed_config"] = DEEPSPEED_CONF
+        input_params["num_processes"] = num_procs
+        input_params["deepspeed_config"] = deepspeed_conf

        torchrun_local_args_expected = ["--standalone", "--nnodes=1"]
        with self.subTest(msg="Testing local training with no extra args"):
-            LOCAL_CMD_NO_ARGS_EXPECTED = [
+            local_cmd_no_args_expected = [
                sys.executable,
                "-m",
                "torch.distributed.run",
                *torchrun_local_args_expected,
-                f"--nproc_per_node={NUM_PROCS}",
-                TRAIN_FILE_PATH,
-                "-deepspeed",
+                f"--nproc_per_node={num_procs}",
+                train_file_path,
+                "--deepspeed",
                "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
            ]
            local_cmd = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH
+                input_params, train_file_path
            )
-            self.assertEqual(local_cmd, LOCAL_CMD_NO_ARGS_EXPECTED)
+            self.assertEqual(local_cmd, local_cmd_no_args_expected)
        with self.subTest(msg="Testing local training with extra args for the training script"):
            local_mode_version_args = ["--arg1", "--arg2"]
-            LOCAL_CMD_ARGS_EXPECTED = [
+            local_cmd_args_expected = [
                sys.executable,
                "-m",
                "torch.distributed.run",
                *torchrun_local_args_expected,
-                f"--nproc_per_node={NUM_PROCS}",
-                TRAIN_FILE_PATH,
+                f"--nproc_per_node={num_procs}",
+                train_file_path,
                *local_mode_version_args,
-                "-deepspeed",
+                "--deepspeed",
                "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
            ]

            local_cmd_with_args = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH, *local_mode_version_args
+                input_params, train_file_path, *local_mode_version_args
            )
-            self.assertEqual(local_cmd_with_args, LOCAL_CMD_ARGS_EXPECTED)
+            self.assertEqual(local_cmd_with_args, local_cmd_args_expected)

    def test_create_torchrun_command_distributed(self) -> None:
-        DEEPSPEED_CONF = "path/to/deepspeed"
-        TRAIN_FILE_PATH = "path/to/exec"
-        NUM_PROCS = 10
+        deepspeed_conf = "path/to/deepspeed"
+        train_file_path = "path/to/exec"
+        num_procs = 10
        input_params: Dict[str, Any] = {}
        input_params["local_mode"] = True
-        input_params["num_processes"] = NUM_PROCS
-        input_params["deepspeed_config"] = DEEPSPEED_CONF
+        input_params["num_processes"] = num_procs
+        input_params["deepspeed_config"] = deepspeed_conf
        (
            distributed_master_address,
            distributed_master_port,
            distributed_rank,
        ) = self._get_env_variables_distributed()
        distributed_torchrun_args = [
-            f"--nnodes={NUM_PROCS}",
+            f"--nnodes={num_procs}",
            f"--node_rank={distributed_rank}",
            f"--rdzv_endpoint={distributed_master_address}:{distributed_master_port}",
            "--rdzv_id=0",
        ]
        with self.subTest(msg="Distributed training command verification with no extra args"):
-            DISTRIBUTED_CMD_NO_ARGS_EXPECTED = [
+            distributed_cmd_no_args_expected = [
                sys.executable,
                "-m",
                "torch.distributed.run",
                *distributed_torchrun_args,
                "--nproc_per_node=1",
-                TRAIN_FILE_PATH,
-                "-deepspeed",
+                train_file_path,
+                "--deepspeed",
                "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
            ]
            input_params["local_mode"] = False
            distributed_command = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH
+                input_params, train_file_path
            )
-            self.assertEqual(DISTRIBUTED_CMD_NO_ARGS_EXPECTED, distributed_command)
+            self.assertEqual(distributed_cmd_no_args_expected, distributed_command)
        with self.subTest(msg="Distributed training command verification with extra arguments"):
            distributed_extra_args = ["-args1", "--args2"]
-            DISTRIBUTED_CMD_ARGS_EXPECTED = [
+            distributed_cmd_args_expected = [
                sys.executable,
                "-m",
                "torch.distributed.run",
                *distributed_torchrun_args,
                "--nproc_per_node=1",
-                TRAIN_FILE_PATH,
+                train_file_path,
                *distributed_extra_args,
-                "-deepspeed",
+                "--deepspeed",
                "--deepspeed_config",
-                DEEPSPEED_CONF,
+                deepspeed_conf,
            ]
            distributed_command_with_args = DeepspeedTorchDistributor._create_torchrun_command(
-                input_params, TRAIN_FILE_PATH, *distributed_extra_args
+                input_params, train_file_path, *distributed_extra_args
            )
-            self.assertEqual(DISTRIBUTED_CMD_ARGS_EXPECTED, distributed_command_with_args)
+            self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)


if __name__ == "__main__":
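
For completeness, one hypothetical way to run just this test module locally, assuming a PySpark development checkout where pyspark and torch are importable (Spark's python/run-tests script is the usual entry point):

```python
# Hypothetical sketch: load and run every TestCase in the module by dotted name,
# without hard-coding test class names that are not visible in this diff.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName(
    "pyspark.ml.deepspeed.tests.test_deepspeed_distributor"
)
unittest.TextTestRunner(verbosity=2).run(suite)
```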
