Add ODS logging to all runners
Summary:
X-link: facebookresearch/detectron2#5050

Pull Request resolved: #606

Allow attaching a monitoring service to the training loop.

Reviewed By: miqueljubert

Differential Revision: D47595332

fbshipit-source-id: 49d770207aeea56113c008fcd29ad7b545cec849
Francisc Bungiu authored and facebook-github-bot committed Aug 7, 2023
1 parent 94c7f64 commit e82635e
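
The change below exposes the monitoring hook as a context manager wrapped around the whole do_train body: entering it can start a metrics client, and exiting it can flush and stop it. A minimal sketch of what an override could look like follows; the _LoggingMonitoringClient class and the metric names are hypothetical stand-ins for illustration only, since the actual ODS-backed implementation sits behind fb_overwritable() and is not part of this diff.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)


class _LoggingMonitoringClient:
    """Hypothetical stand-in for an ODS-style metrics client (not part of this diff)."""

    def log_value(self, key: str, value: float) -> None:
        logger.info("monitoring: %s=%s", key, value)

    def flush(self) -> None:
        pass


@contextmanager
def get_monitoring_service():
    # Sketch of an override: start a client on entry and flush it on exit,
    # so everything inside `with get_monitoring_service():` runs under monitoring.
    client = _LoggingMonitoringClient()
    start = time.time()
    client.log_value("train_loop.start", 1.0)
    try:
        yield client
    finally:
        client.log_value("train_loop.duration_sec", time.time() - start)
        client.flush()

With such an override registered, the with get_monitoring_service(): block added in do_train brackets everything from checkpoint loading to trainer.train(...).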
Showing 1 changed file with 115 additions and 109 deletions.
d2go/runner/default_runner.py — 115 additions & 109 deletions (224 changes)
@@ -6,7 +6,7 @@
 import os
 from collections import OrderedDict
 from functools import lru_cache
-from typing import List, Optional, Type, Union
+from typing import Any, List, Optional, Type, Union

 import detectron2.utils.comm as comm
 import torch
@@ -168,6 +168,11 @@ def prepare_fb_model(cfg: CfgNode, model: torch.nn.Module) -> torch.nn.Module:
     return model


+@fb_overwritable()
+def get_monitoring_service() -> Any:
+    pass
+
+
 class BaseRunner(object):
     def __init__(self):
         identifier = f"D2Go.Runner.{self.__class__.__name__}"
@@ -529,123 +534,124 @@ def _get_trainer_hooks(
         ]

     def do_train(self, cfg, model, resume):
-        # Note that flops at the beginning of training is often inaccurate,
-        # if a model has input-dependent logic
-        attach_profilers(cfg, model)
-
-        if cfg.NUMA_BINDING is True:
-            import numa
-
-            num_gpus_per_node = comm.get_local_size()
-            num_sockets = numa.get_max_node() + 1
-            socket_id = torch.cuda.current_device() // (
-                max(num_gpus_per_node // num_sockets, 1)
-            )
-            node_mask = set([socket_id])
-            numa.bind(node_mask)
-
-        optimizer = self.build_optimizer(cfg, model)
-        scheduler = self.build_lr_scheduler(cfg, optimizer)
-
-        checkpointer = self.build_checkpointer(
-            cfg,
-            model,
-            save_dir=cfg.OUTPUT_DIR,
-            load_ckpt_to_gpu=cfg.LOAD_CKPT_TO_GPU,
-            optimizer=optimizer,
-            scheduler=scheduler,
-        )
-        checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
-        start_iter = (
-            checkpoint.get("iteration", -1)
-            if resume and checkpointer.has_checkpoint()
-            else -1
-        )
-        del checkpoint
-        # The checkpoint stores the training iteration that just finished, thus we start
-        # at the next iteration (or iter zero if there's no checkpoint).
-        start_iter += 1
-        max_iter = cfg.SOLVER.MAX_ITER
-        periodic_checkpointer = PeriodicCheckpointer(
-            checkpointer, cfg.SOLVER.CHECKPOINT_PERIOD, max_iter=max_iter
-        )
-
-        data_loader = self.build_detection_train_loader(cfg)
-
-        def _get_model_with_abnormal_checker(model):
-            if not cfg.ABNORMAL_CHECKER.ENABLED:
-                return model
-
-            tbx_writer = self.get_tbx_writer(cfg)
-            writers = get_writers(cfg, tbx_writer)
-            checker = AbnormalLossChecker(start_iter, writers)
-            ret = AbnormalLossCheckerWrapper(model, checker)
-            return ret
-
-        if cfg.SOLVER.AMP.ENABLED:
-            trainer = AMPTrainer(
-                _get_model_with_abnormal_checker(model),
-                data_loader,
-                optimizer,
-                gather_metric_period=cfg.GATHER_METRIC_PERIOD,
-                zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
-                grad_scaler=get_grad_scaler(cfg),
-                precision=parse_precision_from_string(
-                    cfg.SOLVER.AMP.PRECISION, lightning=False
-                ),
-                log_grad_scaler=cfg.SOLVER.AMP.LOG_GRAD_SCALER,
-                async_write_metrics=cfg.ASYNC_WRITE_METRICS,
-            )
-        else:
-            trainer = SimpleTrainer(
-                _get_model_with_abnormal_checker(model),
-                data_loader,
-                optimizer,
-                gather_metric_period=cfg.GATHER_METRIC_PERIOD,
-                zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
-                async_write_metrics=cfg.ASYNC_WRITE_METRICS,
-            )
-
-        if cfg.SOLVER.AMP.ENABLED and torch.cuda.is_available():
-            # Allow to use the TensorFloat32 (TF32) tensor cores, available on A100 GPUs.
-            # For more details https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere.
-            torch.backends.cuda.matmul.allow_tf32 = True
-            torch.backends.cudnn.allow_tf32 = True
-
-        trainer_hooks = self._get_trainer_hooks(
-            cfg, model, optimizer, scheduler, periodic_checkpointer, trainer
-        )
-
-        if comm.is_main_process():
-            assert (
-                cfg.GATHER_METRIC_PERIOD <= cfg.WRITER_PERIOD
-                and cfg.WRITER_PERIOD % cfg.GATHER_METRIC_PERIOD == 0
-            ), "WRITER_PERIOD needs to be divisible by GATHER_METRIC_PERIOD"
-            tbx_writer = self.get_tbx_writer(cfg)
-            writers = [
-                CommonMetricPrinter(max_iter, window_size=cfg.WRITER_PERIOD),
-                JSONWriter(
-                    os.path.join(cfg.OUTPUT_DIR, "metrics.json"),
-                    window_size=cfg.WRITER_PERIOD,
-                ),
-                tbx_writer,
-            ]
-            trainer_hooks.append(hooks.PeriodicWriter(writers, cfg.WRITER_PERIOD))
-        update_hooks_from_registry(trainer_hooks, cfg)
-        trainer.register_hooks(trainer_hooks)
-        trainer.train(start_iter, max_iter)
-
-        if hasattr(self, "original_cfg"):
-            table = get_cfg_diff_table(cfg, self.original_cfg)
-            logger.info(
-                "GeneralizeRCNN Runner ignoring training config change: \n" + table
-            )
-            trained_cfg = self.original_cfg.clone()
-        else:
-            trained_cfg = cfg.clone()
-        with temp_defrost(trained_cfg):
-            trained_cfg.MODEL.WEIGHTS = checkpointer.get_checkpoint_file()
-        return {"model_final": trained_cfg}
+        with get_monitoring_service():
+            # Note that flops at the beginning of training is often inaccurate,
+            # if a model has input-dependent logic
+            attach_profilers(cfg, model)
+
+            if cfg.NUMA_BINDING is True:
+                import numa
+
+                num_gpus_per_node = comm.get_local_size()
+                num_sockets = numa.get_max_node() + 1
+                socket_id = torch.cuda.current_device() // (
+                    max(num_gpus_per_node // num_sockets, 1)
+                )
+                node_mask = set([socket_id])
+                numa.bind(node_mask)
+
+            optimizer = self.build_optimizer(cfg, model)
+            scheduler = self.build_lr_scheduler(cfg, optimizer)
+
+            checkpointer = self.build_checkpointer(
+                cfg,
+                model,
+                save_dir=cfg.OUTPUT_DIR,
+                load_ckpt_to_gpu=cfg.LOAD_CKPT_TO_GPU,
+                optimizer=optimizer,
+                scheduler=scheduler,
+            )
+            checkpoint = checkpointer.resume_or_load(cfg.MODEL.WEIGHTS, resume=resume)
+            start_iter = (
+                checkpoint.get("iteration", -1)
+                if resume and checkpointer.has_checkpoint()
+                else -1
+            )
+            del checkpoint
+            # The checkpoint stores the training iteration that just finished, thus we start
+            # at the next iteration (or iter zero if there's no checkpoint).
+            start_iter += 1
+            max_iter = cfg.SOLVER.MAX_ITER
+            periodic_checkpointer = PeriodicCheckpointer(
+                checkpointer, cfg.SOLVER.CHECKPOINT_PERIOD, max_iter=max_iter
+            )
+
+            data_loader = self.build_detection_train_loader(cfg)
+
+            def _get_model_with_abnormal_checker(model):
+                if not cfg.ABNORMAL_CHECKER.ENABLED:
+                    return model
+
+                tbx_writer = self.get_tbx_writer(cfg)
+                writers = get_writers(cfg, tbx_writer)
+                checker = AbnormalLossChecker(start_iter, writers)
+                ret = AbnormalLossCheckerWrapper(model, checker)
+                return ret
+
+            if cfg.SOLVER.AMP.ENABLED:
+                trainer = AMPTrainer(
+                    _get_model_with_abnormal_checker(model),
+                    data_loader,
+                    optimizer,
+                    gather_metric_period=cfg.GATHER_METRIC_PERIOD,
+                    zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
+                    grad_scaler=get_grad_scaler(cfg),
+                    precision=parse_precision_from_string(
+                        cfg.SOLVER.AMP.PRECISION, lightning=False
+                    ),
+                    log_grad_scaler=cfg.SOLVER.AMP.LOG_GRAD_SCALER,
+                    async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+                )
+            else:
+                trainer = SimpleTrainer(
+                    _get_model_with_abnormal_checker(model),
+                    data_loader,
+                    optimizer,
+                    gather_metric_period=cfg.GATHER_METRIC_PERIOD,
+                    zero_grad_before_forward=cfg.ZERO_GRAD_BEFORE_FORWARD,
+                    async_write_metrics=cfg.ASYNC_WRITE_METRICS,
+                )
+
+            if cfg.SOLVER.AMP.ENABLED and torch.cuda.is_available():
+                # Allow to use the TensorFloat32 (TF32) tensor cores, available on A100 GPUs.
+                # For more details https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere.
+                torch.backends.cuda.matmul.allow_tf32 = True
+                torch.backends.cudnn.allow_tf32 = True
+
+            trainer_hooks = self._get_trainer_hooks(
+                cfg, model, optimizer, scheduler, periodic_checkpointer, trainer
+            )
+
+            if comm.is_main_process():
+                assert (
+                    cfg.GATHER_METRIC_PERIOD <= cfg.WRITER_PERIOD
+                    and cfg.WRITER_PERIOD % cfg.GATHER_METRIC_PERIOD == 0
+                ), "WRITER_PERIOD needs to be divisible by GATHER_METRIC_PERIOD"
+                tbx_writer = self.get_tbx_writer(cfg)
+                writers = [
+                    CommonMetricPrinter(max_iter, window_size=cfg.WRITER_PERIOD),
+                    JSONWriter(
+                        os.path.join(cfg.OUTPUT_DIR, "metrics.json"),
+                        window_size=cfg.WRITER_PERIOD,
+                    ),
+                    tbx_writer,
+                ]
+                trainer_hooks.append(hooks.PeriodicWriter(writers, cfg.WRITER_PERIOD))
+            update_hooks_from_registry(trainer_hooks, cfg)
+            trainer.register_hooks(trainer_hooks)
+            trainer.train(start_iter, max_iter)
+
+            if hasattr(self, "original_cfg"):
+                table = get_cfg_diff_table(cfg, self.original_cfg)
+                logger.info(
+                    "GeneralizeRCNN Runner ignoring training config change: \n" + table
+                )
+                trained_cfg = self.original_cfg.clone()
+            else:
+                trained_cfg = cfg.clone()
+            with temp_defrost(trained_cfg):
+                trained_cfg.MODEL.WEIGHTS = checkpointer.get_checkpoint_file()
+            return {"model_final": trained_cfg}

     @staticmethod
     def get_evaluator(cfg, dataset_name, output_folder):
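As committed, the open-source stub of get_monitoring_service simply falls through (pass) and therefore returns None, so the with get_monitoring_service(): statement effectively relies on the fb_overwritable() override to supply a real context manager. A no-op fallback for builds without such an override could look like the following sketch (an assumption for illustration, not part of this commit):

import contextlib
from typing import Any


def get_monitoring_service() -> Any:
    # No-op fallback: contextlib.nullcontext() is a context manager that does
    # nothing, so the `with` block in do_train degrades gracefully.
    return contextlib.nullcontext()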
