from timeit import default_timer
from pathlib import Path
from typing import Union
import sys
import torch
from torch.cuda import amp
from torch import nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
# Only import and use wandb if it is installed
wandb_available = False
try:
import wandb
wandb_available = True
except ModuleNotFoundError:
wandb_available = False
import neuralop.mpu.comm as comm
from neuralop.losses import LpLoss
from .training_state import load_training_state, save_training_state
class Trainer:
"""
A general Trainer class to train neural operators on given datasets
"""
def __init__(
self,
*,
model: nn.Module,
n_epochs: int,
wandb_log: bool=False,
device: str='cpu',
mixed_precision: bool=False,
data_processor: nn.Module=None,
eval_interval: int=1,
log_output: bool=False,
use_distributed: bool=False,
verbose: bool=False,
):
"""
Parameters
----------
model : nn.Module
n_epochs : int
wandb_log : bool, default is False
whether to log results to wandb
device : torch.device, or str 'cpu' or 'cuda'
mixed_precision : bool, default is False
whether to use torch.autocast to compute mixed precision
data_processor : DataProcessor, default is None
if not None, data from the loaders is first transformed with data_processor.preprocess,
and the model output is then transformed with data_processor.postprocess.
eval_interval : int, default is 1
how frequently to evaluate model and log training stats
log_output : bool, default is False
if True, and if wandb_log is also True, log output images to wandb
use_distributed : bool, default is False
whether to use DDP
verbose : bool, default is False
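Examples
--------
A minimal construction sketch, assuming ``model`` and the CUDA setup exist
elsewhere (the import path is typical for the package but may vary by version):
>>> import torch
>>> from neuralop.training import Trainer
>>> trainer = Trainer(
...     model=model,
...     n_epochs=20,
...     device="cuda" if torch.cuda.is_available() else "cpu",
...     mixed_precision=True,
...     eval_interval=1,
...     verbose=True,
... )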
"""
self.model = model
self.n_epochs = n_epochs
# only log to wandb if a run is active
self.wandb_log = False
if wandb_available:
self.wandb_log = (wandb_log and wandb.run is not None)
self.eval_interval = eval_interval
self.log_output = log_output
self.verbose = verbose
self.use_distributed = use_distributed
self.device = device
# handle autocast device
if isinstance(self.device, torch.device):
self.autocast_device_type = self.device.type
else:
if "cuda" in self.device:
self.autocast_device_type = "cuda"
else:
self.autocast_device_type = "cpu"
self.mixed_precision = mixed_precision
self.data_processor = data_processor
# Track starting epoch for checkpointing/resuming
self.start_epoch = 0
def train(
self,
train_loader,
test_loaders,
optimizer,
scheduler,
regularizer=None,
training_loss=None,
eval_losses=None,
save_every: int=None,
save_best: str=None,
save_dir: Union[str, Path]="./ckpt",
resume_from_dir: Union[str, Path]=None,
):
"""Trains the given model on the given dataset.
If a device is provided, the model and data processor are loaded to device here.
Parameters
-----------
train_loader: torch.utils.data.DataLoader
training dataloader
test_loaders: dict[torch.utils.data.DataLoader]
testing dataloaders
optimizer: torch.optim.Optimizer
optimizer to use during training
scheduler: torch.optim.lr_scheduler
learning rate scheduler to use during training
training_loss: training.losses function
cost function to minimize
eval_losses: dict[Loss]
dict of losses to use in self.eval()
save_every: int, optional, default is None
if provided, interval at which to save checkpoints
save_best: str, optional, default is None
if provided, key of metric f"{loader_name}_{loss_name}"
to monitor and save model with best eval result
Overrides save_every and saves on eval_interval
save_dir: str | Path, default "./ckpt"
directory at which to save training states if
save_every and/or save_best is provided
resume_from_dir: str | Path, default None
if provided, resumes training state (model,
optimizer, regularizer, scheduler) from state saved in
`resume_from_dir`
Returns
-------
epoch_metrics: dict
dictionary of training stats from the final epoch
(train_err, avg_loss, avg_lasso_loss, epoch_train_time)
plus eval metrics keyed f"{loader_name}_{loss_name}"
from the last evaluation across all test_loaders
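Examples
--------
A sketch of a full run with best-model checkpointing; ``model``, ``trainer``,
and the dataloaders are assumed to be built elsewhere, and the monitored key
"test_l2" assumes a loader named "test":
>>> from torch.optim import AdamW
>>> from torch.optim.lr_scheduler import CosineAnnealingLR
>>> from neuralop.losses import LpLoss
>>> optimizer = AdamW(model.parameters(), lr=8e-4, weight_decay=1e-4)
>>> scheduler = CosineAnnealingLR(optimizer, T_max=trainer.n_epochs)
>>> metrics = trainer.train(
...     train_loader=train_loader,
...     test_loaders={"test": test_loader},
...     optimizer=optimizer,
...     scheduler=scheduler,
...     training_loss=LpLoss(d=2),
...     eval_losses={"l2": LpLoss(d=2)},
...     save_best="test_l2",
...     save_dir="./ckpt",
... )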
"""
self.optimizer = optimizer
self.scheduler = scheduler
if regularizer:
self.regularizer = regularizer
else:
self.regularizer = None
if training_loss is None:
training_loss = LpLoss(d=2)
if eval_losses is None: # By default just evaluate on the training loss
eval_losses = dict(l2=training_loss)
# accumulated wandb metrics
self.wandb_epoch_metrics = None
# attributes for checkpointing
self.save_every = save_every
self.save_best = save_best
if resume_from_dir is not None:
self.resume_state_from_dir(resume_from_dir)
# Load model and data_processor to device
self.model = self.model.to(self.device)
if self.use_distributed and dist.is_initialized():
device_id = dist.get_rank()
self.model = DDP(self.model, device_ids=[device_id], output_device=device_id)
if self.data_processor is not None:
self.data_processor = self.data_processor.to(self.device)
# ensure save_best is a metric we collect
if self.save_best is not None:
metrics = []
for name in test_loaders.keys():
for metric in eval_losses.keys():
metrics.append(f"{name}_{metric}")
assert self.save_best in metrics,\
f"Error: expected a metric of the form <loader_name>_<metric>, got {save_best}"
best_metric_value = float('inf')
# either monitor metric or save on interval, exclusive for simplicity
self.save_every = None
if self.verbose:
print(f'Training on {len(train_loader.dataset)} samples')
print(f'Testing on {[len(loader.dataset) for loader in test_loaders.values()]} samples'
f' on resolutions {[name for name in test_loaders]}.')
sys.stdout.flush()
for epoch in range(self.start_epoch, self.n_epochs):
train_err, avg_loss, avg_lasso_loss, epoch_train_time =\
self.train_one_epoch(epoch, train_loader, training_loss)
epoch_metrics = dict(
train_err=train_err,
avg_loss=avg_loss,
avg_lasso_loss=avg_lasso_loss,
epoch_train_time=epoch_train_time
)
if epoch % self.eval_interval == 0:
# evaluate and gather metrics across each loader in test_loaders
eval_metrics = self.evaluate_all(epoch=epoch,
eval_losses=eval_losses,
test_loaders=test_loaders)
epoch_metrics.update(**eval_metrics)
# save checkpoint if conditions are met
if save_best is not None:
if eval_metrics[save_best] < best_metric_value:
best_metric_value = eval_metrics[save_best]
self.checkpoint(save_dir)
# otherwise, save a checkpoint every save_every epochs (only used when save_best is not set)
if self.save_every is not None:
if epoch % self.save_every == 0:
self.checkpoint(save_dir)
return epoch_metrics
def train_one_epoch(self, epoch, train_loader, training_loss):
"""train_one_epoch trains self.model on train_loader
for one epoch and returns training metrics
Parameters
----------
epoch : int
epoch number
train_loader : torch.utils.data.DataLoader
data loader of train examples
training_loss : callable
cost function to minimize over each training batch
Returns
-------
train_err, avg_loss, avg_lasso_loss, epoch_train_time
per-batch average training error, per-sample average loss,
per-sample average lasso loss (None without a regularizer),
and wall-clock time for the epoch
"""
self.on_epoch_start(epoch)
avg_loss = 0
avg_lasso_loss = 0
self.model.train()
if self.data_processor:
self.data_processor.train()
t1 = default_timer()
train_err = 0.0
# track number of training examples in batch
self.n_samples = 0
for idx, sample in enumerate(train_loader):
loss = self.train_one_batch(idx, sample, training_loss)
loss.backward()
self.optimizer.step()
train_err += loss.item()
with torch.no_grad():
avg_loss += loss.item()
if self.regularizer:
avg_lasso_loss += self.regularizer.loss
if isinstance(self.scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
self.scheduler.step(train_err)
else:
self.scheduler.step()
epoch_train_time = default_timer() - t1
train_err /= len(train_loader)
avg_loss /= self.n_samples
if self.regularizer:
avg_lasso_loss /= self.n_samples
else:
avg_lasso_loss = None
lr = None
for pg in self.optimizer.param_groups:
lr = pg["lr"]
if self.verbose and epoch % self.eval_interval == 0:
self.log_training(
epoch=epoch,
time=epoch_train_time,
avg_loss=avg_loss,
train_err=train_err,
avg_lasso_loss=avg_lasso_loss,
lr=lr
)
return train_err, avg_loss, avg_lasso_loss, epoch_train_time
def evaluate_all(self, epoch, eval_losses, test_loaders):
# evaluate and gather metrics across each loader in test_loaders
all_metrics = {}
for loader_name, loader in test_loaders.items():
loader_metrics = self.evaluate(eval_losses, loader,
log_prefix=loader_name)
all_metrics.update(**loader_metrics)
if self.verbose:
self.log_eval(epoch=epoch,
eval_metrics=all_metrics)
return all_metrics
def evaluate(self, loss_dict, data_loader, log_prefix="", epoch=None):
"""Evaluates the model on a dictionary of losses
Parameters
----------
loss_dict : dict of functions
each function takes as input a tuple (prediction, ground_truth)
and returns the corresponding loss
data_loader : data_loader to evaluate on
log_prefix : str, default is ''
if not '', used as prefix in output dictionary
epoch : int | None
current epoch. Used when logging both train and eval
default None
Returns
-------
errors : dict
dict[f'{log_prefix}_{loss_name}'] = loss for loss in loss_dict
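Examples
--------
A standalone evaluation sketch, assuming a trained ``trainer`` and a
``test_loader`` built elsewhere; returned keys follow
f"{log_prefix}_{loss_name}":
>>> from neuralop.losses import LpLoss
>>> errors = trainer.evaluate(
...     loss_dict={"l2": LpLoss(d=2)},
...     data_loader=test_loader,
...     log_prefix="test",
... )
>>> errors["test_l2"]  # average l2 loss per sample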
"""
# Ensure model and data processor are loaded to the proper device
self.model = self.model.to(self.device)
if self.data_processor is not None and self.data_processor.device != self.device:
self.data_processor = self.data_processor.to(self.device)
self.model.eval()
if self.data_processor:
self.data_processor.eval()
errors = {f"{log_prefix}_{loss_name}": 0 for loss_name in loss_dict.keys()}
self.n_samples = 0
with torch.no_grad():
for idx, sample in enumerate(data_loader):
return_output = False
if idx == len(data_loader) - 1:
return_output = True
eval_step_losses, outs = self.eval_one_batch(sample, loss_dict, return_output=return_output)
for loss_name, val_loss in eval_step_losses.items():
errors[f"{log_prefix}_{loss_name}"] += val_loss
for key in errors.keys():
errors[key] /= self.n_samples
# on last batch, log model outputs
if self.log_output:
errors[f"{log_prefix}_outputs"] = wandb.Image(outs)
return errors
def on_epoch_start(self, epoch):
"""on_epoch_start runs at the beginning
of each training epoch. This method is a stub
that can be overwritten in more complex cases.
Parameters
----------
epoch : int
index of epoch
Returns
-------
None
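Examples
--------
A sketch of overriding this hook in a subclass, e.g. to log or adjust state
at the start of each epoch (``log_lr`` here is a hypothetical helper, not
part of this module):
>>> class LoggingTrainer(Trainer):
...     def on_epoch_start(self, epoch):
...         super().on_epoch_start(epoch)
...         log_lr(self.optimizer, epoch)  # hypothetical logging helper
...         return None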
"""
self.epoch = epoch
return None
def train_one_batch(self, idx, sample, training_loss):
"""Run one batch of input through model
and return training loss on outputs
Parameters
----------
idx : int
index of batch within train_loader
sample : dict
data dictionary holding one batch
training_loss : callable
cost function to minimize over the batch
Returns
-------
loss: torch.Tensor
training loss for the batch, including the
regularizer loss if a regularizer is set
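Examples
--------
The sample dict is unpacked directly into the model and the loss, so its keys
must match their keyword arguments; a typical (assumed) layout uses "x" for
inputs and "y" for targets:
>>> sample = {"x": input_batch, "y": target_batch}  # key names assumed
>>> out = self.model(**sample)
>>> loss = training_loss(out, **sample)  # compares out against sample["y"]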
"""
self.optimizer.zero_grad(set_to_none=True)
if self.regularizer:
self.regularizer.reset()
if self.data_processor is not None:
sample = self.data_processor.preprocess(sample)
else:
# load data to device if no preprocessor exists
sample = {
k: v.to(self.device)
for k, v in sample.items()
if torch.is_tensor(v)
}
self.n_samples += sample["y"].shape[0]
if self.mixed_precision:
with torch.autocast(device_type=self.autocast_device_type):
out = self.model(**sample)
else:
out = self.model(**sample)
if self.epoch == 0 and idx == 0 and self.verbose:
print(f"Raw outputs of shape {out.shape}")
if self.data_processor is not None:
out, sample = self.data_processor.postprocess(out, sample)
loss = 0.0
if self.mixed_precision:
with torch.autocast(device_type=self.autocast_device_type):
loss += training_loss(out, **sample)
else:
loss += training_loss(out, **sample)
if self.regularizer:
loss += self.regularizer.loss
return loss
def eval_one_batch(self,
sample: dict,
eval_losses: dict,
return_output: bool=False):
"""eval_one_batch runs inference on one batch
and returns eval_losses for that batch.
Parameters
----------
sample : dict
data batch dictionary
eval_losses : dict
dictionary of named eval metrics
return_output : bool
whether to return model outputs for plotting
by default False
Returns
-------
eval_step_losses : dict
keyed "loss_name": step_loss_value for each loss name
outputs: torch.Tensor | None
optionally returns batch outputs
"""
if self.data_processor is not None:
sample = self.data_processor.preprocess(sample)
else:
# load data to device if no preprocessor exists
sample = {
k: v.to(self.device)
for k, v in sample.items()
if torch.is_tensor(v)
}
self.n_samples += sample["y"].size(0)
out = self.model(**sample)
if self.data_processor is not None:
out, sample = self.data_processor.postprocess(out, sample)
eval_step_losses = {}
for loss_name, loss in eval_losses.items():
val_loss = loss(out, **sample)
eval_step_losses[loss_name] = val_loss
if return_output:
return eval_step_losses, out
else:
return eval_step_losses, None
def log_training(self,
epoch:int,
time: float,
avg_loss: float,
train_err: float,
avg_lasso_loss: float=None,
lr: float=None
):
"""Basic method to log results
from a single training epoch.
Parameters
----------
epoch: int
time: float
training time of epoch
avg_loss: float
average training loss per individual sample
train_err: float
average training loss per batch over the epoch
avg_lasso_loss: float
average lasso loss from regularizer, optional
lr: float
learning rate at current epoch
"""
# accumulate info to log to wandb
if self.wandb_log:
values_to_log = dict(
train_err=train_err,
time=time,
avg_loss=avg_loss,
avg_lasso_loss=avg_lasso_loss,
lr=lr)
msg = f"[{epoch}] time={time:.2f}, "
msg += f"avg_loss={avg_loss:.4f}, "
msg += f"train_err={train_err:.4f}"
if avg_lasso_loss is not None:
msg += f", avg_lasso={avg_lasso_loss:.4f}"
print(msg)
sys.stdout.flush()
if self.wandb_log:
wandb.log(data=values_to_log,
step=epoch+1,
commit=False)
def log_eval(self,
epoch: int,
eval_metrics: dict):
"""log_eval logs outputs from evaluation
on all test loaders to stdout and wandb
Parameters
----------
epoch : int
current training epoch
eval_metrics : dict
metrics collected during evaluation
keyed f"{test_loader_name}_{metric}" for each test_loader
"""
values_to_log = {}
msg = ""
for metric, value in eval_metrics.items():
if isinstance(value, float) or isinstance(value, torch.Tensor):
msg += f"{metric}={value:.4f}, "
if self.wandb_log:
values_to_log[metric] = value
msg = f"Eval: " + msg[:-2] # cut off last comma+space
print(msg)
sys.stdout.flush()
if self.wandb_log:
wandb.log(data=values_to_log,
step=epoch+1,
commit=True)
def resume_state_from_dir(self, save_dir):
"""
Resume training from save_dir created by `neuralop.training.save_training_state`
Parameters
----------
save_dir: Union[str, Path]
directory in which training state is saved
(see neuralop.training.training_state)
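Examples
--------
Resuming is normally driven through ``Trainer.train``; a sketch assuming a
checkpoint was previously written to "./ckpt" by this Trainer:
>>> trainer.train(
...     train_loader=train_loader,
...     test_loaders=test_loaders,
...     optimizer=optimizer,
...     scheduler=scheduler,
...     save_every=10,
...     save_dir="./ckpt",
...     resume_from_dir="./ckpt",
... )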
"""
if isinstance(save_dir, str):
save_dir = Path(save_dir)
print(f"{save_dir=} {type(save_dir)}")
# check for save model exists
if (save_dir / "best_model_state_dict.pt").exists():
save_name = "best_model"
elif (save_dir / "model_state_dict.pt").exists():
save_name = "model"
else:
raise FileNotFoundError("Error: resume_from_dir expects a model\
state dict named model.pt or best_model.pt.")
# returns model, loads other modules if provided
self.model, self.optimizer, self.scheduler, self.regularizer, resume_epoch =\
load_training_state(save_dir=save_dir, save_name=save_name,
model=self.model,
optimizer=self.optimizer,
regularizer=self.regularizer,
scheduler=self.scheduler)
if resume_epoch is not None:
if resume_epoch > self.start_epoch:
self.start_epoch = resume_epoch
if self.verbose:
print(f"Trainer resuming from epoch {resume_epoch}")
def checkpoint(self, save_dir):
"""checkpoint saves current training state
to a directory for resuming later. Only saves
training state on the first GPU.
See neuralop.training.training_state
Parameters
----------
save_dir : str | Path
directory in which to save training state
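Examples
--------
A sketch of checkpointing during or after a run; with ``save_best`` unset the
state is saved under the "model" prefix (e.g. "model_state_dict.pt"), which
is what ``resume_state_from_dir`` looks for:
>>> trainer.checkpoint("./ckpt")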
"""
if comm.get_local_rank() == 0:
if self.save_best is not None:
save_name = 'best_model'
else:
save_name = "model"
save_training_state(save_dir=save_dir,
save_name=save_name,
model=self.model,
optimizer=self.optimizer,
scheduler=self.scheduler,
regularizer=self.regularizer,
epoch=self.epoch
)
if self.verbose:
print(f"[Rank 0]: saved training state to {save_dir}")