diff --git a/docs/source/en/features/pipeline_parallel.md b/docs/source/en/features/pipeline_parallel.md index 8b5f228a9e5e..cb19f9815bf2 100644 --- a/docs/source/en/features/pipeline_parallel.md +++ b/docs/source/en/features/pipeline_parallel.md @@ -1,14 +1,15 @@ # Pipeline Parallel -Author: Guangyang Lu, Hongxin Liu, Yongbin Li +Author: Guangyang Lu, Hongxin Liu, Yongbin Li, Mingyan Jiang **Prerequisite** -- [Define Your Configuration](../basics/define_your_config.md) -- [Use Engine and Trainer in Training](../basics/engine_trainer.md) -- [Configure Parallelization](../basics/configure_parallelization.md) +- [Paradigms of Parallelism](../concepts/paradigms_of_parallelism.md) +- [Use Booster to Training](../basics/booster_api.md) +- [Shardformer](../features/shardformer.md) +- [Plugin of Booster](../basics/booster_plugins.md) **Example Code** -- [ColossalAI-Examples ResNet with pipeline](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/features/pipeline_parallel) +- [Fine-tune Bert with pipeline](https://github.com/hpcaitech/ColossalAI/blob/main/examples/language/bert/finetune.py) **Related Paper** - [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training](https://arxiv.org/abs/2110.14883) @@ -17,7 +18,7 @@ Author: Guangyang Lu, Hongxin Liu, Yongbin Li ## Quick introduction -In this tutorial, you will learn how to use pipeline parallel. In Colossal-AI, we use 1F1B pipeline, introduced by Nvidia. In this case, ViT and Imagenet are too large to use. Therefore, here we use ResNet and Cifar as example. +In this tutorial, you will learn how to use pipeline parallel. In Colossal-AI, we use 1F1B pipeline, introduced by Nvidia. In this case, ViT and Imagenet are too large to use. Therefore, here we use bert model and glue dataset as example. ## Table Of Content @@ -25,7 +26,7 @@ In this tutorial we will cover: 1. Introduction of 1F1B pipeline. 2. Usage of non-interleaved and interleaved schedule. -3. Training ResNet with pipeline. +3. Finetune Bert with pipeline. ## Introduction of 1F1B pipeline @@ -60,101 +61,158 @@ In this schedule, each device can perform computation for multiple subsets of la This mode is both memory-efficient and time-efficient. -## Usage of non-interleaved and interleaved schedule +## Colossal-AI's Implementation -In Colossal-AI, we provided both non-interleaved(as `PipelineSchedule`) and interleaved schedule(as `InterleavedPipelineSchedule`). +In Colossal-AI, pipeline parallelism relies on the `scheduler` and [`Shardformer`](../features/shardformer.md). We provide both non-interleaved (`OneForwardOneBackwardSchedule`) and interleaved (`InterleavedSchedule`) schedules. While `Shardformer` implements layer splitting for models and replaces the `forward` function of the model to make it compatible with the scheduler. -You just need to set `NUM_MICRO_BATCHES` in config file and set `NUM_CHUNKS` in config file if you want to use Interleaved Pipeline Schedule. If you certainly know the shape of each pipeline stage's output tensor and the shapes are all the same, you can set `TENSOR_SHAPE` in config file to further reduce communication. Otherwise, you can just ignore `tensor_shape`, and the shape will be exchanged over pipeline stages automatically. Then we will generate an appropriate schedule for you. +In Colossal-AI, the `HybridParallelPlugin` encapsulates pipeline execution strategies. It manages pipeline parallel communication groups and a scheduler. When boosting the model with this plugin, the model's layers are split by calling the `shardformer.optimize` function, and then `execute_pipeline` is called to execute the model in segments using `OneForwardOneBackwardSchedule` which is default scheduler used in `HybridParallelPlugin`, and `InterleavedSchedule` will be integrated later. -## Training ResNet with pipeline +You can customize your parallel strategy by setting parameters for the `HybridParallelPlugin`. -Let's build the `ResNet` model first with Colossal PipelinableContext: +For more usage details, please refer to the [documentation](../basics/booster_plugins.md) for `HybridParallelPlugin`. + +## Fine-tune Bert with pipeline + +First, we define the necessary training components, including model, dataloader, optimizer, lr_scheduler, criterion: ```python -import os -from typing import Callable, List, Optional, Type, Union +import argparse +from typing import Callable, List, Union + import torch import torch.nn as nn +from data import GLUEDataBuilder +from torch.optim import Adam, Optimizer +from torch.optim.lr_scheduler import _LRScheduler as LRScheduler +from torch.utils.data import DataLoader +from tqdm import tqdm +from transformers import ( + AlbertForSequenceClassification, + AutoConfig, + BertForSequenceClassification, + get_linear_schedule_with_warmup, +) + import colossalai -import colossalai.nn as col_nn +from colossalai.booster import Booster +from colossalai.booster.plugin import HybridParallelPlugin +from colossalai.cluster import DistCoordinator +from colossalai.nn.optimizer import HybridAdam -from colossalai.core import global_context as gpc -from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.legacy.trainer import Trainer, hooks -from colossalai.utils import MultiTimer, get_dataloader -from colossalai.context import ParallelMode -from colossalai.pipeline.pipelinable import PipelinableContext +# Define some config +NUM_EPOCHS = 3 +BATCH_SIZE = 32 +LEARNING_RATE = 2.4e-5 +WEIGHT_DECAY = 0.01 +WARMUP_FRACTION = 0.1 + +coordinator = DistCoordinator() + +def move_to_cuda(batch): + return {k: v.cuda() for k, v in batch.items()} + + +# Define 'criterion' function with two inputs, which will be passed to 'execute_pipeline'. +def _criterion(outputs, inputs): + return outputs.loss + +# Define optimizer +lr = LEARNING_RATE +no_decay = ["bias", "LayerNorm.weight"] +optimizer_grouped_parameters = [ + { + "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], + "weight_decay": WEIGHT_DECAY, + }, + { + "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, +] -from titans.dataloader.cifar10 import build_cifar -from torchvision.models import resnet50 -from torchvision.models.resnet import BasicBlock, Bottleneck, conv1x1 +optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, eps=1e-8) -# Define some config -BATCH_SIZE = 64 -NUM_EPOCHS = 2 -NUM_CHUNKS = 1 -CONFIG = dict(NUM_MICRO_BATCHES=4, parallel=dict(pipeline=2)) - -# Train -disable_existing_loggers() -parser = colossalai.get_default_parser() -args = parser.parse_args() -colossalai.launch_from_torch(backend=args.backend, config=CONFIG) -logger = get_dist_logger() -pipelinable = PipelinableContext() - -# build model -with pipelinable: - model = resnet50() -``` -Define an execution sequence. -```python -exec_seq = [ - 'conv1', 'bn1', 'relu', 'maxpool', 'layer1', 'layer2', 'layer3', 'layer4', 'avgpool', - (lambda x: torch.flatten(x, 1), "behind"), 'fc' -] -pipelinable.to_layer_list(exec_seq) +# Define lr_scheduler +total_steps = len(train_dataloader) * NUM_EPOCHS +num_warmup_steps = int(WARMUP_FRACTION * total_steps) +lr_scheduler = get_linear_schedule_with_warmup( + optimizer, + num_warmup_steps=num_warmup_steps, + num_training_steps=total_steps, +) + + +# Define Bert model +model = BertForSequenceClassification.from_pretrained("bert-base-uncased", config=cfg).cuda() + +# Define a dataloader +data_builder = GLUEDataBuilder(model_name, + plugin, + args.task, + train_batch_size=BATCH_SIZE, + eval_batch_size=BATCH_SIZE) +train_dataloader = data_builder.train_dataloader() ``` -Partition the model into pipeline. +Define a booster with the `HybridParallelPlugin`. ```python -model = pipelinable.partition(NUM_CHUNKS, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE)) +plugin = HybridParallelPlugin(tp_size=1, + pp_size=2, + num_microbatches=None, + microbatch_size=1, + enable_all_optimization=True, + zero_stage=1, + precision='fp16', + initial_scale=1) +booster = Booster(plugin=plugin) ``` -In this tutorial, we use `Trainer` to train `ResNet`: +Boost these train componts with the booster created. ```python -# build criterion -criterion = nn.CrossEntropyLoss() - -# optimizer -optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) - -# build dataloader -root = os.environ.get('DATA', './data') -train_dataloader, test_dataloader = build_cifar(BATCH_SIZE, root, padding=4, crop=32, resize=32) - -lr_scheduler = col_nn.lr_scheduler.LinearWarmupLR(optimizer, NUM_EPOCHS, warmup_steps=1) -engine, train_dataloader, test_dataloader, lr_scheduler = colossalai.initialize(model, optimizer, criterion, - train_dataloader, test_dataloader, - lr_scheduler) -timer = MultiTimer() +model, optimizer, _criterion, _, lr_scheduler = booster.boost(model, + optimizer, + criterion=_criterion, + lr_scheduler=lr_scheduler) +``` -trainer = Trainer(engine=engine, timer=timer, logger=logger) +Train the model at last. -hook_list = [ - hooks.LossHook(), - hooks.AccuracyHook(col_nn.metric.Accuracy()), - hooks.LogMetricByEpochHook(logger), - hooks.LRSchedulerHook(lr_scheduler, by_epoch=True) -] - -trainer.fit(train_dataloader=train_dataloader, - epochs=NUM_EPOCHS, - test_dataloader=test_dataloader, - test_interval=1, - hooks=hook_list, - display_progress=True) +```python +# Define a train function +def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion: Callable, lr_scheduler: LRScheduler, + train_dataloader: DataLoader, booster: Booster, coordinator: DistCoordinator): + + is_pp_last_stage = booster.plugin.stage_manager.is_last_stage() + total_step = len(train_dataloader) + + model.train() + optimizer.zero_grad() + # convert train_dataloader to a iterator + train_dataloader_iter = iter(train_dataloader) + with tqdm(range(total_step), + desc=f'Epoch [{epoch + 1}/{NUM_EPOCHS}]', + disable=not (is_pp_last_stage)) as pbar: + # Forward pass + for _ in pbar: + outputs = booster.execute_pipeline(train_dataloader_iter, + model, + _criterion, + optimizer, + return_loss=True, + return_outputs=True) + # Backward and optimize + if is_pp_last_stage: + loss = outputs['loss'] + pbar.set_postfix({'loss': loss.item()}) + + optimizer.step() + optimizer.zero_grad() + lr_scheduler.step() + +# Train model +for epoch in range(NUM_EPOCHS): + train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, train_dataloader, booster, coordinator) ``` -We use `2` pipeline stages and the batch will be split into `4` micro batches. +We use `2` pipeline stages and the micro batches is 1. (these parameters can be configured to an appropriate value) diff --git a/docs/source/zh-Hans/features/pipeline_parallel.md b/docs/source/zh-Hans/features/pipeline_parallel.md index 1497dc399f6c..e688020556d8 100644 --- a/docs/source/zh-Hans/features/pipeline_parallel.md +++ b/docs/source/zh-Hans/features/pipeline_parallel.md @@ -1,14 +1,15 @@ # 流水并行 -作者: Guangyang Lu, Hongxin Liu, Yongbin Li +作者: Guangyang Lu, Hongxin Liu, Yongbin Li, Mingyan Jiang **前置教程** -- [定义配置文件](../basics/define_your_config.md) -- [在训练中使用Engine和Trainer](../basics/engine_trainer.md) -- [并行配置](../basics/configure_parallelization.md) +- [并行技术](../concepts/paradigms_of_parallelism.md) +- [Booster API](../basics/booster_api.md) +- [Shardformer](../features/shardformer.md) +- [Booster 插件](../basics/booster_plugins.md) **示例代码** -- [ColossalAI-Examples ResNet with pipeline](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/features/pipeline_parallel) +- [使用pipeline并行策略微调Bert](https://github.com/hpcaitech/ColossalAI/blob/main/examples/language/bert/finetune.py) **相关论文** - [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training](https://arxiv.org/abs/2110.14883) @@ -17,7 +18,7 @@ ## 快速预览 -在本教程中,你将学习如何使用流水并行。在 Colossal-AI 中, 我们使用 NVIDIA 推出的 1F1B 流水线。由于在本例中, 使用 ViT 和 ImageNet 太过庞大,因此我们使用 ResNet 和 CIFAR 为例. +在本教程中,你将学习如何使用流水并行。在 Colossal-AI 中, 我们使用 NVIDIA 推出的 1F1B 流水线。由于在本例中, 使用 ViT 和 ImageNet 太过庞大,因此我们使用 Bert 和 Glue数据集 为例. ## 目录 @@ -25,7 +26,7 @@ 1. 介绍 1F1B 流水线; 2. 使用非交错和交错 schedule; -3. 使用流水线训练 ResNet。 +3. 使用流水线微调 Bert ## 认识 1F1B 流水线 @@ -59,101 +60,154 @@ 这种模式既节省内存又节省时间。 -## 使用schedule +## Colossal-AI中的实现 -在 Colossal-AI 中, 我们提供非交错(`PipelineSchedule`) 和交错(`InterleavedPipelineSchedule`)schedule。 +在 Colossal-AI 中,流水线并行依赖于 `scheduler` 和 `Shardformer`。我们提供了非交错的(`OneForwardOneBackwardSchedule`)和交错的(`InterleavedSchedule`)两种调度方式。而 Shardformer 实现了对模型的层分割,并替换了模型的 `forward` 函数,使其与调度器兼容。 -你只需要在配置文件中,设置 `NUM_MICRO_BATCHES` 并在你想使用交错schedule的时候,设置 `NUM_CHUNKS`。 如果你确定性地知道每个管道阶段的输出张量的形状,而且形状都是一样的,你可以设置 `tensor_shape` 以进一步减少通信。否则,你可以忽略 `tensor_shape` , 形状将在管道阶段之间自动交换。 我们将会根据用户提供的配置文件,生成一个合适schedule来支持用户的流水并行训练。 +在 Colossal-AI 中,`HybridParallelPlugin` 封装了流水线执行策略。它管理流水线并行通信组和一个 `scheduler`。当使用此插件增强模型时,模型的层将通过调用 `shardformer.optimize` 函数进行分割,然后调用 `execute_pipeline` 使用 `scheduler` 来分别执行模型的各个部分。 `HybridParallelPlugin`暂时只支持`OneForwardOneBackwardSchedule`, `InterleavedSchedule`将会在不久后支持。 -## 使用流水线训练 ResNet +您可以通过设置 `HybridParallelPlugin` 的参数来自定义您的并行策略。更多使用细节请参考`HybridParallelPlugin`的[使用文档](../basics/booster_plugins.md)。 -我们首先用Colossal PipelinableContext方式建立 `ResNet` 模型: +## 使用流水线微调 Bert模型 + +首先我们定义好需要的训练组件,包括`model`, `dataloader`, `optimizer`, `lr_scheduler`, `criterion` 等: ```python -import os -from typing import Callable, List, Optional, Type, Union +import argparse +from typing import Callable, List, Union + import torch import torch.nn as nn +from data import GLUEDataBuilder +from torch.optim import Adam, Optimizer +from torch.optim.lr_scheduler import _LRScheduler as LRScheduler +from torch.utils.data import DataLoader +from tqdm import tqdm +from transformers import ( + AlbertForSequenceClassification, + AutoConfig, + BertForSequenceClassification, + get_linear_schedule_with_warmup, +) + import colossalai -import colossalai.nn as col_nn +from colossalai.booster import Booster +from colossalai.booster.plugin import HybridParallelPlugin +from colossalai.cluster import DistCoordinator +from colossalai.nn.optimizer import HybridAdam -from colossalai.core import global_context as gpc -from colossalai.logging import disable_existing_loggers, get_dist_logger -from colossalai.legacy.trainer import Trainer, hooks -from colossalai.utils import MultiTimer, get_dataloader -from colossalai.context import ParallelMode -from colossalai.pipeline.pipelinable import PipelinableContext +# Define some config +NUM_EPOCHS = 3 +BATCH_SIZE = 32 +LEARNING_RATE = 2.4e-5 +WEIGHT_DECAY = 0.01 +WARMUP_FRACTION = 0.1 + +coordinator = DistCoordinator() + +def move_to_cuda(batch): + return {k: v.cuda() for k, v in batch.items()} + +# Define 'criterion' function with two inputs, which will be passed to 'execute_pipeline'. +def _criterion(outputs, inputs): + return outputs.loss + +# Define optimizer +lr = LEARNING_RATE +no_decay = ["bias", "LayerNorm.weight"] +optimizer_grouped_parameters = [ + { + "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], + "weight_decay": WEIGHT_DECAY, + }, + { + "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, +] -from titans.dataloader.cifar10 import build_cifar -from torchvision.models import resnet50 -from torchvision.models.resnet import BasicBlock, Bottleneck, conv1x1 +optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, eps=1e-8) -# Define some config -BATCH_SIZE = 64 -NUM_EPOCHS = 2 -NUM_CHUNKS = 1 -CONFIG = dict(NUM_MICRO_BATCHES=4, parallel=dict(pipeline=2)) - -# Train -disable_existing_loggers() -parser = colossalai.get_default_parser() -args = parser.parse_args() -colossalai.launch_from_torch(backend=args.backend, config=CONFIG) -logger = get_dist_logger() -pipelinable = PipelinableContext() - -# build model -with pipelinable: - model = resnet50() + +# Define lr_scheduler +total_steps = len(train_dataloader) * NUM_EPOCHS +num_warmup_steps = int(WARMUP_FRACTION * total_steps) +lr_scheduler = get_linear_schedule_with_warmup( + optimizer, + num_warmup_steps=num_warmup_steps, + num_training_steps=total_steps, +) + + +# Define Bert model +model = BertForSequenceClassification.from_pretrained("bert-base-uncased", config=cfg).cuda() + +# Define a dataloader +data_builder = GLUEDataBuilder(model_name, + plugin, + args.task, + train_batch_size=BATCH_SIZE, + eval_batch_size=BATCH_SIZE) +train_dataloader = data_builder.train_dataloader() ``` -给定切分顺序,module直接给出name,部分函数需要手动添加。 +使用`HybridParallelPlugin`初始化一个booster. ```python -exec_seq = [ - 'conv1', 'bn1', 'relu', 'maxpool', 'layer1', 'layer2', 'layer3', 'layer4', 'avgpool', - (lambda x: torch.flatten(x, 1), "behind"), 'fc' -] -pipelinable.to_layer_list(exec_seq) +plugin = HybridParallelPlugin(tp_size=1, + pp_size=2, + num_microbatches=None, + microbatch_size=1, + enable_all_optimization=True, + zero_stage=1, + precision='fp16', + initial_scale=1) +booster = Booster(plugin=plugin) ``` -将模型切分成流水线阶段。 +使用`booster`将优化特性注入到训练组件中。 ```python -model = pipelinable.partition(NUM_CHUNKS, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE)) +model, optimizer, _criterion, _, lr_scheduler = booster.boost(model, + optimizer, + criterion=_criterion, + lr_scheduler=lr_scheduler) ``` -我们使用`Trainer`训练`ResNet`: +最后训练模型 ```python -# build criterion -criterion = nn.CrossEntropyLoss() - -# optimizer -optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) - -# build dataloader -root = os.environ.get('DATA', './data') -train_dataloader, test_dataloader = build_cifar(BATCH_SIZE, root, padding=4, crop=32, resize=32) - -lr_scheduler = col_nn.lr_scheduler.LinearWarmupLR(optimizer, NUM_EPOCHS, warmup_steps=1) -engine, train_dataloader, test_dataloader, lr_scheduler = colossalai.initialize(model, optimizer, criterion, - train_dataloader, test_dataloader, - lr_scheduler) -timer = MultiTimer() - -trainer = Trainer(engine=engine, timer=timer, logger=logger) - -hook_list = [ - hooks.LossHook(), - hooks.AccuracyHook(col_nn.metric.Accuracy()), - hooks.LogMetricByEpochHook(logger), - hooks.LRSchedulerHook(lr_scheduler, by_epoch=True) -] - -trainer.fit(train_dataloader=train_dataloader, - epochs=NUM_EPOCHS, - test_dataloader=test_dataloader, - test_interval=1, - hooks=hook_list, - display_progress=True) +# Define a train function +def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion: Callable, lr_scheduler: LRScheduler, + train_dataloader: DataLoader, booster: Booster, coordinator: DistCoordinator): + + is_pp_last_stage = booster.plugin.stage_manager.is_last_stage() + total_step = len(train_dataloader) + + model.train() + optimizer.zero_grad() + # convert train_dataloader to a iterator + train_dataloader_iter = iter(train_dataloader) + with tqdm(range(total_step), + desc=f'Epoch [{epoch + 1}/{NUM_EPOCHS}]', + disable=not (is_pp_last_stage)) as pbar: + # Forward pass + for _ in pbar: + outputs = booster.execute_pipeline(train_dataloader_iter, + model, + _criterion, + optimizer, + return_loss=True, + return_outputs=True) + # Backward and optimize + if is_pp_last_stage: + loss = outputs['loss'] + pbar.set_postfix({'loss': loss.item()}) + + optimizer.step() + optimizer.zero_grad() + lr_scheduler.step() + +# Train model +for epoch in range(NUM_EPOCHS): + train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, train_dataloader, booster, coordinator) ``` -我们使用 `2` 个流水段,并且 batch 将被切分为 `4` 个 micro batches。 +我们使用 `2` 个流水段,并且 batch 将被切分为 `1` 个 micro batches。(这些参数都可根据实际情况设置为合适的值)