From e82635eb896f444ce8d86d47f442e6a8dbd01470 Mon Sep 17 00:00:00 2001
From: Francisc Bungiu
Date: Mon, 7 Aug 2023 04:47:37 -0700
Subject: [PATCH] Add ODS logging to all runners

Summary:
X-link: https://github.com/facebookresearch/detectron2/pull/5050

Pull Request resolved: https://github.com/facebookresearch/d2go/pull/606

Allow attaching a monitoring service to the training loop.

Reviewed By: miqueljubert

Differential Revision: D47595332

fbshipit-source-id: 49d770207aeea56113c008fcd29ad7b545cec849
---
 d2go/runner/default_runner.py | 224 +++++++++++++++++-----------------
 1 file changed, 115 insertions(+), 109 deletions(-)

diff --git a/d2go/runner/default_runner.py b/d2go/runner/default_runner.py
index 995ad2a4..fa48cf94 100644
--- a/d2go/runner/default_runner.py
+++ b/d2go/runner/default_runner.py
@@ -6,7 +6,7 @@
 import os
 from collections import OrderedDict
 from functools import lru_cache
-from typing import List, Optional, Type, Union
+from typing import Any, List, Optional, Type, Union
 
 import detectron2.utils.comm as comm
 import torch
@@ -168,6 +168,11 @@ def prepare_fb_model(cfg: CfgNode, model: torch.nn.Module) -> torch.nn.Module:
     return model
 
 
+@fb_overwritable()
+def get_monitoring_service() -> Any:
+    pass
+
+
 class BaseRunner(object):
     def __init__(self):
         identifier = f"D2Go.Runner.{self.__class__.__name__}"
@@ -529,123 +534,124 @@ def _get_trainer_hooks(
         ]
 
     def do_train(self, cfg, model, resume):
-        # Note that flops at the beginning of training is often inaccurate,
-        # if a model has input-dependent logic
-        attach_profilers(cfg, model)
+        with get_monitoring_service():
+            # Note that flops at the beginning of training is often inaccurate,
+            # if a model has input-dependent logic
+            attach_profilers(cfg, model)
+
+            if cfg.NUMA_BINDING is True:
+                import numa
+
+                num_gpus_per_node = comm.get_local_size()
+                num_sockets = numa.get_max_node() + 1
+                socket_id = torch.cuda.current_device() // (
+                    max(num_gpus_per_node // num_sockets, 1)
+                )
+                node_mask = set([socket_id])
+                numa.bind(node_mask)
 
-        if cfg.NUMA_BINDING is True:
-            import numa
+            optimizer = self.build_optimizer(cfg, model)
+            scheduler = self.build_lr_scheduler(cfg, optimizer)
 
-            num_gpus_per_node = comm.get_local_size()
-            num_sockets = numa.get_max_node() + 1
-            socket_id = torch.cuda.current_device() // (
-                max(num_gpus_per_node // num_sockets, 1)
+            checkpointer = self.build_checkpointer(
+                cfg,
+                model,
+                save_dir=cfg.OUTPUT_DIR,
+                load_ckpt_to_gpu=cfg.LOAD_CKPT_TO_GPU,
+                optimizer=optimizer,
+                scheduler=scheduler,
             )
-            node_mask = set([socket_id])
-            numa.bind(node_mask)
-
-        optimizer = self.build_optimizer(cfg, model)
-        scheduler = self.build_lr_scheduler(cfg, optimizer)
-
-        checkpointer = self.build_checkpointer(
-            cfg,
-            model,
-            save_dir=cfg.OUTPUT_DIR,
-            load_ckpt_to_gpu=cfg.LOAD_CKPT_TO_GPU,
-            optimizer=optimizer,
-            scheduler=scheduler,
-        )
-        checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
-        start_iter = (
-            checkpoint.get("iteration", -1)
-            if resume and checkpointer.has_checkpoint()
-            else -1
-        )
-        del checkpoint
-        # The checkpoint stores the training iteration that just finished, thus we start
-        # at the next iteration (or iter zero if there's no checkpoint).
-        start_iter += 1
-        max_iter = cfg.SOLVER.MAX_ITER
-        periodic_checkpointer = PeriodicCheckpointer(
-            checkpointer, cfg.SOLVER.CHECKPOINT_PERIOD, max_iter=max_iter
-        )
-
-        data_loader = self.build_detection_train_loader(cfg)
-
-        def _get_model_with_abnormal_checker(model):
-            if not cfg.ABNORMAL_CHECKER.ENABLED:
-                return model
-
-            tbx_writer = self.get_tbx_writer(cfg)
-            writers = get_writers(cfg, tbx_writer)
-            checker = AbnormalLossChecker(start_iter, writers)
-            ret = AbnormalLossCheckerWrapper(model, checker)
-            return ret
-
-        if cfg.SOLVER.AMP.ENABLED:
-            trainer = AMPTrainer(
-                _get_model_with_abnormal_checker(model),
-                data_loader,
-                optimizer,
-                gather_metric_period=cfg.GATHER_METRIC_PERIOD,
-                zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
-                grad_scaler=get_grad_scaler(cfg),
-                precision=parse_precision_from_string(
-                    cfg.SOLVER.AMP.PRECISION, lightning=False
-                ),
-                log_grad_scaler=cfg.SOLVER.AMP.LOG_GRAD_SCALER,
-                async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+            checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
+            start_iter = (
+                checkpoint.get("iteration", -1)
+                if resume and checkpointer.has_checkpoint()
+                else -1
             )
-        else:
-            trainer = SimpleTrainer(
-                _get_model_with_abnormal_checker(model),
-                data_loader,
-                optimizer,
-                gather_metric_period=cfg.GATHER_METRIC_PERIOD,
-                zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
-                async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+            del checkpoint
+            # The checkpoint stores the training iteration that just finished, thus we start
+            # at the next iteration (or iter zero if there's no checkpoint).
+            start_iter += 1
+            max_iter = cfg.SOLVER.MAX_ITER
+            periodic_checkpointer = PeriodicCheckpointer(
+                checkpointer, cfg.SOLVER.CHECKPOINT_PERIOD, max_iter=max_iter
             )
 
-        if cfg.SOLVER.AMP.ENABLED and torch.cuda.is_available():
-            # Allow to use the TensorFloat32 (TF32) tensor cores, available on A100 GPUs.
-            # For more details https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere.
-            torch.backends.cuda.matmul.allow_tf32 = True
-            torch.backends.cudnn.allow_tf32 = True
+            data_loader = self.build_detection_train_loader(cfg)
 
-        trainer_hooks = self._get_trainer_hooks(
-            cfg, model, optimizer, scheduler, periodic_checkpointer, trainer
-        )
+            def _get_model_with_abnormal_checker(model):
+                if not cfg.ABNORMAL_CHECKER.ENABLED:
+                    return model
 
-        if comm.is_main_process():
-            assert (
-                cfg.GATHER_METRIC_PERIOD <= cfg.WRITER_PERIOD
-                and cfg.WRITER_PERIOD % cfg.GATHER_METRIC_PERIOD == 0
-            ), "WRITER_PERIOD needs to be divisible by GATHER_METRIC_PERIOD"
-            tbx_writer = self.get_tbx_writer(cfg)
-            writers = [
-                CommonMetricPrinter(max_iter, window_size=cfg.WRITER_PERIOD),
-                JSONWriter(
-                    os.path.join(cfg.OUTPUT_DIR, "metrics.json"),
-                    window_size=cfg.WRITER_PERIOD,
-                ),
-                tbx_writer,
-            ]
-            trainer_hooks.append(hooks.PeriodicWriter(writers, cfg.WRITER_PERIOD))
-        update_hooks_from_registry(trainer_hooks, cfg)
-        trainer.register_hooks(trainer_hooks)
-        trainer.train(start_iter, max_iter)
+            tbx_writer = self.get_tbx_writer(cfg)
+            writers = get_writers(cfg, tbx_writer)
+            checker = AbnormalLossChecker(start_iter, writers)
+            ret = AbnormalLossCheckerWrapper(model, checker)
+            return ret
 
-        if hasattr(self, "original_cfg"):
-            table = get_cfg_diff_table(cfg, self.original_cfg)
-            logger.info(
-                "GeneralizeRCNN Runner ignoring training config change: \n" + table
-            )
-            trained_cfg = self.original_cfg.clone()
-        else:
-            trained_cfg = cfg.clone()
-        with temp_defrost(trained_cfg):
-            trained_cfg.MODEL.WEIGHTS = checkpointer.get_checkpoint_file()
-        return {"model_final": trained_cfg}
+            if cfg.SOLVER.AMP.ENABLED:
+                trainer = AMPTrainer(
+                    _get_model_with_abnormal_checker(model),
+                    data_loader,
+                    optimizer,
+                    gather_metric_period=cfg.GATHER_METRIC_PERIOD,
+                    zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
+                    grad_scaler=get_grad_scaler(cfg),
+                    precision=parse_precision_from_string(
+                        cfg.SOLVER.AMP.PRECISION, lightning=False
+                    ),
+                    log_grad_scaler=cfg.SOLVER.AMP.LOG_GRAD_SCALER,
+                    async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+                )
+            else:
+                trainer = SimpleTrainer(
+                    _get_model_with_abnormal_checker(model),
+                    data_loader,
+                    optimizer,
+                    gather_metric_period=cfg.GATHER_METRIC_PERIOD,
+                    zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
+                    async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+                )
+
+            if cfg.SOLVER.AMP.ENABLED and torch.cuda.is_available():
+                # Allow to use the TensorFloat32 (TF32) tensor cores, available on A100 GPUs.
+                # For more details https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere.
+                torch.backends.cuda.matmul.allow_tf32 = True
+                torch.backends.cudnn.allow_tf32 = True
+
+            trainer_hooks = self._get_trainer_hooks(
+                cfg, model, optimizer, scheduler, periodic_checkpointer, trainer
+            )
+
+            if comm.is_main_process():
+                assert (
+                    cfg.GATHER_METRIC_PERIOD <= cfg.WRITER_PERIOD
+                    and cfg.WRITER_PERIOD % cfg.GATHER_METRIC_PERIOD == 0
+                ), "WRITER_PERIOD needs to be divisible by GATHER_METRIC_PERIOD"
+                tbx_writer = self.get_tbx_writer(cfg)
+                writers = [
+                    CommonMetricPrinter(max_iter, window_size=cfg.WRITER_PERIOD),
+                    JSONWriter(
+                        os.path.join(cfg.OUTPUT_DIR, "metrics.json"),
+                        window_size=cfg.WRITER_PERIOD,
+                    ),
+                    tbx_writer,
+                ]
+                trainer_hooks.append(hooks.PeriodicWriter(writers, cfg.WRITER_PERIOD))
+            update_hooks_from_registry(trainer_hooks, cfg)
+            trainer.register_hooks(trainer_hooks)
+            trainer.train(start_iter, max_iter)
+
+            if hasattr(self, "original_cfg"):
+                table = get_cfg_diff_table(cfg, self.original_cfg)
+                logger.info(
+                    "GeneralizeRCNN Runner ignoring training config change: \n" + table
+                )
+                trained_cfg = self.original_cfg.clone()
+            else:
+                trained_cfg = cfg.clone()
+            with temp_defrost(trained_cfg):
+                trained_cfg.MODEL.WEIGHTS = checkpointer.get_checkpoint_file()
+            return {"model_final": trained_cfg}
 
     @staticmethod
     def get_evaluator(cfg, dataset_name, output_folder):
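
Note on the hook introduced by this patch (illustration only, not part of the diff): do_train now enters the training loop through "with get_monitoring_service():", so whatever the @fb_overwritable() override returns must behave as a context manager; the OSS stub in the patch body simply returns None via "pass". The sketch below is a minimal, assumption-labeled example of that contract: a no-op default plus a hypothetical timing-based service. It is not the internal ODS implementation, and the names TimingMonitoringService and the logging behavior are invented for illustration.

# Illustrative sketch only; these names are hypothetical, not d2go or fbcode APIs.
import contextlib
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)


def get_monitoring_service() -> Any:
    # A do-nothing context manager keeps "with get_monitoring_service():" valid
    # when no monitoring backend is attached (assumed open-source behavior).
    return contextlib.nullcontext()


class TimingMonitoringService:
    """Hypothetical service: records how long the wrapped training loop ran."""

    def __enter__(self) -> "TimingMonitoringService":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        elapsed = time.perf_counter() - self._start
        logger.info("training loop finished after %.1f seconds", elapsed)


# Usage mirroring the call site in do_train:
with TimingMonitoringService():
    pass  # the training loop body would run here

An internal override registered through @fb_overwritable() would return an ODS-backed object exposing the same __enter__/__exit__ surface, which is what lets the runner attach a monitoring service without changing the training loop itself.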