Enable megatron core loggers for GPT pretraining #8354

Merged 4 commits on Feb 9, 2024
examples/nlp/language_modeling/conf/megatron_gpt_config.yaml (7 additions, 0 deletions)
@@ -211,6 +211,13 @@ model:
## Network
sharp: False # Enable the use of SHARP for NCCL data-parallel communications. This is going to be ignored if the network doesn't support SHARP.

## Megatron timers
enable_megatron_timers: False
megatron_timer_kwargs:
log_every_n_steps: 10
log_mode: minmax
barrier: False

data:
# Path to data must be specified by the user.
# Supports List, String and Dictionary
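The timer block above is off by default. Assuming the usual Hydra-style overrides used by the NeMo example scripts (the script name and values here are illustrative, not part of this change), the timers can be switched on at launch with overrides such as model.enable_megatron_timers=True model.megatron_timer_kwargs.log_every_n_steps=50 on the megatron_gpt_pretraining.py command line.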
@@ -37,4 +37,5 @@ normalize_attention_scores: True # Whether to scale the output Q * K^T by 1 / sq
num_moe_experts: 1 # When >1, FFNs are changed to MoE layers
moe_frequency: 1 # every Nth ffn layer will be made MoE
moe_dropout: 0.0 # Dropout value for MoE layers
use_flash_attention: false # Use flash attention in self-attention module
use_flash_attention: false # Use flash attention in self-attention module
enable_megatron_timers: false # Megatron timers
@@ -69,6 +69,13 @@

HAVE_MEGATRON_CORE = False

try:
from megatron.core import Timers

HAVE_MEGATRON_CORE_TIMERS = True
except (ImportError, ModuleNotFoundError):
HAVE_MEGATRON_CORE_TIMERS = False

__all__ = ["MegatronBaseModel"]


@@ -124,6 +131,17 @@ def __init__(self, cfg: DictConfig, trainer: Trainer, no_lm_init=True):
else torch.float32
)

self.megatron_timers = None
if self.cfg.get('enable_megatron_timers', False) and HAVE_MEGATRON_CORE_TIMERS:
self.megatron_timers_cfg = dict(self.cfg.get('megatron_timer_kwargs', dict()))
if 'log_every_n_steps' not in self.megatron_timers_cfg:
self.megatron_timers_cfg['log_every_n_steps'] = self.trainer.log_every_n_steps
if 'log_option' not in self.megatron_timers_cfg:
self.megatron_timers_cfg['log_option'] = 'minmax' # minmax, max, all
if 'barrier' not in self.megatron_timers_cfg:
self.megatron_timers_cfg['barrier'] = False
self.megatron_timers = Timers(log_level=2, log_option=self.megatron_timers_cfg['log_option'])

# set the megatron core model parallel config
self.model_parallel_config: ModelParallelConfig = self.build_model_parallel_config()
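
For orientation, a minimal standalone sketch of the Timers API this block wires up (constructor, callable start/stop, get_all_timers_string). The single-rank gloo process group, the 'forward' timer name, and the sleep are illustrative assumptions: get_all_timers_string aggregates across ranks, so it needs an initialized process group.

import time

import torch.distributed as dist
from megatron.core import Timers

# Illustrative only: a single-rank gloo group so the cross-rank aggregation
# inside get_all_timers_string() has a process group to work with.
if not dist.is_initialized():
    dist.init_process_group(
        backend="gloo", init_method="tcp://127.0.0.1:29500", rank=0, world_size=1
    )

timers = Timers(log_level=2, log_option="minmax")

timers("forward", log_level=1).start(barrier=False)  # same call shape as megatron_timer_start() below
time.sleep(0.1)                                       # stand-in for real work
timers("forward").stop()                              # same call shape as megatron_timer_stop() below

print(timers.get_all_timers_string(barrier=False))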

@@ -615,6 +633,13 @@ def sync_overlap_parameters(self, params=None):
def on_train_batch_end(self, outputs, dataloader_iter: Any, batch_idx: int, unused: Optional[int] = 0) -> None:
super().on_train_batch_end(outputs, dataloader_iter, batch_idx)

# Megatron Timers
if self.megatron_timers:
if self.global_step % self.megatron_timers_cfg["log_every_n_steps"] == 0:
logging.info(
"\n " + self.megatron_timers.get_all_timers_string(barrier=self.megatron_timers_cfg["barrier"])
)

# TODO: Replace with newer override for scheduler.step() instead of
# search for plugins for fp16 GradScalar
if self.trainer.precision_plugin is not None and isinstance(
@@ -1044,7 +1069,7 @@ def build_model_parallel_config(self) -> ModelParallelConfig:
and megatron_amp_O2, # NeMo does not currently support fp16 training with megatron amp O2, eval and inference is supported
"bf16": self.torch_dtype == torch.bfloat16 and megatron_amp_O2,
"params_dtype": self.params_dtype,
"timers": None, # NeMo does not currently support megatron core timers
"timers": self.megatron_timers,
"async_tensor_model_parallel_allreduce": self.cfg.get('tensor_model_parallel_world_size', 1) > 1
and not self.cfg.get('sequence_parallel', False),
"pipeline_dtype": pipeline_dtype,
@@ -1157,3 +1182,16 @@ def configure_sharded_model(self):
# Move the CPU-initialized model (with `use_cpu_initialization=True`) to GPU, which is to avoid
# out-of-memory crash before sharding. In case of a GPU-initialized model, this is a no-op.
self.model = self.model.cuda(torch.cuda.current_device())

def megatron_timer_start(self, name, log_level):
if self.megatron_timers:
self.megatron_timers(name, log_level).start(barrier=False)

def megatron_timer_stop(self, name):
if self.megatron_timers:
self.megatron_timers(name).stop()

def optimizer_step(self, *args, **kwargs):
self.megatron_timer_start('optimizer', log_level=1)
super().optimizer_step(*args, **kwargs)
self.megatron_timer_stop('optimizer')
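
Because the wrappers check self.megatron_timers before doing anything, they are safe no-ops when the feature is disabled, so other regions can be bracketed the same way. A hypothetical sketch of a subclass method; the method, the 'data_fetch' timer name, and the fetch logic are illustrative only, not part of this change:

def fetch_batch_timed(self, dataloader_iter):
    # Hypothetical example: time how long pulling the next batch takes.
    self.megatron_timer_start('data_fetch', log_level=1)
    batch = next(dataloader_iter)
    self.megatron_timer_stop('data_fetch')
    return batch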
@@ -660,8 +660,11 @@ def training_step(self, dataloader_iter, batch_idx):

# when using sequence parallelism, the sequence parallel layernorm grads must be all-reduced
if self.cfg.get('tensor_model_parallel_size', 1) > 1 and self.cfg.get('sequence_parallel', False):
self.megatron_timer_start('allreduce_sequence_parallel_gradients', log_level=1)
self.allreduce_sequence_parallel_gradients()
self.megatron_timer_stop('allreduce_sequence_parallel_gradients')

self.megatron_timer_start('gradient_allreduce', log_level=1)
if self.use_fsdp:
# Reduce the gradients omitted from FSDP-sharding
self.allreduce_fsdp_sharding_omitted_gradients()
@@ -679,12 +682,15 @@
# async grad allreduce is not currently implemented for O1/autocasting mixed precision training
# so we all-reduce gradients after the pipeline
self.allreduce_gradients() # @sangkug we think this is causing memory to blow up (hurts perf)
self.megatron_timer_stop('gradient_allreduce')

if self.cfg.get('pipeline_model_parallel_size', 1) > 1 and self.cfg.get(
'share_embeddings_and_output_weights', True
):
self.megatron_timer_start('allreduce_first_last_embeddings', log_level=1)
# when using pipeline parallelism the first and last stage must keep embeddings in sync
self.allreduce_first_last_embeddings()
self.megatron_timer_stop('allreduce_first_last_embeddings')

## logging
if self.log_train_loss: