From cc09c4f0a4f6250346403d21ccb441cf5e8b628a Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 27 Oct 2021 17:47:42 -0700 Subject: [PATCH 1/6] wip --- python/ray/train/BUILD | 18 ++++++++++++ python/ray/train/backends/tensorflow.py | 38 +++++++++++-------------- python/ray/train/backends/torch.py | 5 ++-- python/ray/train/tests/test_trainer.py | 30 +++++++++++++++---- 4 files changed, 62 insertions(+), 29 deletions(-) diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index 007b0695453b..b730f0a5dd86 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -12,6 +12,24 @@ py_test( args = ["--smoke-test"] ) +py_test( + name = "tensorflow_quick_start", + size = "medium", + main = "examples/tensorflow_quick_start.py", + srcs = ["examples/tensorflow_quick_start.py"], + tags = ["team:ml", "exclusive"], + deps = [":train_lib"] +) + +py_test( + name = "torch_quick_start", + size = "medium", + main = "examples/torch_quick_start.py", + srcs = ["examples/torch_quick_start.py"], + tags = ["team:ml", "exclusive"], + deps = [":train_lib"] +) + py_test( name = "transformers_example", size = "large", diff --git a/python/ray/train/backends/tensorflow.py b/python/ray/train/backends/tensorflow.py index 9a0900095d22..236245f008b5 100644 --- a/python/ray/train/backends/tensorflow.py +++ b/python/ray/train/backends/tensorflow.py @@ -54,27 +54,23 @@ def setup_tensorflow_environment(worker_addresses: List[str], index: int): class TensorflowBackend(Backend): def on_start(self, worker_group: WorkerGroup, backend_config: TensorflowConfig): - if len(worker_group) > 1: - # Compute URL for initializing distributed setup. - def get_url(): - address, port = get_address_and_port() - return f"{address}:{port}" - - urls = worker_group.execute(get_url) - - # Get setup tasks in order to throw errors on failure. - setup_futures = [] - for i in range(len(worker_group)): - setup_futures.append( - worker_group.execute_single_async( - i, - setup_tensorflow_environment, - worker_addresses=urls, - index=i)) - ray.get(setup_futures) - - else: - logger.info("Distributed Tensorflow is not being used.") + # Compute URL for initializing distributed setup. + def get_url(): + address, port = get_address_and_port() + return f"{address}:{port}" + + urls = worker_group.execute(get_url) + + # Get setup tasks in order to throw errors on failure. + setup_futures = [] + for i in range(len(worker_group)): + setup_futures.append( + worker_group.execute_single_async( + i, + setup_tensorflow_environment, + worker_addresses=urls, + index=i)) + ray.get(setup_futures) def handle_failure(self, worker_group: WorkerGroup, failed_worker_indexes: List[int], diff --git a/python/ray/train/backends/torch.py b/python/ray/train/backends/torch.py index 3cffe0f8ccdf..cee6447f96e5 100644 --- a/python/ray/train/backends/torch.py +++ b/python/ray/train/backends/torch.py @@ -1,3 +1,4 @@ +import warnings from dataclasses import dataclass import logging import os @@ -95,7 +96,7 @@ class TorchBackend(Backend): share_cuda_visible_devices: bool = True def on_start(self, worker_group: WorkerGroup, backend_config: TorchConfig): - if len(worker_group) > 1 and dist.is_available(): + if dist.is_available(): # Set the appropriate training backend. 
if backend_config.backend is None: if worker_group.num_gpus_per_worker > 0: @@ -137,7 +138,7 @@ def set_env_vars(addr, port): timeout_s=backend_config.timeout_s)) ray.get(setup_futures) else: - logger.info("Distributed torch is not being used.") + raise RuntimeError("Distributed torch is not available.") def on_shutdown(self, worker_group: WorkerGroup, backend_config: TorchConfig): diff --git a/python/ray/train/tests/test_trainer.py b/python/ray/train/tests/test_trainer.py index c73b8bc548a1..83876d1c3b03 100644 --- a/python/ray/train/tests/test_trainer.py +++ b/python/ray/train/tests/test_trainer.py @@ -23,6 +23,8 @@ as fashion_mnist_train_func from ray.train.examples.train_linear_example import train_func as \ linear_train_func +from ray.train.examples.torch_quick_start import train_func as \ + quick_start_train_func from ray.train.worker_group import WorkerGroup @@ -549,9 +551,9 @@ def train_func(): assert set(results) == {0, 1} - -def test_tensorflow_mnist(ray_start_2_cpus): - num_workers = 2 +@pytest.mark.parametrize("num_workers", [1, 2]) +def test_tensorflow_mnist(ray_start_2_cpus, num_workers): + num_workers = num_workers epochs = 3 trainer = Trainer("tensorflow", num_workers=num_workers) @@ -571,9 +573,17 @@ def test_tensorflow_mnist(ray_start_2_cpus): assert len(accuracy) == epochs assert accuracy[-1] > accuracy[0] +def test_tf_non_distributed(ray_start_2_cpus): + """Make sure Ray Train works without TF MultiWorkerMirroredStrategy.""" -def test_torch_linear(ray_start_2_cpus): - num_workers = 2 + trainer = Trainer(backend="torch", num_workers=1) + trainer.start() + trainer.run(quick_start_train_func) + trainer.shutdown() + +@pytest.mark.parametrize("num_workers", [1, 2]) +def test_torch_linear(ray_start_2_cpus, num_workers): + num_workers = num_workers epochs = 3 trainer = Trainer("torch", num_workers=num_workers) @@ -588,7 +598,6 @@ def test_torch_linear(ray_start_2_cpus): assert len(result) == epochs assert result[-1]["loss"] < result[0]["loss"] - def test_torch_fashion_mnist(ray_start_2_cpus): num_workers = 2 epochs = 3 @@ -606,6 +615,15 @@ def test_torch_fashion_mnist(ray_start_2_cpus): assert result[-1] < result[0] +def test_torch_non_distributed(ray_start_2_cpus): + """Make sure Ray Train works without torch DDP.""" + + trainer = Trainer(backend="torch", num_workers=1) + trainer.start() + trainer.run(quick_start_train_func) + trainer.shutdown() + + def test_horovod_simple(ray_start_2_cpus): def simple_fn(): hvd_torch.init() From 76b291d3697e00a22d9bc935b4fb884bd753fb3b Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 27 Oct 2021 17:54:54 -0700 Subject: [PATCH 2/6] update --- doc/source/train/train.rst | 183 ++++-------------- python/ray/train/backends/torch.py | 1 - .../train/examples/tensorflow_quick_start.py | 89 +++++++++ .../ray/train/examples/torch_quick_start.py | 86 ++++++++ python/ray/train/tests/test_trainer.py | 4 + 5 files changed, 219 insertions(+), 144 deletions(-) create mode 100644 python/ray/train/examples/tensorflow_quick_start.py create mode 100644 python/ray/train/examples/torch_quick_start.py diff --git a/doc/source/train/train.rst b/doc/source/train/train.rst index c9b7b70c101e..dc113281f889 100644 --- a/doc/source/train/train.rst +++ b/doc/source/train/train.rst @@ -63,58 +63,25 @@ system. Let's take following simple examples: First, set up your dataset and model. - .. 
code-block:: python - - import torch - import torch.nn as nn - - num_samples = 20 - input_size = 10 - layer_size = 15 - output_size = 5 - - class NeuralNetwork(nn.Module): - def __init__(self): - super(NeuralNetwork, self).__init__() - self.layer1 = nn.Linear(input_size, layer_size) - self.relu = nn.ReLU() - self.layer2 = nn.Linear(layer_size, output_size) - - def forward(self, input): - return self.layer2(self.relu(self.layer1(input))) - - # In this example we use a randomly generated dataset. - input = torch.randn(num_samples, input_size) - labels = torch.randn(num_samples, output_size) + .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py + :language: python + :start-after: __torch_setup_begin__ + :end-before: __torch_setup_end__ Now define your single-worker PyTorch training function. - .. code-block:: python - - import torch.optim as optim - - def train_func(): - num_epochs = 3 - model = NeuralNetwork() - loss_fn = nn.MSELoss() - optimizer = optim.SGD(model.parameters(), lr=0.1) - - for epoch in range(num_epochs): - output = model(input) - loss = loss_fn(output, labels) - optimizer.zero_grad() - loss.backward() - optimizer.step() - print(f"epoch: {epoch}, loss: {loss.item()}") - + .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py + :language: python + :start-after: __torch_single_begin__ + :end-before: __torch_single_end__ This training function can be executed with: - .. code-block:: python - - train_func() - + .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py + :language: python + :start-after: __torch_single_run_begin__ + :end-before: __torch_single_run_end__ Now let's convert this to a distributed multi-worker training function! @@ -123,41 +90,21 @@ system. Let's take following simple examples: data parallel code as as you would normally run it with ``torch.distributed.launch``. - .. code-block:: python - - from torch.nn.parallel import DistributedDataParallel - - def train_func_distributed(): - num_epochs = 3 - model = NeuralNetwork() - model = DistributedDataParallel(model) - loss_fn = nn.MSELoss() - optimizer = optim.SGD(model.parameters(), lr=0.1) - - for epoch in range(num_epochs): - output = model(input) - loss = loss_fn(output, labels) - optimizer.zero_grad() - loss.backward() - optimizer.step() - print(f"epoch: {epoch}, loss: {loss.item()}") + .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py + :language: python + :start-after: __torch_single_distributed_begin__ + :end-before: __torch_single_distributed_end__ Then, instantiate a ``Trainer`` that uses a ``"torch"`` backend with 4 workers, and use it to run the new training function! - .. code-block:: python - - from ray.train import Trainer - - trainer = Trainer(backend="torch", num_workers=4) - trainer.start() - results = trainer.run(train_func_distributed) - trainer.shutdown() - + .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py + :language: python + :start-after: __torch_single_trainer_begin__ + :end-before: __torch_single_trainer_end__ See :ref:`train-porting-code` for a more comprehensive example. - .. group-tab:: TensorFlow This example shows how you can use Ray Train to set up `Multi-worker training @@ -165,52 +112,24 @@ system. Let's take following simple examples: First, set up your dataset and model. - .. 
code-block:: python - - import numpy as np - import tensorflow as tf - - def mnist_dataset(batch_size): - (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() - # The `x` arrays are in uint8 and have values in the [0, 255] range. - # You need to convert them to float32 with values in the [0, 1] range. - x_train = x_train / np.float32(255) - y_train = y_train.astype(np.int64) - train_dataset = tf.data.Dataset.from_tensor_slices( - (x_train, y_train)).shuffle(60000).repeat().batch(batch_size) - return train_dataset - - - def build_and_compile_cnn_model(): - model = tf.keras.Sequential([ - tf.keras.layers.InputLayer(input_shape=(28, 28)), - tf.keras.layers.Reshape(target_shape=(28, 28, 1)), - tf.keras.layers.Conv2D(32, 3, activation='relu'), - tf.keras.layers.Flatten(), - tf.keras.layers.Dense(128, activation='relu'), - tf.keras.layers.Dense(10) - ]) - model.compile( - loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), - optimizer=tf.keras.optimizers.SGD(learning_rate=0.001), - metrics=['accuracy']) - return model + .. literalinclude:: /../../python/ray/train/examples/tensorflow_quick_start.py + :language: python + :start-after: __tf_setup_begin__ + :end-before: __tf_setup_end__ Now define your single-worker TensorFlow training function. - .. code-block:: python - - def train_func(): - batch_size = 64 - single_worker_dataset = mnist.mnist_dataset(batch_size) - single_worker_model = mnist.build_and_compile_cnn_model() - single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70) + .. literalinclude:: /../../python/ray/train/examples/tensorflow_quick_start.py + :language: python + :start-after: __tf_single_begin__ + :end-before: __tf_single_end__ This training function can be executed with: - .. code-block:: python - - train_func() + .. literalinclude:: /../../python/ray/train/examples/tensorflow_quick_start.py + :language: python + :start-after: __tf_single_run_begin__ + :end-before: __tf_single_run_end__ Now let's convert this to a distributed multi-worker training function! All you need to do is: @@ -220,40 +139,18 @@ system. Let's take following simple examples: 2. Choose your TensorFlow distributed training strategy. In this example we use the ``MultiWorkerMirroredStrategy``. - .. code-block:: python - - import json - import os - - def train_func_distributed(): - per_worker_batch_size = 64 - # This environment variable will be set by Ray Train. - tf_config = json.loads(os.environ['TF_CONFIG']) - num_workers = len(tf_config['cluster']['worker']) - - strategy = tf.distribute.MultiWorkerMirroredStrategy() - - global_batch_size = per_worker_batch_size * num_workers - multi_worker_dataset = mnist_dataset(global_batch_size) - - with strategy.scope(): - # Model building/compiling need to be within `strategy.scope()`. - multi_worker_model = build_and_compile_cnn_model() - - multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70) + .. literalinclude:: /../../python/ray/train/examples/tensorflow_quick_start.py + :language: python + :start-after: __tf_distributed_begin__ + :end-before: __tf_distributed_end__ Then, instantiate a ``Trainer`` that uses a ``"tensorflow"`` backend with 4 workers, and use it to run the new training function! - .. code-block:: python - - from ray.train import Trainer - - trainer = Trainer(backend="tensorflow", num_workers=4) - trainer.start() - results = trainer.run(train_func_distributed) - trainer.shutdown() - + .. 
literalinclude:: /../../python/ray/train/examples/tensorflow_quick_start.py + :language: python + :start-after: __tf_trainer_begin__ + :end-before: __tf_trainer_end__ See :ref:`train-porting-code` for a more comprehensive example. diff --git a/python/ray/train/backends/torch.py b/python/ray/train/backends/torch.py index cee6447f96e5..3e766501ffa9 100644 --- a/python/ray/train/backends/torch.py +++ b/python/ray/train/backends/torch.py @@ -1,4 +1,3 @@ -import warnings from dataclasses import dataclass import logging import os diff --git a/python/ray/train/examples/tensorflow_quick_start.py b/python/ray/train/examples/tensorflow_quick_start.py new file mode 100644 index 000000000000..c1f90571c852 --- /dev/null +++ b/python/ray/train/examples/tensorflow_quick_start.py @@ -0,0 +1,89 @@ +# flake8: noqa +# yapf: disable + +# __tf_setup_begin__ + +import numpy as np +import tensorflow as tf + + +def mnist_dataset(batch_size): + (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() + # The `x` arrays are in uint8 and have values in the [0, 255] range. + # You need to convert them to float32 with values in the [0, 1] range. + x_train = x_train / np.float32(255) + y_train = y_train.astype(np.int64) + train_dataset = tf.data.Dataset.from_tensor_slices( + (x_train, y_train)).shuffle(60000).repeat().batch(batch_size) + return train_dataset + + +def build_and_compile_cnn_model(): + model = tf.keras.Sequential([ + tf.keras.layers.InputLayer(input_shape=(28, 28)), + tf.keras.layers.Reshape(target_shape=(28, 28, 1)), + tf.keras.layers.Conv2D(32, 3, activation='relu'), + tf.keras.layers.Flatten(), + tf.keras.layers.Dense(128, activation='relu'), + tf.keras.layers.Dense(10) + ]) + model.compile( + loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), + optimizer=tf.keras.optimizers.SGD(learning_rate=0.001), + metrics=['accuracy']) + return model + +# __tf_setup_end__ + +# __tf_single_begin__ + +from keras.datasets import mnist + +def train_func(): + batch_size = 64 + single_worker_dataset = mnist.mnist_dataset(batch_size) + single_worker_model = mnist.build_and_compile_cnn_model() + single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70) + +# __tf_single_end__ + +# __tf_single_run_begin__ + +train_func() + +# __tf_single_run_end__ + +# __tf_distributed_begin__ + +import json +import os + +def train_func_distributed(): + per_worker_batch_size = 64 + # This environment variable will be set by Ray Train. + tf_config = json.loads(os.environ['TF_CONFIG']) + num_workers = len(tf_config['cluster']['worker']) + + strategy = tf.distribute.MultiWorkerMirroredStrategy() + + global_batch_size = per_worker_batch_size * num_workers + multi_worker_dataset = mnist_dataset(global_batch_size) + + with strategy.scope(): + # Model building/compiling need to be within `strategy.scope()`. 
+ multi_worker_model = build_and_compile_cnn_model() + + multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70) + +# __tf_distributed_end__ + +# __tf_trainer_begin__ + +from ray.train import Trainer + +trainer = Trainer(backend="tensorflow", num_workers=4) +trainer.start() +results = trainer.run(train_func_distributed) +trainer.shutdown() + +# __tf_trainer_end__ \ No newline at end of file diff --git a/python/ray/train/examples/torch_quick_start.py b/python/ray/train/examples/torch_quick_start.py new file mode 100644 index 000000000000..e35de2521eee --- /dev/null +++ b/python/ray/train/examples/torch_quick_start.py @@ -0,0 +1,86 @@ +# flake8: noqa +# yapf: disable + +# __torch_setup_begin__ +import torch +import torch.nn as nn + +num_samples = 20 +input_size = 10 +layer_size = 15 +output_size = 5 + +class NeuralNetwork(nn.Module): + def __init__(self): + super(NeuralNetwork, self).__init__() + self.layer1 = nn.Linear(input_size, layer_size) + self.relu = nn.ReLU() + self.layer2 = nn.Linear(layer_size, output_size) + + def forward(self, input): + return self.layer2(self.relu(self.layer1(input))) + +# In this example we use a randomly generated dataset. +input = torch.randn(num_samples, input_size) +labels = torch.randn(num_samples, output_size) + +# __torch_setup_end__ + +# __torch_single_begin__ + +import torch.optim as optim + +def train_func(): + num_epochs = 3 + model = NeuralNetwork() + loss_fn = nn.MSELoss() + optimizer = optim.SGD(model.parameters(), lr=0.1) + + for epoch in range(num_epochs): + output = model(input) + loss = loss_fn(output, labels) + optimizer.zero_grad() + loss.backward() + optimizer.step() + print(f"epoch: {epoch}, loss: {loss.item()}") + +# __torch_single_end__ + +# __torch_single_run_begin__ + +train_func() + +# __torch_single_run_end__ + +# __torch_distributed_begin__ + +from torch.nn.parallel import DistributedDataParallel + +def train_func_distributed(): + num_epochs = 3 + model = NeuralNetwork() + model = DistributedDataParallel(model) + loss_fn = nn.MSELoss() + optimizer = optim.SGD(model.parameters(), lr=0.1) + + for epoch in range(num_epochs): + output = model(input) + loss = loss_fn(output, labels) + optimizer.zero_grad() + loss.backward() + optimizer.step() + print(f"epoch: {epoch}, loss: {loss.item()}") + +# __torch_distributed_end__ + + +# __torch_trainer_begin__ + +from ray.train import Trainer + +trainer = Trainer(backend="torch", num_workers=4) +trainer.start() +results = trainer.run(train_func_distributed) +trainer.shutdown() + +# __torch_trainer_end__ \ No newline at end of file diff --git a/python/ray/train/tests/test_trainer.py b/python/ray/train/tests/test_trainer.py index 83876d1c3b03..14c0abc30142 100644 --- a/python/ray/train/tests/test_trainer.py +++ b/python/ray/train/tests/test_trainer.py @@ -551,6 +551,7 @@ def train_func(): assert set(results) == {0, 1} + @pytest.mark.parametrize("num_workers", [1, 2]) def test_tensorflow_mnist(ray_start_2_cpus, num_workers): num_workers = num_workers @@ -573,6 +574,7 @@ def test_tensorflow_mnist(ray_start_2_cpus, num_workers): assert len(accuracy) == epochs assert accuracy[-1] > accuracy[0] + def test_tf_non_distributed(ray_start_2_cpus): """Make sure Ray Train works without TF MultiWorkerMirroredStrategy.""" @@ -581,6 +583,7 @@ def test_tf_non_distributed(ray_start_2_cpus): trainer.run(quick_start_train_func) trainer.shutdown() + @pytest.mark.parametrize("num_workers", [1, 2]) def test_torch_linear(ray_start_2_cpus, num_workers): num_workers = num_workers @@ -598,6 +601,7 @@ def 
test_torch_linear(ray_start_2_cpus, num_workers): assert len(result) == epochs assert result[-1]["loss"] < result[0]["loss"] + def test_torch_fashion_mnist(ray_start_2_cpus): num_workers = 2 epochs = 3 From 8e1161fc8fe2d9f75f5f606e29f97f426720ef54 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 27 Oct 2021 18:50:30 -0700 Subject: [PATCH 3/6] fix --- python/ray/train/tests/test_trainer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/ray/train/tests/test_trainer.py b/python/ray/train/tests/test_trainer.py index 14c0abc30142..f576639316c5 100644 --- a/python/ray/train/tests/test_trainer.py +++ b/python/ray/train/tests/test_trainer.py @@ -24,7 +24,9 @@ from ray.train.examples.train_linear_example import train_func as \ linear_train_func from ray.train.examples.torch_quick_start import train_func as \ - quick_start_train_func + torch_quick_start_train_func +from ray.train.examples.tensorflow_quick_start import train_func as \ + tf_quick_start_train_func from ray.train.worker_group import WorkerGroup @@ -580,7 +582,7 @@ def test_tf_non_distributed(ray_start_2_cpus): trainer = Trainer(backend="torch", num_workers=1) trainer.start() - trainer.run(quick_start_train_func) + trainer.run(tf_quick_start_train_func) trainer.shutdown() @@ -624,7 +626,7 @@ def test_torch_non_distributed(ray_start_2_cpus): trainer = Trainer(backend="torch", num_workers=1) trainer.start() - trainer.run(quick_start_train_func) + trainer.run(torch_quick_start_train_func) trainer.shutdown() From 2d8ba1104a0f2e36759a0ada4d47f64aa51f4794 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 27 Oct 2021 19:08:06 -0700 Subject: [PATCH 4/6] fix --- .../train/examples/tensorflow_quick_start.py | 27 ++++++++++--------- .../ray/train/examples/torch_quick_start.py | 27 ++++++++++--------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/python/ray/train/examples/tensorflow_quick_start.py b/python/ray/train/examples/tensorflow_quick_start.py index c1f90571c852..0eb4525eb12a 100644 --- a/python/ray/train/examples/tensorflow_quick_start.py +++ b/python/ray/train/examples/tensorflow_quick_start.py @@ -47,12 +47,6 @@ def train_func(): # __tf_single_end__ -# __tf_single_run_begin__ - -train_func() - -# __tf_single_run_end__ - # __tf_distributed_begin__ import json @@ -77,13 +71,20 @@ def train_func_distributed(): # __tf_distributed_end__ -# __tf_trainer_begin__ +if __name__ == "__main__": + # __tf_single_run_begin__ + + train_func() + + # __tf_single_run_end__ + + # __tf_trainer_begin__ -from ray.train import Trainer + from ray.train import Trainer -trainer = Trainer(backend="tensorflow", num_workers=4) -trainer.start() -results = trainer.run(train_func_distributed) -trainer.shutdown() + trainer = Trainer(backend="tensorflow", num_workers=4) + trainer.start() + results = trainer.run(train_func_distributed) + trainer.shutdown() -# __tf_trainer_end__ \ No newline at end of file + # __tf_trainer_end__ \ No newline at end of file diff --git a/python/ray/train/examples/torch_quick_start.py b/python/ray/train/examples/torch_quick_start.py index e35de2521eee..1b70225c2534 100644 --- a/python/ray/train/examples/torch_quick_start.py +++ b/python/ray/train/examples/torch_quick_start.py @@ -46,12 +46,6 @@ def train_func(): # __torch_single_end__ -# __torch_single_run_begin__ - -train_func() - -# __torch_single_run_end__ - # __torch_distributed_begin__ from torch.nn.parallel import DistributedDataParallel @@ -74,13 +68,20 @@ def train_func_distributed(): # __torch_distributed_end__ -# 
__torch_trainer_begin__ +if __name__ == "__main__": + # __torch_single_run_begin__ + + train_func() + + # __torch_single_run_end__ + + # __torch_trainer_begin__ -from ray.train import Trainer + from ray.train import Trainer -trainer = Trainer(backend="torch", num_workers=4) -trainer.start() -results = trainer.run(train_func_distributed) -trainer.shutdown() + trainer = Trainer(backend="torch", num_workers=4) + trainer.start() + results = trainer.run(train_func_distributed) + trainer.shutdown() -# __torch_trainer_end__ \ No newline at end of file + # __torch_trainer_end__ \ No newline at end of file From e7e8554c42cf82cfec4f7ed4477acaa1cb78addd Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 27 Oct 2021 20:50:14 -0700 Subject: [PATCH 5/6] fix --- python/ray/train/examples/tensorflow_quick_start.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/ray/train/examples/tensorflow_quick_start.py b/python/ray/train/examples/tensorflow_quick_start.py index 0eb4525eb12a..c56670d93202 100644 --- a/python/ray/train/examples/tensorflow_quick_start.py +++ b/python/ray/train/examples/tensorflow_quick_start.py @@ -37,12 +37,10 @@ def build_and_compile_cnn_model(): # __tf_single_begin__ -from keras.datasets import mnist - def train_func(): batch_size = 64 - single_worker_dataset = mnist.mnist_dataset(batch_size) - single_worker_model = mnist.build_and_compile_cnn_model() + single_worker_dataset = mnist_dataset(batch_size) + single_worker_model = build_and_compile_cnn_model() single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70) # __tf_single_end__ From 5176d47f39dd8a4e85d5d95651c015c1d8cebc3a Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 27 Oct 2021 20:55:22 -0700 Subject: [PATCH 6/6] fix --- doc/source/train/train.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/source/train/train.rst b/doc/source/train/train.rst index dc113281f889..9873291e27c9 100644 --- a/doc/source/train/train.rst +++ b/doc/source/train/train.rst @@ -92,16 +92,16 @@ system. Let's take following simple examples: .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py :language: python - :start-after: __torch_single_distributed_begin__ - :end-before: __torch_single_distributed_end__ + :start-after: __torch_distributed_begin__ + :end-before: __torch_distributed_end__ Then, instantiate a ``Trainer`` that uses a ``"torch"`` backend with 4 workers, and use it to run the new training function! .. literalinclude:: /../../python/ray/train/examples/torch_quick_start.py :language: python - :start-after: __torch_single_trainer_begin__ - :end-before: __torch_single_trainer_end__ + :start-after: __torch_trainer_begin__ + :end-before: __torch_trainer_end__ See :ref:`train-porting-code` for a more comprehensive example.
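
A minimal usage sketch of the single-worker path this series enables, mirroring the new test_torch_non_distributed test: with the worker-count checks removed from the torch and tensorflow backends, a plain training function (no DistributedDataParallel, no MultiWorkerMirroredStrategy) can run on a one-worker Trainer. The module path and train_func below are the ones added in PATCH 2 of this series; actually running the sketch assumes ray and torch are installed.

    from ray.train import Trainer
    from ray.train.examples.torch_quick_start import train_func

    # One worker, and no DDP wrapping inside train_func.
    trainer = Trainer(backend="torch", num_workers=1)
    trainer.start()
    trainer.run(train_func)
    trainer.shutdown()

    # The TensorFlow quick start runs the same way with backend="tensorflow"
    # and the train_func from ray.train.examples.tensorflow_quick_start.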