From 13d4bda556a492d67b4fda44b34e05872b8548cb Mon Sep 17 00:00:00 2001
From: Andrey Velichkevich
Date: Thu, 7 Mar 2024 08:19:15 +0000
Subject: [PATCH] Fix Distributed Data Samplers in PyTorch Examples (#2012)

Signed-off-by: Andrey Velichkevich
Signed-off-by: deepanker13
---
 examples/pytorch/mnist/mnist.py            | 235 ++++---
 .../sdk/create-pytorchjob-from-func.ipynb  | 605 ++++++++----------
 2 files changed, 436 insertions(+), 404 deletions(-)

diff --git a/examples/pytorch/mnist/mnist.py b/examples/pytorch/mnist/mnist.py
index 74eb0f803b..372ac953fc 100644
--- a/examples/pytorch/mnist/mnist.py
+++ b/examples/pytorch/mnist/mnist.py
@@ -10,8 +10,7 @@
 import torch.nn as nn
 import torch.nn.functional as F
 import torch.optim as optim
-
-WORLD_SIZE = int(os.environ.get('WORLD_SIZE', 1))
+from torch.utils.data import DistributedSampler
 
 
 class Net(nn.Module):
@@ -19,7 +18,7 @@ def __init__(self):
         super(Net, self).__init__()
         self.conv1 = nn.Conv2d(1, 20, 5, 1)
         self.conv2 = nn.Conv2d(20, 50, 5, 1)
-        self.fc1 = nn.Linear(4*4*50, 500)
+        self.fc1 = nn.Linear(4 * 4 * 50, 500)
         self.fc2 = nn.Linear(500, 10)
 
     def forward(self, x):
@@ -27,83 +26,144 @@ def forward(self, x):
         x = F.max_pool2d(x, 2, 2)
         x = F.relu(self.conv2(x))
         x = F.max_pool2d(x, 2, 2)
-        x = x.view(-1, 4*4*50)
+        x = x.view(-1, 4 * 4 * 50)
         x = F.relu(self.fc1(x))
         x = self.fc2(x)
         return F.log_softmax(x, dim=1)
-
-def train(args, model, device, train_loader, optimizer, epoch, writer):
+
+
+def train(args, model, device, train_loader, epoch, writer):
     model.train()
+    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
+
     for batch_idx, (data, target) in enumerate(train_loader):
+        # Attach tensors to the device.
         data, target = data.to(device), target.to(device)
+
         optimizer.zero_grad()
         output = model(data)
         loss = F.nll_loss(output, target)
         loss.backward()
         optimizer.step()
         if batch_idx % args.log_interval == 0:
-            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
-                epoch, batch_idx * len(data), len(train_loader.dataset),
-                100. * batch_idx / len(train_loader), loss.item()))
+            print(
+                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
+                    epoch,
+                    batch_idx * len(data),
+                    len(train_loader.dataset),
+                    100.0 * batch_idx / len(train_loader),
+                    loss.item(),
+                )
+            )
             niter = epoch * len(train_loader) + batch_idx
-            writer.add_scalar('loss', loss.item(), niter)
+            writer.add_scalar("loss", loss.item(), niter)
+
 
-def test(args, model, device, test_loader, writer, epoch):
+def test(model, device, test_loader, writer, epoch):
     model.eval()
-    test_loss = 0
+
     correct = 0
     with torch.no_grad():
         for data, target in test_loader:
+            # Attach tensors to the device.
            data, target = data.to(device), target.to(device)
+
             output = model(data)
-            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
-            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
+            # Get the index of the max log-probability.
+            pred = output.max(1, keepdim=True)[1]
             correct += pred.eq(target.view_as(pred)).sum().item()
 
-    test_loss /= len(test_loader.dataset)
-
-    print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
-    writer.add_scalar('accuracy', float(correct) / len(test_loader.dataset), epoch)
-
-
-def should_distribute():
-    return dist.is_available() and WORLD_SIZE > 1
-
-
-def is_distributed():
-    return dist.is_available() and dist.is_initialized()
+    print("\naccuracy={:.4f}\n".format(float(correct) / len(test_loader.dataset)))
+    writer.add_scalar("accuracy", float(correct) / len(test_loader.dataset), epoch)
 
 
 def main():
     # Training settings
