# Source code for neuralop.layers.local_fno_block

from typing import List, Optional, Union
import warnings

import torch
from torch import nn
import torch.nn.functional as F

from .channel_mlp import ChannelMLP
from .fno_block import SubModule
from .differential_conv import FiniteDifferenceConvolution
from .discrete_continuous_convolution import EquidistantDiscreteContinuousConv2d
from .normalization_layers import AdaIN, InstanceNorm
from .skip_connections import skip_connection
from .spectral_convolution import SpectralConv
from ..utils import validate_scaling_factor


# Scalar numeric type (int or float); referenced by the scaling-factor type
# documented in the LocalFNOBlocks docstring.
Number = Union[int, float]


class LocalFNOBlocks(nn.Module):
    """Sequence of Fourier layers with parallel local-kernel branches.

    LocalFNOBlocks implements a sequence of Fourier layers, the operations of
    which are first described in [1]_. The exact implementation details of the
    Fourier layer architecture are discussed in [2]_. The Fourier layers are
    placed in parallel with differential kernel layers and local integral
    layers, as described in [3]_.

    Parameters
    ----------
    in_channels : int
        input channels to Fourier layers
    out_channels : int
        output channels after Fourier layers
    n_modes : int, List[int]
        number of modes to keep along each dimension in frequency space.
        Can either be specified as an int (for all dimensions) or an iterable
        with one number per dimension
    default_in_shape : Tuple[int]
        Default input shape on spatiotemporal dimensions.
    resolution_scaling_factor : Optional[Union[Number, List[Number]]], optional
        factor by which to scale outputs for super-resolution, by default None
    n_layers : int, optional
        number of Fourier layers to apply in sequence, by default 1
    disco_layers : bool or bool list, optional
        Must be same length as n_layers, dictates whether to include a
        local integral kernel parallel connection at each layer. If a single
        bool, shared for all layers.
    disco_kernel_shape: Union[int, List[int]]
        kernel shape for local integral. Expects either a single integer for
        isotropic kernels or two integers for anisotropic kernels
    domain_length: torch.Tensor, optional
        extent/length of the physical domain. Assumes square domain [-1, 1]^2
        by default
    disco_groups: int, optional
        number of groups in the local integral convolution, by default 1
    disco_bias: bool, optional
        whether to use a bias for the integral kernel, by default True
    radius_cutoff: float, optional
        cutoff radius (with respect to domain_length) for the local integral
        kernel, by default None
    diff_layers : bool or bool list, optional
        Must be same length as n_layers, dictates whether to include a
        differential kernel parallel connection at each layer. If a single
        bool, shared for all layers.
    conv_padding_mode : str in ['periodic', 'circular', 'replicate', 'reflect', 'zeros'], optional
        Padding mode for spatial convolution kernels.
    fin_diff_kernel_size : odd int, optional
        Conv kernel size for finite difference convolution.
    mix_derivatives : bool, optional
        Whether to mix derivatives across channels.
    max_n_modes : int, List[int], optional
        maximum number of modes to keep along each dimension, by default None
    fno_block_precision : str, optional
        floating point precision to use for computations, by default "full"
    use_channel_mlp : bool, optional
        whether to apply a ChannelMLP (with its own skip connection) after
        each layer, by default False
    channel_mlp_dropout : float, optional
        dropout parameter for self.channel_mlp, by default 0
    channel_mlp_expansion : float, optional
        expansion parameter for self.channel_mlp, by default 0.5
    non_linearity : torch.nn.F module, optional
        nonlinear activation function to use between layers, by default F.gelu
    stabilizer : Literal["tanh"], optional
        stabilizing module to use between certain layers, by default None
        if "tanh", use tanh
    norm : Literal["ada_in", "group_norm", "instance_norm"], optional
        Normalization layer to use, by default None
    ada_in_features : int, optional
        number of features for adaptive instance norm above, by default None
    preactivation : bool, optional
        whether to call forward pass with pre-activation, by default False
        if True, call nonlinear activation and norm before Fourier convolution
        if False, call activation and norms after Fourier convolutions
    fno_skip : str, optional
        module to use for FNO skip connections, by default "linear"
        see layers.skip_connections for more details
    channel_mlp_skip : str, optional
        module to use for ChannelMLP skip connections, by default "soft-gating"
        see layers.skip_connections for more details

    Other Parameters
    ----------------
    complex_data : bool, optional
        whether the FNO's data takes on complex values in space, by default
        False. Not a named parameter of this class: if passed, it is
        collected by ``**kwargs`` and not used here.
    separable : bool, optional
        separable parameter for SpectralConv, by default False
    factorization : str, optional
        factorization parameter for SpectralConv, by default None
    rank : float, optional
        rank parameter for SpectralConv, by default 1.0
    conv_module : BaseConv, optional
        module to use for convolutions in FNO block, by default SpectralConv
    joint_factorization : bool, optional
        whether to factorize all spectralConv weights as one tensor, by
        default False. Not a named parameter of this class: if passed, it is
        collected by ``**kwargs`` and not used here.
    fixed_rank_modes : bool, optional
        fixed_rank_modes parameter for SpectralConv, by default False
    implementation : str, optional
        implementation parameter for SpectralConv, by default "factorized"
    decomposition_kwargs : dict, optional
        kwargs for tensor decomposition in SpectralConv, by default dict()
    fft_norm : str, optional
        stored as ``self.fft_norm``, by default "forward". It is not
        forwarded to any submodule by this class.
    **kwargs
        Additional keyword arguments are accepted and ignored.

    Raises
    ------
    NotImplementedError
        If differential layers are requested for more than 3 dimensions, or
        local integral (disco) layers are requested for a dimension other
        than 2.
    ValueError
        If ``norm`` is not None or one of the supported names.

    References
    ----------
    .. [1] Li, Z. et al. "Fourier Neural Operator for Parametric Partial
        Differential Equations" (2021). ICLR 2021,
        https://arxiv.org/pdf/2010.08895.
    .. [2] Kossaifi, J., Kovachki, N., Azizzadenesheli, K., Anandkumar, A.
        "Multi-Grid Tensorized Fourier Neural Operator for High-Resolution
        PDEs" (2024). TMLR 2024, https://openreview.net/pdf?id=AWiDlO63bH.
    .. [3] Liu-Schiaffini M., Berner J., Bonev B., Kurth T.,
        Azizzadenesheli K., Anandkumar A.; "Neural Operators with Localized
        Integral and Differential Kernels" (2024). ICML 2024,
        https://arxiv.org/pdf/2402.16845.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        n_modes,
        default_in_shape,
        resolution_scaling_factor=None,
        n_layers=1,
        disco_layers=True,
        disco_kernel_shape=[2,4],
        radius_cutoff=None,
        domain_length=[2,2],
        disco_groups=1,
        disco_bias=True,
        diff_layers=True,
        conv_padding_mode='periodic',
        fin_diff_kernel_size=3,
        mix_derivatives=True,
        max_n_modes=None,
        fno_block_precision="full",
        use_channel_mlp=False,
        channel_mlp_dropout=0,
        channel_mlp_expansion=0.5,
        non_linearity=F.gelu,
        stabilizer=None,
        norm=None,
        ada_in_features=None,
        preactivation=False,
        fno_skip="linear",
        channel_mlp_skip="soft-gating",
        separable=False,
        factorization=None,
        rank=1.0,
        conv_module=SpectralConv,
        fixed_rank_modes=False,
        implementation="factorized",
        decomposition_kwargs=dict(),
        fft_norm="forward",
        **kwargs,
    ):
        super().__init__()

        # The signature defaults are shared mutable objects. Work on shallow
        # copies so neither this instance nor a submodule can alter the
        # defaults seen by later instances.
        if isinstance(disco_kernel_shape, list):
            disco_kernel_shape = disco_kernel_shape.copy()
        if isinstance(domain_length, list):
            domain_length = domain_length.copy()
        if isinstance(decomposition_kwargs, dict):
            decomposition_kwargs = decomposition_kwargs.copy()

        if isinstance(n_modes, int):
            n_modes = [n_modes]
        self._n_modes = n_modes

        assert len(n_modes) == len(default_in_shape), "Spatiotemporal dimensions must be consistent"

        # If a single bool is passed for disco_layers or diff_layers,
        # set values for all layers
        if isinstance(disco_layers, bool):
            disco_layers = [disco_layers] * n_layers
        if isinstance(diff_layers, bool):
            diff_layers = [diff_layers] * n_layers

        assert len(diff_layers) == n_layers, (
            "diff_layers must either provide a single bool value or a list of "
            f"booleans of length n_layers, got {len(diff_layers)=}"
        )
        assert len(disco_layers) == n_layers, (
            "disco_layers must either provide a single bool value or a list of "
            f"booleans of length n_layers, got {len(disco_layers)=}"
        )

        # These exceptions were previously constructed but never raised, so
        # unsupported configurations slipped through silently.
        if len(n_modes) > 3 and True in diff_layers:
            raise NotImplementedError("Differential convs not implemented for dimensions higher than 3.")

        if len(n_modes) != 2 and True in disco_layers:
            raise NotImplementedError("Local conv layers only implemented for dimension 2.")

        if conv_padding_mode not in ['circular', 'periodic', 'zeros'] and True in disco_layers:
            warnings.warn("Local conv layers only support periodic or zero padding, defaulting to zero padding for local convs.")

        self.n_dim = len(n_modes)

        self.resolution_scaling_factor: Union[
            None, List[List[float]]
        ] = validate_scaling_factor(resolution_scaling_factor, self.n_dim, n_layers)

        self.max_n_modes = max_n_modes
        self.fno_block_precision = fno_block_precision
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.n_layers = n_layers
        self.non_linearity = non_linearity
        self.stabilizer = stabilizer
        self.rank = rank
        self.factorization = factorization
        self.fixed_rank_modes = fixed_rank_modes
        self.decomposition_kwargs = decomposition_kwargs
        self.fno_skip = fno_skip
        self.channel_mlp_skip = channel_mlp_skip
        self.use_channel_mlp = use_channel_mlp
        self.channel_mlp_expansion = channel_mlp_expansion
        self.channel_mlp_dropout = channel_mlp_dropout
        self.fft_norm = fft_norm
        self.implementation = implementation
        self.separable = separable
        self.preactivation = preactivation
        self.ada_in_features = ada_in_features

        self.diff_layers = diff_layers
        self.conv_padding_mode = conv_padding_mode
        self.default_in_shape = default_in_shape
        self.fin_diff_kernel_size = fin_diff_kernel_size
        self.mix_derivatives = mix_derivatives

        self.disco_layers = disco_layers
        self.disco_kernel_shape = disco_kernel_shape
        self.radius_cutoff = radius_cutoff
        self.domain_length = domain_length
        self.disco_groups = disco_groups
        self.disco_bias = disco_bias
        # Any non-periodic padding mode makes the local convs non-periodic.
        self.periodic = (self.conv_padding_mode in ['circular', 'periodic'])

        # One spectral convolution per layer.
        self.convs = nn.ModuleList(
            [
                conv_module(
                    self.in_channels,
                    self.out_channels,
                    self.n_modes,
                    resolution_scaling_factor=None if resolution_scaling_factor is None else self.resolution_scaling_factor[i],
                    max_n_modes=max_n_modes,
                    rank=rank,
                    fixed_rank_modes=fixed_rank_modes,
                    implementation=implementation,
                    separable=separable,
                    factorization=factorization,
                    decomposition_kwargs=decomposition_kwargs,
                )
                for i in range(n_layers)
            ]
        )

        self.fno_skips = nn.ModuleList(
            [
                skip_connection(
                    self.in_channels,
                    self.out_channels,
                    skip_type=fno_skip,
                    n_dim=self.n_dim,
                )
                for _ in range(n_layers)
            ]
        )

        # Only layers whose flag is set get a differential / local module, so
        # these lists can be shorter than n_layers; the index maps below
        # translate a layer index into a position in them.
        self.diff_groups = 1 if mix_derivatives else in_channels
        self.differential = nn.ModuleList(
            [
                FiniteDifferenceConvolution(
                    self.in_channels,
                    self.out_channels,
                    self.n_dim,
                    self.fin_diff_kernel_size,
                    self.diff_groups,
                    self.conv_padding_mode,
                )
                for _ in range(sum(self.diff_layers))
            ]
        )

        self.local_convs = nn.ModuleList(
            [
                EquidistantDiscreteContinuousConv2d(
                    self.in_channels,
                    self.out_channels,
                    in_shape=self.default_in_shape,
                    out_shape=self.default_in_shape,
                    kernel_shape=self.disco_kernel_shape,
                    domain_length=self.domain_length,
                    radius_cutoff=self.radius_cutoff,
                    periodic=self.periodic,
                    groups=self.disco_groups,
                    bias=self.disco_bias,
                )
                for _ in range(sum(self.disco_layers))
            ]
        )

        # Helper for calling differential layers
        self.differential_idx_list = self._build_layer_index_map(self.diff_layers)
        assert max(self.differential_idx_list, default=-1) == sum(self.diff_layers) - 1

        # Helper for calling local conv layers
        self.disco_idx_list = self._build_layer_index_map(self.disco_layers)
        assert max(self.disco_idx_list, default=-1) == sum(self.disco_layers) - 1

        if use_channel_mlp:
            self.mlp = nn.ModuleList(
                [
                    ChannelMLP(
                        in_channels=self.out_channels,
                        hidden_channels=round(self.out_channels * channel_mlp_expansion),
                        dropout=channel_mlp_dropout,
                        n_dim=self.n_dim,
                    )
                    for _ in range(n_layers)
                ]
            )
            self.channel_mlp_skips = nn.ModuleList(
                [
                    skip_connection(
                        self.in_channels,
                        self.out_channels,
                        skip_type=channel_mlp_skip,
                        n_dim=self.n_dim,
                    )
                    for _ in range(n_layers)
                ]
            )
        else:
            self.mlp = None

        # Each block will have 2 norms if we also use an MLP
        self.n_norms = 1 if self.mlp is None else 2
        if norm is None:
            self.norm = None
        elif norm == "instance_norm":
            self.norm = nn.ModuleList(
                [InstanceNorm() for _ in range(n_layers * self.n_norms)]
            )
        elif norm == "group_norm":
            self.norm = nn.ModuleList(
                [
                    nn.GroupNorm(num_groups=1, num_channels=self.out_channels)
                    for _ in range(n_layers * self.n_norms)
                ]
            )
        elif norm == "ada_in":
            self.norm = nn.ModuleList(
                [
                    AdaIN(ada_in_features, out_channels)
                    for _ in range(n_layers * self.n_norms)
                ]
            )
        else:
            raise ValueError(
                f"Got norm={norm} but expected None or one of "
                "[instance_norm, group_norm, ada_in]"
            )

    @staticmethod
    def _build_layer_index_map(layer_flags):
        """Map each layer to its position among the enabled layers, or -1.

        Parameters
        ----------
        layer_flags : iterable of bool
            one flag per layer; truthy means the layer has the optional module

        Returns
        -------
        list of int
            for every layer, the running index among enabled layers, or -1
            when the layer is disabled
        """
        index_map = []
        next_position = 0
        for enabled in layer_flags:
            if enabled:
                index_map.append(next_position)
                next_position += 1
            else:
                index_map.append(-1)
        return index_map

    def _grid_width_scaling_factor(self, x):
        """Return the grid-width scaling passed to the differential convs.

        Computed as the first entry of ``default_in_shape`` divided by the
        size of the last dimension of ``x``.
        """
        # NOTE(review): this compares the *last* dimension of x with the
        # *first* default dimension; presumably fine for square grids, confirm
        # for anisotropic default shapes.
        return 1 / (x.shape[-1] / self.default_in_shape[0])

    def set_ada_in_embeddings(self, *embeddings):
        """Sets the embeddings of each Ada-IN norm layers

        Parameters
        ----------
        embeddings : tensor or list of tensor
            if a single embedding is given, it will be used for each norm layer
            otherwise, each embedding will be used for the corresponding norm layer
        """
        if len(embeddings) == 1:
            for norm in self.norm:
                norm.set_embedding(embeddings[0])
        else:
            for norm, embedding in zip(self.norm, embeddings):
                norm.set_embedding(embedding)

    def forward(self, x, index=0, output_shape=None):
        """Apply layer ``index`` to ``x``.

        Dispatches to the pre- or post-activation implementation depending on
        ``self.preactivation``.

        Parameters
        ----------
        x : tensor
            input to the layer
        index : int, optional
            which layer of the block to apply, by default 0
        output_shape : optional
            forwarded to the layer's convolution and its ``transform``,
            by default None
        """
        if self.preactivation:
            return self.forward_with_preactivation(x, index, output_shape)
        else:
            return self.forward_with_postactivation(x, index, output_shape)

    def forward_with_postactivation(self, x, index=0, output_shape=None):
        """Apply layer ``index`` with norm and activation after the convs."""
        # Skip branches are computed from the raw input, before the stabilizer.
        x_skip_fno = self.fno_skips[index](x)
        x_skip_fno = self.convs[index].transform(x_skip_fno, output_shape=output_shape)

        if self.mlp is not None:
            x_skip_mlp = self.channel_mlp_skips[index](x)
            x_skip_mlp = self.convs[index].transform(x_skip_mlp, output_shape=output_shape)

        if self.stabilizer == "tanh":
            x = torch.tanh(x)

        x_fno = self.convs[index](x, output_shape=output_shape)

        # Optional differential branch (0 when disabled for this layer).
        if self.differential_idx_list[index] != -1:
            grid_width_scaling_factor = self._grid_width_scaling_factor(x)
            x_differential = self.differential[self.differential_idx_list[index]](x, grid_width_scaling_factor)
            x_differential = self.convs[index].transform(x_differential, output_shape=output_shape)
        else:
            x_differential = 0

        # Optional local integral branch (0 when disabled for this layer).
        if self.disco_idx_list[index] != -1:
            x_localconv = self.local_convs[self.disco_idx_list[index]](x)
            x_localconv = self.convs[index].transform(x_localconv, output_shape=output_shape)
        else:
            x_localconv = 0

        x_fno_diff_disco = x_fno + x_differential + x_localconv

        if self.norm is not None:
            x_fno_diff_disco = self.norm[self.n_norms * index](x_fno_diff_disco)

        x = x_fno_diff_disco + x_skip_fno

        # No activation directly after the last layer unless an MLP follows.
        if (self.mlp is not None) or (index < (self.n_layers - 1)):
            x = self.non_linearity(x)

        if self.mlp is not None:
            x = self.mlp[index](x) + x_skip_mlp

            if self.norm is not None:
                x = self.norm[self.n_norms * index + 1](x)

            if index < (self.n_layers - 1):
                x = self.non_linearity(x)

        return x

    def forward_with_preactivation(self, x, index=0, output_shape=None):
        """Apply layer ``index`` with activation and norm before the convs."""
        # Apply non-linear activation (and norm)
        # before this block's convolution/forward pass:
        x = self.non_linearity(x)

        if self.norm is not None:
            x = self.norm[self.n_norms * index](x)

        if self.differential_idx_list[index] != -1:
            # Uses the same scaling as the post-activation path; this branch
            # previously read an attribute that was never assigned.
            grid_width_scaling_factor = self._grid_width_scaling_factor(x)
            x_differential = self.differential[self.differential_idx_list[index]](x, grid_width_scaling_factor)
        else:
            x_differential = 0

        if self.disco_idx_list[index] != -1:
            x_localconv = self.local_convs[self.disco_idx_list[index]](x)
        else:
            x_localconv = 0

        # Here the skip and local branches are summed first and transformed
        # once, rather than branch by branch.
        x_skip_fno = self.fno_skips[index](x)
        x_skip_local_fno = self.convs[index].transform(
            x_skip_fno + x_differential + x_localconv, output_shape=output_shape
        )

        if self.mlp is not None:
            x_skip_mlp = self.channel_mlp_skips[index](x)
            x_skip_mlp = self.convs[index].transform(x_skip_mlp, output_shape=output_shape)

        if self.stabilizer == "tanh":
            x = torch.tanh(x)

        x_fno = self.convs[index](x, output_shape=output_shape)
        x = x_fno + x_skip_local_fno

        if self.mlp is not None:
            if index < (self.n_layers - 1):
                x = self.non_linearity(x)

            if self.norm is not None:
                x = self.norm[self.n_norms * index + 1](x)

            x = self.mlp[index](x) + x_skip_mlp

        return x

    @property
    def n_modes(self):
        """Number of Fourier modes kept along each dimension."""
        return self._n_modes

    @n_modes.setter
    def n_modes(self, n_modes):
        # Propagate to every layer's convolution before recording the value.
        for i in range(self.n_layers):
            self.convs[i].n_modes = n_modes
        self._n_modes = n_modes

    def get_block(self, indices):
        """Returns a sub-FNO Block layer from the jointly parametrized main block

        The parametrization of an FNOBlock layer is shared with the main one.

        Raises
        ------
        ValueError
            If the block only has a single layer.
        """
        if self.n_layers == 1:
            raise ValueError(
                "A single layer is parametrized, directly use the main class."
            )

        return SubModule(self, indices)

    def __getitem__(self, indices):
        """Shorthand for :meth:`get_block`."""
        return self.get_block(indices)