Source code for mmaction.models.backbones.uniformerv2
# Copyright (c) OpenMMLab. All rights reserved.
import os
from collections import OrderedDict
from typing import Dict, List, Optional, Union
import torch
from mmcv.cnn.bricks import DropPath
from mmengine.logging import MMLogger
from mmengine.model import BaseModule, ModuleList
from mmengine.runner.checkpoint import _load_checkpoint
from torch import nn
from mmaction.registry import MODELS
logger = MMLogger.get_current_instance()
MODEL_PATH = 'https://download.openmmlab.com/mmaction/v1.0/recognition'
_MODELS = {
'ViT-B/16':
os.path.join(MODEL_PATH, 'uniformerv2/clipVisualEncoder',
'vit-base-p16-res224_clip-rgb_20221219-b8a5da86.pth'),
'ViT-L/14':
os.path.join(MODEL_PATH, 'uniformerv2/clipVisualEncoder',
'vit-large-p14-res224_clip-rgb_20221219-9de7543e.pth'),
'ViT-L/14_336':
os.path.join(MODEL_PATH, 'uniformerv2/clipVisualEncoder',
'vit-large-p14-res336_clip-rgb_20221219-d370f9e5.pth'),
}
class QuickGELU(BaseModule):
"""Quick GELU function. Forked from https://github.com/openai/CLIP/blob/d50
d76daa670286dd6cacf3bcd80b5e4823fc8e1/clip/model.py.
Args:
x (torch.Tensor): The input features of shape :math:`(B, N, C)`.
"""
def forward(self, x: torch.Tensor) -> torch.Tensor:
return x * torch.sigmoid(1.702 * x)
class Local_MHRA(BaseModule):
"""Local MHRA.
Args:
d_model (int): Number of input channels.
dw_reduction (float): Downsample ratio of input channels.
Defaults to 1.5.
pos_kernel_size (int): Kernel size of local MHRA.
Defaults to 3.
init_cfg (dict, optional): The config of weight initialization.
Defaults to None.
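    Example (illustrative sketch; the 5-D ``(N, C, T, H, W)`` layout matches
    the call site in ``ResidualAttentionBlock.forward``)::

        import torch
        from mmaction.models.backbones.uniformerv2 import Local_MHRA

        lmhra = Local_MHRA(d_model=768)
        feats = torch.randn(2, 768, 8, 14, 14)  # (N, C, T, H, W)
        out = feats + lmhra(feats)              # residual branch, same shape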
"""
def __init__(
self,
d_model: int,
dw_reduction: float = 1.5,
pos_kernel_size: int = 3,
init_cfg: Optional[dict] = None,
) -> None:
super().__init__(init_cfg=init_cfg)
padding = pos_kernel_size // 2
re_d_model = int(d_model // dw_reduction)
self.pos_embed = nn.Sequential(
nn.BatchNorm3d(d_model),
nn.Conv3d(d_model, re_d_model, kernel_size=1, stride=1, padding=0),
nn.Conv3d(
re_d_model,
re_d_model,
kernel_size=(pos_kernel_size, 1, 1),
stride=(1, 1, 1),
padding=(padding, 0, 0),
groups=re_d_model),
nn.Conv3d(re_d_model, d_model, kernel_size=1, stride=1, padding=0),
)
# init zero
logger.info('Init zero for Conv in pos_emb')
nn.init.constant_(self.pos_embed[3].weight, 0)
nn.init.constant_(self.pos_embed[3].bias, 0)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.pos_embed(x)
class ResidualAttentionBlock(BaseModule):
"""Local UniBlock.
Args:
d_model (int): Number of input channels.
n_head (int): Number of attention head.
drop_path (float): Stochastic depth rate.
Defaults to 0.0.
dw_reduction (float): Downsample ratio of input channels.
Defaults to 1.5.
        no_lmhra (bool): Whether to remove the local MHRA.
            Defaults to False.
        double_lmhra (bool): Whether to use double local MHRA.
            Defaults to True.
init_cfg (dict, optional): The config of weight initialization.
Defaults to None.
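    Example (illustrative sketch; the token layout ``(1 + H*W, N*T, C)`` is
    assumed from ``forward``)::

        import torch
        from mmaction.models.backbones.uniformerv2 import ResidualAttentionBlock

        block = ResidualAttentionBlock(d_model=768, n_head=12)
        tokens = torch.randn(1 + 14 * 14, 2 * 8, 768)  # (1+HW, NT, C)
        out = block(tokens, T=8)                       # same shape as input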
"""
def __init__(
self,
d_model: int,
n_head: int,
drop_path: float = 0.0,
dw_reduction: float = 1.5,
no_lmhra: bool = False,
double_lmhra: bool = True,
init_cfg: Optional[dict] = None,
) -> None:
super().__init__(init_cfg=init_cfg)
self.n_head = n_head
self.drop_path = DropPath(
drop_path) if drop_path > 0. else nn.Identity()
logger.info(f'Drop path rate: {drop_path}')
self.no_lmhra = no_lmhra
self.double_lmhra = double_lmhra
logger.info(f'No L_MHRA: {no_lmhra}')
logger.info(f'Double L_MHRA: {double_lmhra}')
if not no_lmhra:
self.lmhra1 = Local_MHRA(d_model, dw_reduction=dw_reduction)
if double_lmhra:
self.lmhra2 = Local_MHRA(d_model, dw_reduction=dw_reduction)
# spatial
self.attn = nn.MultiheadAttention(d_model, n_head)
self.ln_1 = nn.LayerNorm(d_model)
self.mlp = nn.Sequential(
OrderedDict([('c_fc', nn.Linear(d_model, d_model * 4)),
('gelu', QuickGELU()),
('c_proj', nn.Linear(d_model * 4, d_model))]))
self.ln_2 = nn.LayerNorm(d_model)
def attention(self, x: torch.Tensor) -> torch.Tensor:
return self.attn(x, x, x, need_weights=False, attn_mask=None)[0]
def forward(self, x: torch.Tensor, T: int = 8) -> torch.Tensor:
# x: 1+HW, NT, C
if not self.no_lmhra:
# Local MHRA
tmp_x = x[1:, :, :]
L, NT, C = tmp_x.shape
N = NT // T
H = W = int(L**0.5)
tmp_x = tmp_x.view(H, W, N, T, C).permute(2, 4, 3, 0,
1).contiguous()
tmp_x = tmp_x + self.drop_path(self.lmhra1(tmp_x))
tmp_x = tmp_x.view(N, C, T,
L).permute(3, 0, 2,
1).contiguous().view(L, NT, C)
x = torch.cat([x[:1, :, :], tmp_x], dim=0)
# MHSA
x = x + self.drop_path(self.attention(self.ln_1(x)))
# Local MHRA
if not self.no_lmhra and self.double_lmhra:
tmp_x = x[1:, :, :]
tmp_x = tmp_x.view(H, W, N, T, C).permute(2, 4, 3, 0,
1).contiguous()
tmp_x = tmp_x + self.drop_path(self.lmhra2(tmp_x))
tmp_x = tmp_x.view(N, C, T,
L).permute(3, 0, 2,
1).contiguous().view(L, NT, C)
x = torch.cat([x[:1, :, :], tmp_x], dim=0)
# FFN
x = x + self.drop_path(self.mlp(self.ln_2(x)))
return x
class Extractor(BaseModule):
"""Global UniBlock.
Args:
d_model (int): Number of input channels.
n_head (int): Number of attention head.
mlp_factor (float): Ratio of hidden dimensions in MLP layers.
Defaults to 4.0.
        dropout (float): Dropout rate in the MLP layer.
            Defaults to 0.0.
drop_path (float): Stochastic depth rate.
Defaults to 0.0.
init_cfg (dict, optional): The config of weight initialization.
Defaults to None.
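    Example (illustrative sketch; ``x`` is the class-token stream used as the
    query and ``y`` holds the flattened video tokens, following
    ``Transformer.forward``)::

        import torch
        from mmaction.models.backbones.uniformerv2 import Extractor

        extractor = Extractor(d_model=768, n_head=12)
        cls_token = torch.zeros(1, 2, 768)           # (Tx, N, C)
        video_tokens = torch.randn(8 * 197, 2, 768)  # (Ty, N, C)
        out = extractor(cls_token, video_tokens)     # (1, 2, 768)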
"""
def __init__(
self,
d_model: int,
n_head: int,
mlp_factor: float = 4.0,
dropout: float = 0.0,
drop_path: float = 0.0,
init_cfg: Optional[dict] = None,
) -> None:
super().__init__(init_cfg=init_cfg)
self.drop_path = DropPath(
drop_path) if drop_path > 0. else nn.Identity()
logger.info(f'Drop path rate: {drop_path}')
self.attn = nn.MultiheadAttention(d_model, n_head)
self.ln_1 = nn.LayerNorm(d_model)
d_mlp = round(mlp_factor * d_model)
self.mlp = nn.Sequential(
OrderedDict([('c_fc', nn.Linear(d_model, d_mlp)),
('gelu', QuickGELU()),
('dropout', nn.Dropout(dropout)),
('c_proj', nn.Linear(d_mlp, d_model))]))
self.ln_2 = nn.LayerNorm(d_model)
self.ln_3 = nn.LayerNorm(d_model)
# zero init
nn.init.xavier_uniform_(self.attn.in_proj_weight)
nn.init.constant_(self.attn.out_proj.weight, 0.)
nn.init.constant_(self.attn.out_proj.bias, 0.)
nn.init.xavier_uniform_(self.mlp[0].weight)
nn.init.constant_(self.mlp[-1].weight, 0.)
nn.init.constant_(self.mlp[-1].bias, 0.)
def attention(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
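        # Cross-attention computed by hand: queries come from the class-token
        # stream ``x`` and keys/values from the video tokens ``y``, reusing
        # the packed ``in_proj_weight`` of nn.MultiheadAttention, whose rows
        # are stacked as [query | key | value] along dim 0.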
d_model = self.ln_1.weight.size(0)
q = (x @ self.attn.in_proj_weight[:d_model].T
) + self.attn.in_proj_bias[:d_model]
k = (y @ self.attn.in_proj_weight[d_model:-d_model].T
) + self.attn.in_proj_bias[d_model:-d_model]
v = (y @ self.attn.in_proj_weight[-d_model:].T
) + self.attn.in_proj_bias[-d_model:]
Tx, Ty, N = q.size(0), k.size(0), q.size(1)
q = q.view(Tx, N, self.attn.num_heads,
self.attn.head_dim).permute(1, 2, 0, 3)
k = k.view(Ty, N, self.attn.num_heads,
self.attn.head_dim).permute(1, 2, 0, 3)
v = v.view(Ty, N, self.attn.num_heads,
self.attn.head_dim).permute(1, 2, 0, 3)
aff = (q @ k.transpose(-2, -1) / (self.attn.head_dim**0.5))
aff = aff.softmax(dim=-1)
out = aff @ v
out = out.permute(2, 0, 1, 3).flatten(2)
out = self.attn.out_proj(out)
return out
def forward(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
x = x + self.drop_path(self.attention(self.ln_1(x), self.ln_3(y)))
x = x + self.drop_path(self.mlp(self.ln_2(x)))
return x
class Transformer(BaseModule):
"""Backbone:
Args:
width (int): Number of input channels in local UniBlock.
layers (int): Number of layers of local UniBlock.
heads (int): Number of attention head in local UniBlock.
backbone_drop_path_rate (float): Stochastic depth rate
in local UniBlock. Defaults to 0.0.
t_size (int): Number of temporal dimension after patch embedding.
Defaults to 8.
dw_reduction (float): Downsample ratio of input channels in local MHRA.
Defaults to 1.5.
        no_lmhra (bool): Whether to remove the local MHRA in the local
            UniBlock. Defaults to True.
        double_lmhra (bool): Whether to use double local MHRA
            in the local UniBlock. Defaults to False.
return_list (List[int]): Layer index of input features
for global UniBlock. Defaults to [8, 9, 10, 11].
        n_layers (int): Number of layers of global UniBlock.
            Defaults to 4.
        n_dim (int): Number of input channels in global UniBlock.
            Defaults to 768.
n_head (int): Number of attention head in global UniBlock.
Defaults to 12.
mlp_factor (float): Ratio of hidden dimensions in MLP layers
in global UniBlock. Defaults to 4.0.
drop_path_rate (float): Stochastic depth rate in global UniBlock.
Defaults to 0.0.
mlp_dropout (List[float]): Stochastic dropout rate in each MLP layer
in global UniBlock. Defaults to [0.5, 0.5, 0.5, 0.5].
init_cfg (dict, optional): The config of weight initialization.
Defaults to None.
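    Example (illustrative sketch; the input layout ``(1 + H*W, N*T, C)``
    follows ``UniFormerV2.forward``, and the output is a pooled ``(N, C)``
    feature)::

        import torch
        from mmaction.models.backbones.uniformerv2 import Transformer

        model = Transformer(width=768, layers=12, heads=12, t_size=8)
        tokens = torch.randn(1 + 14 * 14, 2 * 8, 768)  # (1+HW, NT, C)
        out = model(tokens)                            # (2, 768)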
"""
def __init__(
self,
width: int,
layers: int,
heads: int,
backbone_drop_path_rate: float = 0.,
t_size: int = 8,
dw_reduction: float = 1.5,
no_lmhra: bool = True,
double_lmhra: bool = False,
return_list: List[int] = [8, 9, 10, 11],
n_layers: int = 4,
n_dim: int = 768,
n_head: int = 12,
mlp_factor: float = 4.0,
drop_path_rate: float = 0.,
mlp_dropout: List[float] = [0.5, 0.5, 0.5, 0.5],
init_cfg: Optional[dict] = None,
) -> None:
super().__init__(init_cfg=init_cfg)
self.T = t_size
self.return_list = return_list
# backbone
b_dpr = [
x.item()
for x in torch.linspace(0, backbone_drop_path_rate, layers)
]
self.resblocks = ModuleList([
ResidualAttentionBlock(
width,
heads,
drop_path=b_dpr[i],
dw_reduction=dw_reduction,
no_lmhra=no_lmhra,
double_lmhra=double_lmhra,
) for i in range(layers)
])
# global block
assert n_layers == len(return_list)
self.temporal_cls_token = nn.Parameter(torch.zeros(1, 1, n_dim))
self.dpe = ModuleList([
nn.Conv3d(
n_dim,
n_dim,
kernel_size=3,
stride=1,
padding=1,
bias=True,
groups=n_dim) for _ in range(n_layers)
])
for m in self.dpe:
nn.init.constant_(m.bias, 0.)
dpr = [x.item() for x in torch.linspace(0, drop_path_rate, n_layers)]
self.dec = ModuleList([
Extractor(
n_dim,
n_head,
mlp_factor=mlp_factor,
dropout=mlp_dropout[i],
drop_path=dpr[i],
) for i in range(n_layers)
])
# weight sum
self.norm = nn.LayerNorm(n_dim)
self.balance = nn.Parameter(torch.zeros((n_dim)))
self.sigmoid = nn.Sigmoid()
def forward(self, x: torch.Tensor) -> torch.Tensor:
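        # x: (1 + H*W, N*T, C); index 0 holds the per-frame class token.
        # The local blocks run on all tokens, while the layers listed in
        # ``return_list`` additionally feed a learnable temporal class token
        # through the global Extractor blocks.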
T_down = self.T
L, NT, C = x.shape
N = NT // T_down
H = W = int((L - 1)**0.5)
cls_token = self.temporal_cls_token.repeat(1, N, 1)
j = -1
for i, resblock in enumerate(self.resblocks):
x = resblock(x, T_down)
if i in self.return_list:
j += 1
tmp_x = x.clone()
tmp_x = tmp_x.view(L, N, T_down, C)
# dpe
_, tmp_feats = tmp_x[:1], tmp_x[1:]
tmp_feats = tmp_feats.permute(1, 3, 2,
0).reshape(N, C, T_down, H, W)
tmp_feats = self.dpe[j](tmp_feats.clone()).view(
N, C, T_down, L - 1).permute(3, 0, 2, 1).contiguous()
tmp_x[1:] = tmp_x[1:] + tmp_feats
# global block
tmp_x = tmp_x.permute(2, 0, 1, 3).flatten(0, 1) # T * L, N, C
cls_token = self.dec[j](cls_token, tmp_x)
weight = self.sigmoid(self.balance)
        # average the per-frame cls tokens over T: (N, C)
        residual = x.view(L, N, T_down, C)[0].mean(1)
out = self.norm((1 - weight) * cls_token[0, :, :] + weight * residual)
return out
@MODELS.register_module()
class UniFormerV2(BaseModule):
"""UniFormerV2:
    A PyTorch implementation of: `UniFormerV2: Spatiotemporal
Learning by Arming Image ViTs with Video UniFormer
<https://arxiv.org/abs/2211.09552>`
Args:
        input_resolution (int): Spatial resolution of the input.
            Defaults to 224.
        patch_size (int): Spatial size of each patch.
            Defaults to 16.
width (int): Number of input channels in local UniBlock.
Defaults to 768.
layers (int): Number of layers of local UniBlock.
Defaults to 12.
heads (int): Number of attention head in local UniBlock.
Defaults to 12.
backbone_drop_path_rate (float): Stochastic depth rate
in local UniBlock. Defaults to 0.0.
t_size (int): Number of temporal dimension after patch embedding.
Defaults to 8.
        kernel_size (int): Temporal kernel size of the patch embedding,
            used when ``temporal_downsample`` is enabled. Defaults to 3.
        temporal_downsample (bool): Whether to downsample the temporal
            dimension. Defaults to False.
dw_reduction (float): Downsample ratio of input channels in local MHRA.
Defaults to 1.5.
        no_lmhra (bool): Whether to remove the local MHRA in the local
            UniBlock. Defaults to True.
        double_lmhra (bool): Whether to use double local MHRA in the local
            UniBlock. Defaults to False.
return_list (List[int]): Layer index of input features
for global UniBlock. Defaults to [8, 9, 10, 11].
        n_layers (int): Number of layers of global UniBlock.
            Defaults to 4.
        n_dim (int): Number of input channels in global UniBlock.
            Defaults to 768.
n_head (int): Number of attention head in global UniBlock.
Defaults to 12.
mlp_factor (float): Ratio of hidden dimensions in MLP layers
in global UniBlock. Defaults to 4.0.
drop_path_rate (float): Stochastic depth rate in global UniBlock.
Defaults to 0.0.
mlp_dropout (List[float]): Stochastic dropout rate in each MLP layer
in global UniBlock. Defaults to [0.5, 0.5, 0.5, 0.5].
clip_pretrained (bool): Whether to load pretrained CLIP visual encoder.
Defaults to True.
        pretrained (str, optional): Name of the pretrained CLIP model (a key
            of ``_MODELS``) when ``clip_pretrained`` is True, otherwise a
            checkpoint path. Defaults to None.
init_cfg (dict or list[dict]): Initialization config dict. Defaults to
``[
dict(type='TruncNormal', layer='Linear', std=0.02, bias=0.),
dict(type='Constant', layer='LayerNorm', val=1., bias=0.)
]``.
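    Example (illustrative sketch; built without loading CLIP weights)::

        import torch
        from mmaction.models.backbones.uniformerv2 import UniFormerV2

        model = UniFormerV2(clip_pretrained=False)
        video = torch.randn(2, 3, 8, 224, 224)  # (N, C, T, H, W)
        feats = model(video)                    # (2, 768) pooled features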
"""
def __init__(
self,
# backbone
input_resolution: int = 224,
patch_size: int = 16,
width: int = 768,
layers: int = 12,
heads: int = 12,
backbone_drop_path_rate: float = 0.,
t_size: int = 8,
kernel_size: int = 3,
dw_reduction: float = 1.5,
temporal_downsample: bool = False,
no_lmhra: bool = True,
double_lmhra: bool = False,
# global block
return_list: List[int] = [8, 9, 10, 11],
n_layers: int = 4,
n_dim: int = 768,
n_head: int = 12,
mlp_factor: float = 4.0,
drop_path_rate: float = 0.,
mlp_dropout: List[float] = [0.5, 0.5, 0.5, 0.5],
# pretrain
clip_pretrained: bool = True,
pretrained: Optional[str] = None,
init_cfg: Optional[Union[Dict, List[Dict]]] = [
dict(type='TruncNormal', layer='Linear', std=0.02, bias=0.),
dict(type='Constant', layer='LayerNorm', val=1., bias=0.)
]
) -> None:
super().__init__(init_cfg=init_cfg)
self.pretrained = pretrained
self.clip_pretrained = clip_pretrained
self.input_resolution = input_resolution
padding = (kernel_size - 1) // 2
if temporal_downsample:
self.conv1 = nn.Conv3d(
3,
width, (kernel_size, patch_size, patch_size),
(2, patch_size, patch_size), (padding, 0, 0),
bias=False)
t_size = t_size // 2
else:
self.conv1 = nn.Conv3d(
3,
width, (1, patch_size, patch_size),
(1, patch_size, patch_size), (0, 0, 0),
bias=False)
scale = width**-0.5
self.class_embedding = nn.Parameter(scale * torch.randn(width))
self.positional_embedding = nn.Parameter(scale * torch.randn(
(input_resolution // patch_size)**2 + 1, width))
self.ln_pre = nn.LayerNorm(width)
self.transformer = Transformer(
width,
layers,
heads,
dw_reduction=dw_reduction,
backbone_drop_path_rate=backbone_drop_path_rate,
t_size=t_size,
no_lmhra=no_lmhra,
double_lmhra=double_lmhra,
return_list=return_list,
n_layers=n_layers,
n_dim=n_dim,
n_head=n_head,
mlp_factor=mlp_factor,
drop_path_rate=drop_path_rate,
mlp_dropout=mlp_dropout,
)
def _inflate_weight(self,
weight_2d: torch.Tensor,
time_dim: int,
center: bool = True) -> torch.Tensor:
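        """Inflate a 2D conv weight of shape ``(C_out, C_in, H, W)`` into a
        3D weight of shape ``(C_out, C_in, time_dim, H, W)``.

        With ``center=True`` the 2D kernel is placed at the middle temporal
        index and the remaining slices are zero; otherwise it is repeated
        along time and divided by ``time_dim`` so the temporal sum of the
        kernel is preserved.
        """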
logger.info(f'Init center: {center}')
if center:
weight_3d = torch.zeros(*weight_2d.shape)
weight_3d = weight_3d.unsqueeze(2).repeat(1, 1, time_dim, 1, 1)
middle_idx = time_dim // 2
weight_3d[:, :, middle_idx, :, :] = weight_2d
else:
weight_3d = weight_2d.unsqueeze(2).repeat(1, 1, time_dim, 1, 1)
weight_3d = weight_3d / time_dim
return weight_3d
    def _load_pretrained(self, pretrained: Optional[str] = None) -> None:
"""Load CLIP pretrained visual encoder.
The visual encoder is extracted from CLIP.
https://github.com/openai/CLIP
Args:
pretrained (str): Model name of pretrained CLIP visual encoder.
Defaults to None.
"""
        assert pretrained is not None, \
            'please specify the CLIP pretrained checkpoint'
model_path = _MODELS[pretrained]
logger.info(f'Load CLIP pretrained model from {model_path}')
state_dict = _load_checkpoint(model_path, map_location='cpu')
state_dict_3d = self.state_dict()
for k in state_dict.keys():
if k in state_dict_3d.keys(
) and state_dict[k].shape != state_dict_3d[k].shape:
if len(state_dict_3d[k].shape) <= 2:
logger.info(f'Ignore: {k}')
continue
logger.info(f'Inflate: {k}, {state_dict[k].shape}' +
f' => {state_dict_3d[k].shape}')
time_dim = state_dict_3d[k].shape[2]
state_dict[k] = self._inflate_weight(state_dict[k], time_dim)
self.load_state_dict(state_dict, strict=False)
    def init_weights(self):
"""Initialize the weights in backbone."""
if self.clip_pretrained:
logger = MMLogger.get_current_instance()
logger.info(f'load model from: {self.pretrained}')
self._load_pretrained(self.pretrained)
else:
if self.pretrained:
self.init_cfg = dict(
type='Pretrained', checkpoint=self.pretrained)
super().init_weights()
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.conv1(x)  # shape = [N, width, T, grid, grid]
N, C, T, H, W = x.shape
x = x.permute(0, 2, 3, 4, 1).reshape(N * T, H * W, C)
x = torch.cat([
self.class_embedding.to(x.dtype) + torch.zeros(
x.shape[0], 1, x.shape[-1], dtype=x.dtype, device=x.device), x
],
                      dim=1)  # shape = [N * T, grid ** 2 + 1, width]
x = x + self.positional_embedding.to(x.dtype)
x = self.ln_pre(x)
x = x.permute(1, 0, 2) # NLD -> LND
out = self.transformer(x)
return out
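

if __name__ == '__main__':
    # Minimal smoke-test sketch (not part of the original module): build the
    # registered backbone through the MMAction registry without CLIP weights
    # and check the pooled feature shape.
    cfg = dict(type='UniFormerV2', clip_pretrained=False)
    backbone = MODELS.build(cfg)
    video = torch.randn(1, 3, 8, 224, 224)  # (N, C, T, H, W)
    print(backbone(video).shape)  # expected: torch.Size([1, 768])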