-    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
-    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
-                        help='input batch size for training (default: 64)')
-    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
-                        help='input batch size for testing (default: 1000)')
-    parser.add_argument('--epochs', type=int, default=1, metavar='N',
-                        help='number of epochs to train (default: 10)')
-    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
-                        help='learning rate (default: 0.01)')
-    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
-                        help='SGD momentum (default: 0.5)')
-    parser.add_argument('--no-cuda', action='store_true', default=False,
-                        help='disables CUDA training')
-    parser.add_argument('--seed', type=int, default=1, metavar='S',
-                        help='random seed (default: 1)')
-    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
-                        help='how many batches to wait before logging training status')
-    parser.add_argument('--save-model', action='store_true', default=False,
-                        help='For Saving the current Model')
-    parser.add_argument('--dir', default='logs', metavar='L',
-                        help='directory where summary logs are stored')
-    if dist.is_available():
-        parser.add_argument('--backend', type=str, help='Distributed backend',
-                            choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
-                            default=dist.Backend.GLOO)
+    parser = argparse.ArgumentParser(description="PyTorch FashionMNIST Example")
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=64,
+        metavar="N",
+        help="input batch size for training (default: 64)",
+    )
+    parser.add_argument(
+        "--test-batch-size",
+        type=int,
+        default=1000,
+        metavar="N",
+        help="input batch size for testing (default: 1000)",
+    )
+    parser.add_argument(
+        "--epochs",
+        type=int,
+        default=1,
+        metavar="N",
+        help="number of epochs to train (default: 1)",
+    )
+    parser.add_argument(
+        "--lr",
+        type=float,
+        default=0.01,
+        metavar="LR",
+        help="learning rate (default: 0.01)",
+    )
+    parser.add_argument(
+        "--momentum",
+        type=float,
+        default=0.5,
+        metavar="M",
+        help="SGD momentum (default: 0.5)",
+    )
+    parser.add_argument(
+        "--no-cuda",
+        action="store_true",
+        default=False,
+        help="disables CUDA training",
+    )
+    parser.add_argument(
+        "--seed",
+        type=int,
+        default=1,
+        metavar="S",
+        help="random seed (default: 1)",
+    )
+    parser.add_argument(
+        "--log-interval",
+        type=int,
+        default=10,
+        metavar="N",
+        help="how many batches to wait before logging training status",
+    )
+    parser.add_argument(
+        "--save-model",
+        action="store_true",
+        default=False,
+        help="For Saving the current Model",
+    )
+    parser.add_argument(
+        "--dir",
+        default="logs",
+        metavar="L",
+        help="directory where summary logs are stored",
+    )
+
+    parser.add_argument(
+        "--backend",
+        type=str,
+        help="Distributed backend",
+        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
+        default=dist.Backend.GLOO,
+    )
+
     args = parser.parse_args()
     use_cuda = not args.no_cuda and torch.cuda.is_available()
     if use_cuda:
-        print('Using CUDA')
+        print("Using CUDA")
+        if args.backend != dist.Backend.NCCL:
+            print(
+                "Warning: please use the `nccl` distributed backend for the best performance when using GPUs."
+            )
 
     writer = SummaryWriter(args.dir)
 
@@ -111,40 +171,55 @@ def main():
 
     device = torch.device("cuda" if use_cuda else "cpu")
 
-    if should_distribute():
-        print('Using distributed PyTorch with {} backend'.format(args.backend))
-        dist.init_process_group(backend=args.backend)
+    # Attach model to the device.
+    model = Net().to(device)
 
-    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
+    print("Using distributed PyTorch with {} backend".format(args.backend))
+    # Set distributed training environment variables to run this training script locally.
+    if "WORLD_SIZE" not in os.environ:
+        os.environ["RANK"] = "0"
+        os.environ["WORLD_SIZE"] = "1"
+        os.environ["MASTER_ADDR"] = "localhost"
+        os.environ["MASTER_PORT"] = "1234"
+
+    print(f"World Size: {os.environ['WORLD_SIZE']}. Rank: {os.environ['RANK']}")
+
+    dist.init_process_group(backend=args.backend)
+    Distributor = nn.parallel.DistributedDataParallel
+    model = Distributor(model)
+
+    # Get FashionMNIST train and test dataset.
+    train_ds = datasets.FashionMNIST(
+        "../data",
+        train=True,
+        download=True,
+        transform=transforms.Compose([transforms.ToTensor()]),
+    )
+    test_ds = datasets.FashionMNIST(
+        "../data",
+        train=False,
+        download=True,
+        transform=transforms.Compose([transforms.ToTensor()]),
+    )
+    # Add train and test loaders.
     train_loader = torch.utils.data.DataLoader(
-        datasets.FashionMNIST('../data', train=True, download=True,
-                       transform=transforms.Compose([
-                           transforms.ToTensor(),
-                           transforms.Normalize((0.1307,), (0.3081,))
-                       ])),
-        batch_size=args.batch_size, shuffle=True, **kwargs)
+        train_ds,
+        batch_size=args.batch_size,
+        sampler=DistributedSampler(train_ds),
+    )
     test_loader = torch.utils.data.DataLoader(
-        datasets.FashionMNIST('../data', train=False, transform=transforms.Compose([
-                           transforms.ToTensor(),
-                           transforms.Normalize((0.1307,), (0.3081,))
-                       ])),
-        batch_size=args.test_batch_size, shuffle=False, **kwargs)
+        test_ds,
+        batch_size=args.test_batch_size,
+        sampler=DistributedSampler(test_ds),
+    )
 
-    model = Net().to(device)
+    for epoch in range(1, args.epochs + 1):
+        train(args, model, device, train_loader, epoch, writer)
+        test(model, device, test_loader, writer, epoch)
 
-    if is_distributed():
-        Distributor = nn.parallel.DistributedDataParallel if use_cuda \
-            else nn.parallel.DistributedDataParallelCPU
-        model = Distributor(model)
+    if args.save_model:
+        torch.save(model.state_dict(), "mnist_cnn.pt")
 
-    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
-
-    for epoch in range(1, args.epochs + 1):
-        train(args, model, device, train_loader, optimizer, epoch, writer)
-        test(args, model, device, test_loader, writer, epoch)
-    if (args.save_model):
-        torch.save(model.state_dict(),"mnist_cnn.pt")
-
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/examples/sdk/create-pytorchjob-from-func.ipynb b/examples/sdk/create-pytorchjob-from-func.ipynb
index 08d45afa0e..7c040e6a0d 100644
--- a/examples/sdk/create-pytorchjob-from-func.ipynb
+++ b/examples/sdk/create-pytorchjob-from-func.ipynb
@@ -52,26 +52,35 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 1,
+    "execution_count": 2,
     "id": "69f21f33-5c64-452c-90c4-977fc0dadb3b",
"69f21f33-5c64-452c-90c4-977fc0dadb3b", "metadata": { + "execution": { + "iopub.execute_input": "2024-03-05T21:44:44.851155Z", + "iopub.status.busy": "2024-03-05T21:44:44.850918Z", + "iopub.status.idle": "2024-03-05T21:44:44.862195Z", + "shell.execute_reply": "2024-03-05T21:44:44.860949Z", + "shell.execute_reply.started": "2024-03-05T21:44:44.851138Z" + }, "tags": [] }, "outputs": [], "source": [ - "def train_pytorch_model():\n", + "def train_pytorch_model(parameters):\n", " import logging\n", " import os\n", - " from torchvision import transforms, datasets\n", + "\n", " import torch\n", - " from torch import nn\n", - " import torch.nn.functional as F\n", " import torch.distributed as dist\n", + " import torch.nn.functional as F\n", + " from torch import nn\n", + " from torch.utils.data import DistributedSampler\n", + " from torchvision import datasets, transforms\n", "\n", " logging.basicConfig(\n", " format=\"%(asctime)s %(levelname)-8s %(message)s\",\n", " datefmt=\"%Y-%m-%dT%H:%M:%SZ\",\n", - " level=logging.DEBUG,\n", + " level=logging.INFO,\n", " )\n", "\n", " # Create PyTorch CNN Model.\n", @@ -97,34 +106,53 @@ " # Kubeflow Training Operator automatically set appropriate RANK and WORLD_SIZE based on the configuration.\n", " RANK = int(os.environ[\"RANK\"])\n", " WORLD_SIZE = int(os.environ[\"WORLD_SIZE\"])\n", - " \n", + "\n", + " # IF GPU is available, nccl dist backend is used. Otherwise, gloo dist backend is used.\n", + " if torch.cuda.is_available():\n", + " device = \"cuda\"\n", + " backend = \"nccl\"\n", + " else:\n", + " device = \"cpu\"\n", + " backend = \"gloo\"\n", + "\n", + " logging.info(f\"Using Device: {device}, Backend: {backend}\")\n", + "\n", " model = Net()\n", + " # Attach model to the device.\n", + " model = model.to(device)\n", + "\n", " # Attach model to DistributedDataParallel strategy.\n", " dist.init_process_group(backend=\"gloo\", rank=RANK, world_size=WORLD_SIZE)\n", " Distributor = nn.parallel.DistributedDataParallel\n", " model = Distributor(model)\n", "\n", - " # Split batch size for each worker.\n", - " batch_size = int(128 / WORLD_SIZE)\n", + " # Get Fashion MNIST Dataset.\n", + " dataset = datasets.FashionMNIST(\n", + " \"./data\",\n", + " train=True,\n", + " download=True,\n", + " transform=transforms.Compose([transforms.ToTensor()]),\n", + " )\n", "\n", - " # Get Fashion MNIST DataSet.\n", + " # Every PyTorchJob worker gets distributed sampler of dataset.\n", " train_loader = torch.utils.data.DataLoader(\n", - " datasets.FashionMNIST(\n", - " \"./data\",\n", - " train=True,\n", - " download=True,\n", - " transform=transforms.Compose([transforms.ToTensor()]),\n", - " ),\n", - " batch_size=batch_size,\n", + " dataset,\n", + " batch_size=128,\n", + " sampler=DistributedSampler(dataset),\n", " )\n", "\n", " # Start Training.\n", " logging.info(f\"Start training for RANK: {RANK}. 
-     "    for epoch in range(1):\n",
+     "\n",
+     "    for epoch in range(int(parameters[\"NUM_EPOCHS\"])):\n",
      "        model.train()\n",
      "        optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)\n",
      "\n",
      "        for batch_idx, (data, target) in enumerate(train_loader):\n",
+     "            # Attach tensors to the device.\n",
+     "            data = data.to(device)\n",
+     "            target = target.to(device)\n",
+     "\n",
      "            optimizer.zero_grad()\n",
      "            output = model(data)\n",
      "            loss = F.nll_loss(output, target)\n",
@@ -139,7 +167,9 @@
      "                        100.0 * batch_idx / len(train_loader),\n",
      "                        loss.item(),\n",
      "                    )\n",
-     "                )"
+     "                )\n",
+     "\n",
+     "    logging.info(\"Training is finished\")"
     ]
    },
    {
@@ -162,9 +192,16 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 2,
+    "execution_count": 3,
     "id": "9e2c6fd8-d0ba-4bc6-ac90-d4cf09751ace",
     "metadata": {
+     "execution": {
+      "iopub.execute_input": "2024-03-05T21:44:47.071420Z",
+      "iopub.status.busy": "2024-03-05T21:44:47.071188Z",
+      "iopub.status.idle": "2024-03-05T21:46:56.033826Z",
+      "shell.execute_reply": "2024-03-05T21:46:56.032986Z",
+      "shell.execute_reply.started": "2024-03-05T21:44:47.071404Z"
+     },
      "tags": []
     },
     "outputs": [
@@ -172,10 +209,9 @@
     "name": "stderr",
     "output_type": "stream",
     "text": [
-     "/Users/avelichk/miniconda3/envs/training-operator/lib/python3.9/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
-     "  from .autonotebook import tqdm as notebook_tqdm\n",
-     "2023-09-08T22:00:27Z INFO     Added key: store_based_barrier_key:1 to store for rank: 0\n",
-     "2023-09-08T22:00:27Z INFO     Rank 0: Completed store-based barrier for key:store_based_barrier_key:1 with 1 nodes.\n"
+     "2024-03-05T21:44:47Z INFO     Using Device: cpu, Backend: gloo\n",
+     "2024-03-05T21:44:47Z INFO     Added key: store_based_barrier_key:1 to store for rank: 0\n",
+     "2024-03-05T21:44:47Z INFO     Rank 0: Completed store-based barrier for key:store_based_barrier_key:1 with 1 nodes.\n"
     ]
    },
    {
@@ -187,11 +223,18 @@
     ]
    },
    {
-    "name": "stderr",
-    "output_type": "stream",
-    "text": [
-     "100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 26421880/26421880 [00:01<00:00, 22627052.40it/s]\n"
-    ]
+    "data": {
+     "application/vnd.jupyter.widget-view+json": {
+      "model_id": "f84c269459b842199b83caaee8bee276",
+      "version_major": 2,
+      "version_minor": 0
+     },
+     "text/plain": [
+      " 0%|          | 0/26421880 [00:00