
Source code for mmaction.datasets.transforms.pose_transforms

# Copyright (c) OpenMMLab. All rights reserved.
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import scipy
from mmcv.transforms import BaseTransform, KeyMapper
from mmengine.dataset import Compose
from packaging import version as pv
from scipy.stats import mode
from torch.nn.modules.utils import _pair

from mmaction.registry import TRANSFORMS
from .loading import DecordDecode, DecordInit
from .processing import _combine_quadruple

if pv.parse(scipy.__version__) < pv.parse('1.11.0'):
    get_mode = mode
else:
    from functools import partial
    get_mode = partial(mode, keepdims=True)


@TRANSFORMS.register_module()
class DecompressPose(BaseTransform):
    """Load Compressed Pose.

    Required Keys:

        - frame_inds
        - total_frames
        - keypoint
        - anno_inds (optional)

    Modified Keys:

        - keypoint
        - frame_inds

    Added Keys:

        - keypoint_score
        - num_person

    Args:
        squeeze (bool): Whether to remove frames with no human pose.
            Defaults to True.
        max_person (int): The max number of persons in a frame.
            Defaults to 10.
    """

    def __init__(self, squeeze: bool = True, max_person: int = 10) -> None:
        self.squeeze = squeeze
        self.max_person = max_person

    def transform(self, results: Dict) -> Dict:
        """Perform the pose decoding.

        Args:
            results (dict): The resulting dict to be modified and passed
                to the next transform in pipeline.
        """
        required_keys = ['total_frames', 'frame_inds', 'keypoint']
        for k in required_keys:
            assert k in results

        total_frames = results['total_frames']
        frame_inds = results.pop('frame_inds')
        keypoint = results['keypoint']

        if 'anno_inds' in results:
            frame_inds = frame_inds[results['anno_inds']]
            keypoint = keypoint[results['anno_inds']]

        assert np.all(np.diff(frame_inds) >= 0), \
            'frame_inds should be monotonically increasing'

        def mapinds(inds):
            uni = np.unique(inds)
            map_ = {x: i for i, x in enumerate(uni)}
            inds = [map_[x] for x in inds]
            return np.array(inds, dtype=np.int16)

        if self.squeeze:
            frame_inds = mapinds(frame_inds)
            total_frames = np.max(frame_inds) + 1

        results['total_frames'] = total_frames

        num_joints = keypoint.shape[1]
        num_person = get_mode(frame_inds)[-1][0]

        new_kp = np.zeros([num_person, total_frames, num_joints, 2],
                          dtype=np.float16)
        new_kpscore = np.zeros([num_person, total_frames, num_joints],
                               dtype=np.float16)
        nperson_per_frame = np.zeros([total_frames], dtype=np.int16)

        for frame_ind, kp in zip(frame_inds, keypoint):
            person_ind = nperson_per_frame[frame_ind]
            new_kp[person_ind, frame_ind] = kp[:, :2]
            new_kpscore[person_ind, frame_ind] = kp[:, 2]
            nperson_per_frame[frame_ind] += 1

        if num_person > self.max_person:
            for i in range(total_frames):
                nperson = nperson_per_frame[i]
                val = new_kpscore[:nperson, i]
                score_sum = val.sum(-1)

                inds = sorted(range(nperson), key=lambda x: -score_sum[x])
                new_kpscore[:nperson, i] = new_kpscore[inds, i]
                new_kp[:nperson, i] = new_kp[inds, i]
            num_person = self.max_person

        results['num_person'] = num_person
        results['keypoint'] = new_kp[:num_person]
        results['keypoint_score'] = new_kpscore[:num_person]
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'squeeze={self.squeeze}, '
                    f'max_person={self.max_person})')
        return repr_str

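# Usage sketch: `_demo_decompress_pose` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Three pose
# annotations spread over two frames, each a (V, 3) array of (x, y, score)
# for V = 17 joints; the mode of `frame_inds` (here 2) gives `num_person`.
def _demo_decompress_pose():
    anno = dict(
        total_frames=2,
        frame_inds=np.array([0, 0, 1]),
        keypoint=np.random.rand(3, 17, 3).astype(np.float32))
    anno = DecompressPose(squeeze=True, max_person=10)(anno)
    # anno['keypoint']:       (num_person, 2, 17, 2)
    # anno['keypoint_score']: (num_person, 2, 17)
    return anno
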
@TRANSFORMS.register_module()
class GeneratePoseTarget(BaseTransform):
    """Generate pseudo heatmaps based on joint coordinates and confidence.

    Required Keys:

        - keypoint
        - keypoint_score (optional)
        - img_shape

    Added Keys:

        - imgs (optional)
        - heatmap_imgs (optional)

    Args:
        sigma (float): The sigma of the generated gaussian map.
            Defaults to 0.6.
        use_score (bool): Use the confidence score of keypoints as the
            maximum of the gaussian maps. Defaults to True.
        with_kp (bool): Generate pseudo heatmaps for keypoints.
            Defaults to True.
        with_limb (bool): Generate pseudo heatmaps for limbs. At least one
            of 'with_kp' and 'with_limb' should be True. Defaults to False.
        skeletons (tuple[tuple]): The definition of human skeletons.
            Defaults to ``((0, 1), (0, 2), (1, 3), (2, 4), (0, 5), (5, 7),
            (7, 9), (0, 6), (6, 8), (8, 10), (5, 11), (11, 13), (13, 15),
            (6, 12), (12, 14), (14, 16), (11, 12))``, which is the
            definition of COCO-17p skeletons.
        double (bool): Output both original heatmaps and flipped heatmaps.
            Defaults to False.
        left_kp (tuple[int]): Indexes of left keypoints, which is used when
            flipping heatmaps. Defaults to (1, 3, 5, 7, 9, 11, 13, 15),
            which is left keypoints in COCO-17p.
        right_kp (tuple[int]): Indexes of right keypoints, which is used
            when flipping heatmaps. Defaults to (2, 4, 6, 8, 10, 12, 14, 16),
            which is right keypoints in COCO-17p.
        left_limb (tuple[int]): Indexes of left limbs, which is used when
            flipping heatmaps. Defaults to (0, 2, 4, 5, 6, 10, 11, 12),
            which is left limbs of skeletons we defined for COCO-17p.
        right_limb (tuple[int]): Indexes of right limbs, which is used when
            flipping heatmaps. Defaults to (1, 3, 7, 8, 9, 13, 14, 15),
            which is right limbs of skeletons we defined for COCO-17p.
        scaling (float): The ratio to scale the heatmaps. Defaults to 1.
    """

    def __init__(self,
                 sigma: float = 0.6,
                 use_score: bool = True,
                 with_kp: bool = True,
                 with_limb: bool = False,
                 skeletons: Tuple[Tuple[int]] = ((0, 1), (0, 2), (1, 3),
                                                 (2, 4), (0, 5), (5, 7),
                                                 (7, 9), (0, 6), (6, 8),
                                                 (8, 10), (5, 11), (11, 13),
                                                 (13, 15), (6, 12), (12, 14),
                                                 (14, 16), (11, 12)),
                 double: bool = False,
                 left_kp: Tuple[int] = (1, 3, 5, 7, 9, 11, 13, 15),
                 right_kp: Tuple[int] = (2, 4, 6, 8, 10, 12, 14, 16),
                 left_limb: Tuple[int] = (0, 2, 4, 5, 6, 10, 11, 12),
                 right_limb: Tuple[int] = (1, 3, 7, 8, 9, 13, 14, 15),
                 scaling: float = 1.) -> None:

        self.sigma = sigma
        self.use_score = use_score
        self.with_kp = with_kp
        self.with_limb = with_limb
        self.double = double

        # an auxiliary const
        self.eps = 1e-4

        assert self.with_kp or self.with_limb, (
            'At least one of "with_limb" '
            'and "with_kp" should be set as True.')
        self.left_kp = left_kp
        self.right_kp = right_kp
        self.skeletons = skeletons
        self.left_limb = left_limb
        self.right_limb = right_limb
        self.scaling = scaling

    def generate_a_heatmap(self, arr: np.ndarray, centers: np.ndarray,
                           max_values: np.ndarray) -> None:
        """Generate pseudo heatmap for one keypoint in one frame.

        Args:
            arr (np.ndarray): The array to store the generated heatmaps.
                Shape: img_h * img_w.
            centers (np.ndarray): The coordinates of corresponding keypoints
                (of multiple persons). Shape: M * 2.
            max_values (np.ndarray): The max values of each keypoint.
                Shape: M.
        """
        sigma = self.sigma
        img_h, img_w = arr.shape

        for center, max_value in zip(centers, max_values):
            if max_value < self.eps:
                continue

            mu_x, mu_y = center[0], center[1]
            st_x = max(int(mu_x - 3 * sigma), 0)
            ed_x = min(int(mu_x + 3 * sigma) + 1, img_w)
            st_y = max(int(mu_y - 3 * sigma), 0)
            ed_y = min(int(mu_y + 3 * sigma) + 1, img_h)
            x = np.arange(st_x, ed_x, 1, np.float32)
            y = np.arange(st_y, ed_y, 1, np.float32)

            # if the keypoint not in the heatmap coordinate system
            if not (len(x) and len(y)):
                continue
            y = y[:, None]

            patch = np.exp(-((x - mu_x)**2 + (y - mu_y)**2) / 2 / sigma**2)
            patch = patch * max_value
            arr[st_y:ed_y, st_x:ed_x] = \
                np.maximum(arr[st_y:ed_y, st_x:ed_x], patch)

    def generate_a_limb_heatmap(self, arr: np.ndarray, starts: np.ndarray,
                                ends: np.ndarray, start_values: np.ndarray,
                                end_values: np.ndarray) -> None:
        """Generate pseudo heatmap for one limb in one frame.

        Args:
            arr (np.ndarray): The array to store the generated heatmaps.
                Shape: img_h * img_w.
            starts (np.ndarray): The coordinates of one keypoint in the
                corresponding limbs. Shape: M * 2.
            ends (np.ndarray): The coordinates of the other keypoint in the
                corresponding limbs. Shape: M * 2.
            start_values (np.ndarray): The max values of one keypoint in the
                corresponding limbs. Shape: M.
            end_values (np.ndarray): The max values of the other keypoint
                in the corresponding limbs. Shape: M.
        """
        sigma = self.sigma
        img_h, img_w = arr.shape

        for start, end, start_value, end_value in zip(starts, ends,
                                                      start_values,
                                                      end_values):
            value_coeff = min(start_value, end_value)
            if value_coeff < self.eps:
                continue

            min_x, max_x = min(start[0], end[0]), max(start[0], end[0])
            min_y, max_y = min(start[1], end[1]), max(start[1], end[1])

            min_x = max(int(min_x - 3 * sigma), 0)
            max_x = min(int(max_x + 3 * sigma) + 1, img_w)
            min_y = max(int(min_y - 3 * sigma), 0)
            max_y = min(int(max_y + 3 * sigma) + 1, img_h)

            x = np.arange(min_x, max_x, 1, np.float32)
            y = np.arange(min_y, max_y, 1, np.float32)

            if not (len(x) and len(y)):
                continue

            y = y[:, None]
            x_0 = np.zeros_like(x)
            y_0 = np.zeros_like(y)

            # distance to start keypoints
            d2_start = ((x - start[0])**2 + (y - start[1])**2)

            # distance to end keypoints
            d2_end = ((x - end[0])**2 + (y - end[1])**2)

            # the distance between start and end keypoints.
            d2_ab = ((start[0] - end[0])**2 + (start[1] - end[1])**2)

            if d2_ab < 1:
                self.generate_a_heatmap(arr, start[None], start_value[None])
                continue

            coeff = (d2_start - d2_end + d2_ab) / 2. / d2_ab

            a_dominate = coeff <= 0
            b_dominate = coeff >= 1
            seg_dominate = 1 - a_dominate - b_dominate

            position = np.stack([x + y_0, y + x_0], axis=-1)
            projection = start + np.stack([coeff, coeff], axis=-1) * (
                end - start)
            d2_line = position - projection
            d2_line = d2_line[:, :, 0]**2 + d2_line[:, :, 1]**2
            d2_seg = (
                a_dominate * d2_start + b_dominate * d2_end +
                seg_dominate * d2_line)

            patch = np.exp(-d2_seg / 2. / sigma**2)
            patch = patch * value_coeff

            arr[min_y:max_y, min_x:max_x] = \
                np.maximum(arr[min_y:max_y, min_x:max_x], patch)

    def generate_heatmap(self, arr: np.ndarray, kps: np.ndarray,
                         max_values: np.ndarray) -> None:
        """Generate pseudo heatmap for all keypoints and limbs in one frame
        (if needed).

        Args:
            arr (np.ndarray): The array to store the generated heatmaps.
                Shape: V * img_h * img_w.
            kps (np.ndarray): The coordinates of keypoints in this frame.
                Shape: M * V * 2.
            max_values (np.ndarray): The confidence score of each keypoint.
                Shape: M * V.
        """
        if self.with_kp:
            num_kp = kps.shape[1]
            for i in range(num_kp):
                self.generate_a_heatmap(arr[i], kps[:, i], max_values[:, i])

        if self.with_limb:
            for i, limb in enumerate(self.skeletons):
                start_idx, end_idx = limb
                starts = kps[:, start_idx]
                ends = kps[:, end_idx]

                start_values = max_values[:, start_idx]
                end_values = max_values[:, end_idx]
                self.generate_a_limb_heatmap(arr[i], starts, ends,
                                             start_values, end_values)

    def gen_an_aug(self, results: Dict) -> np.ndarray:
        """Generate pseudo heatmaps for all frames.

        Args:
            results (dict): The dictionary that contains all info of a
                sample.

        Returns:
            np.ndarray: The generated pseudo heatmaps.
        """
        all_kps = results['keypoint'].astype(np.float32)
        kp_shape = all_kps.shape

        if 'keypoint_score' in results:
            all_kpscores = results['keypoint_score']
        else:
            all_kpscores = np.ones(kp_shape[:-1], dtype=np.float32)

        img_h, img_w = results['img_shape']

        # scale img_h, img_w and kps
        img_h = int(img_h * self.scaling + 0.5)
        img_w = int(img_w * self.scaling + 0.5)
        all_kps[..., :2] *= self.scaling

        num_frame = kp_shape[1]
        num_c = 0
        if self.with_kp:
            num_c += all_kps.shape[2]
        if self.with_limb:
            num_c += len(self.skeletons)

        ret = np.zeros([num_frame, num_c, img_h, img_w], dtype=np.float32)

        for i in range(num_frame):
            # M, V, C
            kps = all_kps[:, i]
            # M, C
            kpscores = all_kpscores[:, i] if self.use_score else \
                np.ones_like(all_kpscores[:, i])

            self.generate_heatmap(ret[i], kps, kpscores)
        return ret

    def transform(self, results: Dict) -> Dict:
        """Generate pseudo heatmaps based on joint coordinates and
        confidence.

        Args:
            results (dict): The resulting dict to be modified and passed
                to the next transform in pipeline.
        """
        heatmap = self.gen_an_aug(results)
        key = 'heatmap_imgs' if 'imgs' in results else 'imgs'

        if self.double:
            indices = np.arange(heatmap.shape[1], dtype=np.int64)
            left, right = (self.left_kp, self.right_kp) if self.with_kp else (
                self.left_limb, self.right_limb)
            for l, r in zip(left, right):  # noqa: E741
                indices[l] = r
                indices[r] = l
            heatmap_flip = heatmap[..., ::-1][:, indices]
            heatmap = np.concatenate([heatmap, heatmap_flip])
        results[key] = heatmap
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'sigma={self.sigma}, '
                    f'use_score={self.use_score}, '
                    f'with_kp={self.with_kp}, '
                    f'with_limb={self.with_limb}, '
                    f'skeletons={self.skeletons}, '
                    f'double={self.double}, '
                    f'left_kp={self.left_kp}, '
                    f'right_kp={self.right_kp}, '
                    f'left_limb={self.left_limb}, '
                    f'right_limb={self.right_limb}, '
                    f'scaling={self.scaling})')
        return repr_str

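# Usage sketch: `_demo_generate_pose_target` is a hypothetical helper added
# for illustration on this page, not part of the upstream module. Keypoint
# heatmaps for one person over four frames on a 56x56 canvas.
def _demo_generate_pose_target():
    results = dict(
        keypoint=np.random.rand(1, 4, 17, 2).astype(np.float32) * 56,
        keypoint_score=np.ones((1, 4, 17), dtype=np.float32),
        img_shape=(56, 56))
    results = GeneratePoseTarget(sigma=0.6, with_kp=True)(results)
    # results['imgs']: (num_frame, num_keypoint, h, w) == (4, 17, 56, 56)
    return results
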
@TRANSFORMS.register_module()
class PoseCompact(BaseTransform):
    """Convert the coordinates of keypoints to make it more compact.

    Specifically, it first finds a tight bounding box that surrounds all
    joints in each frame, then expands the tight box by a given padding
    ratio. For example, if 'padding == 0.25', then the expanded box has
    unchanged center, and 1.25x width and height.

    Required Keys:

        - keypoint
        - img_shape

    Modified Keys:

        - img_shape
        - keypoint

    Added Keys:

        - crop_quadruple

    Args:
        padding (float): The padding size. Defaults to 0.25.
        threshold (int): The threshold for the tight bounding box. If the
            width or height of the tight bounding box is smaller than the
            threshold, we do not perform the compact operation.
            Defaults to 10.
        hw_ratio (float | tuple[float] | None): The hw_ratio of the expanded
            box. Float indicates the specific ratio and tuple indicates a
            ratio range. If set as None, it means there is no requirement on
            hw_ratio. Defaults to None.
        allow_imgpad (bool): Whether to allow expanding the box outside the
            image to meet the hw_ratio requirement. Defaults to True.
    """

    def __init__(self,
                 padding: float = 0.25,
                 threshold: int = 10,
                 hw_ratio: Optional[Union[float, Tuple[float]]] = None,
                 allow_imgpad: bool = True) -> None:
        self.padding = padding
        self.threshold = threshold
        if hw_ratio is not None:
            hw_ratio = _pair(hw_ratio)
        self.hw_ratio = hw_ratio
        self.allow_imgpad = allow_imgpad
        assert self.padding >= 0

    def transform(self, results: Dict) -> Dict:
        """Convert the coordinates of keypoints to make it more compact.

        Args:
            results (dict): The resulting dict to be modified and passed
                to the next transform in pipeline.
        """
        img_shape = results['img_shape']
        h, w = img_shape
        kp = results['keypoint']

        # Make NaN zero
        kp[np.isnan(kp)] = 0.
        kp_x = kp[..., 0]
        kp_y = kp[..., 1]

        min_x = np.min(kp_x[kp_x != 0], initial=np.Inf)
        min_y = np.min(kp_y[kp_y != 0], initial=np.Inf)
        max_x = np.max(kp_x[kp_x != 0], initial=-np.Inf)
        max_y = np.max(kp_y[kp_y != 0], initial=-np.Inf)

        # The compact area is too small
        if max_x - min_x < self.threshold or max_y - min_y < self.threshold:
            return results

        center = ((max_x + min_x) / 2, (max_y + min_y) / 2)
        half_width = (max_x - min_x) / 2 * (1 + self.padding)
        half_height = (max_y - min_y) / 2 * (1 + self.padding)

        if self.hw_ratio is not None:
            half_height = max(self.hw_ratio[0] * half_width, half_height)
            half_width = max(1 / self.hw_ratio[1] * half_height, half_width)

        min_x, max_x = center[0] - half_width, center[0] + half_width
        min_y, max_y = center[1] - half_height, center[1] + half_height

        # hot update
        if not self.allow_imgpad:
            min_x, min_y = int(max(0, min_x)), int(max(0, min_y))
            max_x, max_y = int(min(w, max_x)), int(min(h, max_y))
        else:
            min_x, min_y = int(min_x), int(min_y)
            max_x, max_y = int(max_x), int(max_y)

        kp_x[kp_x != 0] -= min_x
        kp_y[kp_y != 0] -= min_y

        new_shape = (max_y - min_y, max_x - min_x)
        results['img_shape'] = new_shape

        # the order is x, y, w, h (in [0, 1]), a tuple
        crop_quadruple = results.get('crop_quadruple', (0., 0., 1., 1.))
        new_crop_quadruple = (min_x / w, min_y / h, (max_x - min_x) / w,
                              (max_y - min_y) / h)
        crop_quadruple = _combine_quadruple(crop_quadruple,
                                            new_crop_quadruple)
        results['crop_quadruple'] = crop_quadruple
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}(padding={self.padding}, '
                    f'threshold={self.threshold}, '
                    f'hw_ratio={self.hw_ratio}, '
                    f'allow_imgpad={self.allow_imgpad})')
        return repr_str

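# Usage sketch: `_demo_pose_compact` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Keypoints
# occupy a small region of a 480x640 frame; the transform shrinks img_shape
# to the padded tight box and shifts the keypoints accordingly.
def _demo_pose_compact():
    kp = np.random.rand(1, 8, 17, 2).astype(np.float32)
    kp[..., 0] = kp[..., 0] * 300 + 100  # x in [100, 400)
    kp[..., 1] = kp[..., 1] * 200 + 100  # y in [100, 300)
    results = dict(keypoint=kp, img_shape=(480, 640))
    results = PoseCompact(padding=0.25, hw_ratio=1.)(results)
    # results['img_shape'] is now the compact box; results['crop_quadruple']
    # records (x, y, w, h) of the crop relative to the original frame.
    return results
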
@TRANSFORMS.register_module()
class PreNormalize3D(BaseTransform):
    """PreNormalize for NTURGB+D 3D keypoints (x, y, z).

    PreNormalize3D first subtracts the coordinates of the 'spine'
    (joint #1 in ntu) of the first person in the first frame from the
    coordinates of each joint. Subsequently, it performs a 3D rotation to
    fix the Z axis parallel to the 3D vector from the 'hip' (joint #0) to
    the 'spine' (joint #1), and the X axis toward the 3D vector from the
    'left shoulder' (joint #4) to the 'right shoulder' (joint #8).

    Code adapted from https://github.com/lshiwjx/2s-AGCN.

    Required Keys:

        - keypoint
        - total_frames (optional)

    Modified Keys:

        - keypoint

    Added Keys:

        - body_center

    Args:
        zaxis (list[int]): The target Z axis for the 3D rotation.
            Defaults to ``[0, 1]``.
        xaxis (list[int]): The target X axis for the 3D rotation.
            Defaults to ``[8, 4]``.
        align_spine (bool): Whether to perform a 3D rotation to align the
            spine. Defaults to True.
        align_shoulder (bool): Whether to perform a 3D rotation to align
            the shoulder. Defaults to True.
        align_center (bool): Whether to align the body center.
            Defaults to True.
    """

    def __init__(self,
                 zaxis: List[int] = [0, 1],
                 xaxis: List[int] = [8, 4],
                 align_spine: bool = True,
                 align_shoulder: bool = True,
                 align_center: bool = True) -> None:
        self.zaxis = zaxis
        self.xaxis = xaxis
        self.align_center = align_center
        self.align_spine = align_spine
        self.align_shoulder = align_shoulder

    def unit_vector(self, vector: np.ndarray) -> np.ndarray:
        """Returns the unit vector of the vector."""
        return vector / np.linalg.norm(vector)

    def angle_between(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """Returns the angle in radians between vectors 'v1' and 'v2'."""
        if np.abs(v1).sum() < 1e-6 or np.abs(v2).sum() < 1e-6:
            return 0
        v1_u = self.unit_vector(v1)
        v2_u = self.unit_vector(v2)
        return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))

    def rotation_matrix(self, axis: np.ndarray, theta: float) -> np.ndarray:
        """Returns the rotation matrix associated with counterclockwise
        rotation about the given axis by theta radians."""
        if np.abs(axis).sum() < 1e-6 or np.abs(theta) < 1e-6:
            return np.eye(3)
        axis = np.asarray(axis)
        axis = axis / np.sqrt(np.dot(axis, axis))
        a = np.cos(theta / 2.0)
        b, c, d = -axis * np.sin(theta / 2.0)
        aa, bb, cc, dd = a * a, b * b, c * c, d * d
        bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
        return np.array([[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
                         [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
                         [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc]])

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`PreNormalize3D`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        skeleton = results['keypoint']
        total_frames = results.get('total_frames', skeleton.shape[1])

        M, T, V, C = skeleton.shape
        assert T == total_frames
        if skeleton.sum() == 0:
            return results

        index0 = [
            i for i in range(T) if not np.all(np.isclose(skeleton[0, i], 0))
        ]

        assert M in [1, 2]
        if M == 2:
            index1 = [
                i for i in range(T)
                if not np.all(np.isclose(skeleton[1, i], 0))
            ]
            if len(index0) < len(index1):
                skeleton = skeleton[:, np.array(index1)]
                skeleton = skeleton[[1, 0]]
            else:
                skeleton = skeleton[:, np.array(index0)]
        else:
            skeleton = skeleton[:, np.array(index0)]

        T_new = skeleton.shape[1]

        if self.align_center:
            if skeleton.shape[2] == 25:
                main_body_center = skeleton[0, 0, 1].copy()
            else:
                main_body_center = skeleton[0, 0, -1].copy()
            mask = ((skeleton != 0).sum(-1) > 0)[..., None]
            skeleton = (skeleton - main_body_center) * mask

        if self.align_spine:
            joint_bottom = skeleton[0, 0, self.zaxis[0]]
            joint_top = skeleton[0, 0, self.zaxis[1]]
            axis = np.cross(joint_top - joint_bottom, [0, 0, 1])
            angle = self.angle_between(joint_top - joint_bottom, [0, 0, 1])
            matrix_z = self.rotation_matrix(axis, angle)
            skeleton = np.einsum('abcd,kd->abck', skeleton, matrix_z)

        if self.align_shoulder:
            joint_rshoulder = skeleton[0, 0, self.xaxis[0]]
            joint_lshoulder = skeleton[0, 0, self.xaxis[1]]
            axis = np.cross(joint_rshoulder - joint_lshoulder, [1, 0, 0])
            angle = self.angle_between(joint_rshoulder - joint_lshoulder,
                                       [1, 0, 0])
            matrix_x = self.rotation_matrix(axis, angle)
            skeleton = np.einsum('abcd,kd->abck', skeleton, matrix_x)

        results['keypoint'] = skeleton
        results['total_frames'] = T_new
        results['body_center'] = main_body_center
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'zaxis={self.zaxis}, '
                    f'xaxis={self.xaxis}, '
                    f'align_center={self.align_center}, '
                    f'align_spine={self.align_spine}, '
                    f'align_shoulder={self.align_shoulder})')
        return repr_str

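# Usage sketch: `_demo_pre_normalize_3d` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. One person,
# 16 frames of NTU-style 25-joint 3D skeletons.
def _demo_pre_normalize_3d():
    results = dict(keypoint=np.random.rand(1, 16, 25, 3).astype(np.float32))
    results = PreNormalize3D()(results)
    # keypoints are centered on the spine of the first person in the first
    # frame, then rotated to align the spine with Z and shoulders with X.
    return results
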
@TRANSFORMS.register_module()
class PreNormalize2D(BaseTransform):
    """Normalize the range of keypoint values.

    Required Keys:

        - keypoint
        - img_shape (optional)

    Modified Keys:

        - keypoint

    Args:
        img_shape (tuple[int, int]): The resolution of the original video.
            Defaults to ``(1080, 1920)``.
    """

    def __init__(self, img_shape: Tuple[int, int] = (1080, 1920)) -> None:
        self.img_shape = img_shape

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`PreNormalize2D`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        h, w = results.get('img_shape', self.img_shape)
        results['keypoint'][..., 0] = \
            (results['keypoint'][..., 0] - (w / 2)) / (w / 2)
        results['keypoint'][..., 1] = \
            (results['keypoint'][..., 1] - (h / 2)) / (h / 2)
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'img_shape={self.img_shape})')
        return repr_str

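# Usage sketch: `_demo_pre_normalize_2d` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Pixel
# coordinates in a 1080x1920 frame are mapped to [-1, 1].
def _demo_pre_normalize_2d():
    results = dict(
        keypoint=np.random.rand(2, 16, 17, 2) * [1920, 1080],
        img_shape=(1080, 1920))
    return PreNormalize2D()(results)
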
@TRANSFORMS.register_module()
class JointToBone(BaseTransform):
    """Convert the joint information to bone information.

    Required Keys:

        - keypoint

    Modified Keys:

        - keypoint

    Args:
        dataset (str): Define the type of dataset: 'nturgb+d', 'openpose',
            'coco'. Defaults to ``'nturgb+d'``.
        target (str): The target key for the bone information.
            Defaults to ``'keypoint'``.
    """

    def __init__(self,
                 dataset: str = 'nturgb+d',
                 target: str = 'keypoint') -> None:
        self.dataset = dataset
        self.target = target
        if self.dataset not in ['nturgb+d', 'openpose', 'coco']:
            raise ValueError(
                f'The dataset type {self.dataset} is not supported')
        if self.dataset == 'nturgb+d':
            self.pairs = [(0, 1), (1, 20), (2, 20), (3, 2), (4, 20), (5, 4),
                          (6, 5), (7, 6), (8, 20), (9, 8), (10, 9), (11, 10),
                          (12, 0), (13, 12), (14, 13), (15, 14), (16, 0),
                          (17, 16), (18, 17), (19, 18), (21, 22), (20, 20),
                          (22, 7), (23, 24), (24, 11)]
        elif self.dataset == 'openpose':
            self.pairs = ((0, 0), (1, 0), (2, 1), (3, 2), (4, 3), (5, 1),
                          (6, 5), (7, 6), (8, 2), (9, 8), (10, 9), (11, 5),
                          (12, 11), (13, 12), (14, 0), (15, 0), (16, 14),
                          (17, 15))
        elif self.dataset == 'coco':
            self.pairs = ((0, 0), (1, 0), (2, 0), (3, 1), (4, 2), (5, 0),
                          (6, 0), (7, 5), (8, 6), (9, 7), (10, 8), (11, 0),
                          (12, 0), (13, 11), (14, 12), (15, 13), (16, 14))

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`JointToBone`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        keypoint = results['keypoint']
        M, T, V, C = keypoint.shape
        bone = np.zeros((M, T, V, C), dtype=np.float32)

        assert C in [2, 3]
        for v1, v2 in self.pairs:
            bone[..., v1, :] = keypoint[..., v1, :] - keypoint[..., v2, :]
            if C == 3 and self.dataset in ['openpose', 'coco']:
                score = (keypoint[..., v1, 2] + keypoint[..., v2, 2]) / 2
                bone[..., v1, 2] = score

        results[self.target] = bone
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'dataset={self.dataset}, '
                    f'target={self.target})')
        return repr_str

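# Usage sketch: `_demo_joint_to_bone` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. COCO keypoints
# with scores, with the bones written to a separate key.
def _demo_joint_to_bone():
    results = dict(keypoint=np.random.rand(1, 16, 17, 3).astype(np.float32))
    results = JointToBone(dataset='coco', target='bone')(results)
    # results['bone'][..., v1, :2] is the vector from parent v2 to joint v1;
    # channel 2 holds the mean score of the two endpoints.
    return results
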
@TRANSFORMS.register_module()
class ToMotion(BaseTransform):
    """Convert the joint information or bone information to corresponding
    motion information.

    Required Keys:

        - keypoint

    Added Keys:

        - motion

    Args:
        dataset (str): Define the type of dataset: 'nturgb+d', 'openpose',
            'coco'. Defaults to ``'nturgb+d'``.
        source (str): The source key for the joint or bone information.
            Defaults to ``'keypoint'``.
        target (str): The target key for the motion information.
            Defaults to ``'motion'``.
    """

    def __init__(self,
                 dataset: str = 'nturgb+d',
                 source: str = 'keypoint',
                 target: str = 'motion') -> None:
        self.dataset = dataset
        self.source = source
        self.target = target

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`ToMotion`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        data = results[self.source]
        M, T, V, C = data.shape
        motion = np.zeros_like(data)

        assert C in [2, 3]
        motion[:, :T - 1] = np.diff(data, axis=1)
        if C == 3 and self.dataset in ['openpose', 'coco']:
            score = (data[:, :T - 1, :, 2] + data[:, 1:, :, 2]) / 2
            motion[:, :T - 1, :, 2] = score

        results[self.target] = motion
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'dataset={self.dataset}, '
                    f'source={self.source}, '
                    f'target={self.target})')
        return repr_str

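# Usage sketch: `_demo_to_motion` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Frame-to-frame
# differences of 2D COCO keypoints.
def _demo_to_motion():
    results = dict(keypoint=np.random.rand(1, 16, 17, 2).astype(np.float32))
    results = ToMotion(dataset='coco')(results)
    # results['motion'][:, t] == keypoint[:, t + 1] - keypoint[:, t];
    # the last frame of the motion stream is all zeros.
    return results
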
@TRANSFORMS.register_module()
class MergeSkeFeat(BaseTransform):
    """Merge multi-stream features.

    Args:
        feat_list (list[str]): The list of the keys of features.
            Defaults to ``['keypoint']``.
        target (str): The target key for the merged multi-stream
            information. Defaults to ``'keypoint'``.
        axis (int): The axis along which the features will be joined.
            Defaults to -1.
    """

    def __init__(self,
                 feat_list: List[str] = ['keypoint'],
                 target: str = 'keypoint',
                 axis: int = -1) -> None:
        self.feat_list = feat_list
        self.target = target
        self.axis = axis

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`MergeSkeFeat`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        feats = []
        for name in self.feat_list:
            feats.append(results.pop(name))
        feats = np.concatenate(feats, axis=self.axis)
        results[self.target] = feats
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'feat_list={self.feat_list}, '
                    f'target={self.target}, '
                    f'axis={self.axis})')
        return repr_str

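# Usage sketch: `_demo_merge_ske_feat` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Two feature
# streams concatenated along the channel axis.
def _demo_merge_ske_feat():
    results = dict(
        j=np.zeros((1, 16, 17, 3), dtype=np.float32),
        b=np.ones((1, 16, 17, 3), dtype=np.float32))
    results = MergeSkeFeat(feat_list=['j', 'b'])(results)
    # results['keypoint']: (1, 16, 17, 6)
    return results
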
@TRANSFORMS.register_module()
class GenSkeFeat(BaseTransform):
    """Unified interface for generating multi-stream skeleton features.

    Required Keys:

        - keypoint
        - keypoint_score (optional)

    Args:
        dataset (str): Define the type of dataset: 'nturgb+d', 'openpose',
            'coco'. Defaults to ``'nturgb+d'``.
        feats (list[str]): The list of the keys of features.
            Defaults to ``['j']``.
        axis (int): The axis along which the features will be joined.
            Defaults to -1.
    """

    def __init__(self,
                 dataset: str = 'nturgb+d',
                 feats: List[str] = ['j'],
                 axis: int = -1) -> None:
        self.dataset = dataset
        self.feats = feats
        self.axis = axis
        ops = []
        if 'b' in feats or 'bm' in feats:
            ops.append(JointToBone(dataset=dataset, target='b'))
        ops.append(KeyMapper(remapping={'keypoint': 'j'}))
        if 'jm' in feats:
            ops.append(ToMotion(dataset=dataset, source='j', target='jm'))
        if 'bm' in feats:
            ops.append(ToMotion(dataset=dataset, source='b', target='bm'))
        ops.append(MergeSkeFeat(feat_list=feats, axis=axis))
        self.ops = Compose(ops)

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`GenSkeFeat`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        if 'keypoint_score' in results and 'keypoint' in results:
            assert self.dataset != 'nturgb+d'
            assert results['keypoint'].shape[
                -1] == 2, 'Only 2D keypoints have keypoint_score. '
            keypoint = results.pop('keypoint')
            keypoint_score = results.pop('keypoint_score')
            results['keypoint'] = np.concatenate(
                [keypoint, keypoint_score[..., None]], -1)
        return self.ops(results)

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'dataset={self.dataset}, '
                    f'feats={self.feats}, '
                    f'axis={self.axis})')
        return repr_str

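# Usage sketch: `_demo_gen_ske_feat` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Joint and
# bone streams generated and merged in one call; 2D keypoints with scores
# are first packed into (x, y, score) triplets.
def _demo_gen_ske_feat():
    results = dict(
        keypoint=np.random.rand(2, 16, 17, 2).astype(np.float32),
        keypoint_score=np.ones((2, 16, 17), dtype=np.float32))
    results = GenSkeFeat(dataset='coco', feats=['j', 'b'])(results)
    # results['keypoint']: (2, 16, 17, 6), joint then bone channels.
    return results
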
@TRANSFORMS.register_module()
class UniformSampleFrames(BaseTransform):
    """Uniformly sample frames from the video.

    To sample an n-frame clip from the video, UniformSampleFrames basically
    divides the video into n segments of equal length and randomly samples
    one frame from each segment. To make the testing results reproducible, a
    random seed is set during testing, to make the sampling results
    deterministic.

    Required Keys:

        - total_frames
        - start_index (optional)

    Added Keys:

        - frame_inds
        - frame_interval
        - num_clips
        - clip_len

    Args:
        clip_len (int): Frames of each sampled output clip.
        num_clips (int): Number of clips to be sampled. Defaults to 1.
        test_mode (bool): Store True when building test or validation
            dataset. Defaults to False.
        seed (int): The random seed used during test time. Defaults to 255.
    """

    def __init__(self,
                 clip_len: int,
                 num_clips: int = 1,
                 test_mode: bool = False,
                 seed: int = 255) -> None:
        self.clip_len = clip_len
        self.num_clips = num_clips
        self.test_mode = test_mode
        self.seed = seed

    def _get_train_clips(self, num_frames: int, clip_len: int) -> np.ndarray:
        """Uniformly sample indices for training clips.

        Args:
            num_frames (int): The number of frames.
            clip_len (int): The length of the clip.

        Returns:
            np.ndarray: The sampled indices for training clips.
        """
        all_inds = []
        for clip_idx in range(self.num_clips):
            if num_frames < clip_len:
                start = np.random.randint(0, num_frames)
                inds = np.arange(start, start + clip_len)
            elif clip_len <= num_frames < 2 * clip_len:
                basic = np.arange(clip_len)
                inds = np.random.choice(
                    clip_len + 1, num_frames - clip_len, replace=False)
                offset = np.zeros(clip_len + 1, dtype=np.int32)
                offset[inds] = 1
                offset = np.cumsum(offset)
                inds = basic + offset[:-1]
            else:
                bids = np.array(
                    [i * num_frames // clip_len for i in range(clip_len + 1)])
                bsize = np.diff(bids)
                bst = bids[:clip_len]
                offset = np.random.randint(bsize)
                inds = bst + offset
            all_inds.append(inds)

        return np.concatenate(all_inds)

    def _get_test_clips(self, num_frames: int, clip_len: int) -> np.ndarray:
        """Uniformly sample indices for testing clips.

        Args:
            num_frames (int): The number of frames.
            clip_len (int): The length of the clip.

        Returns:
            np.ndarray: The sampled indices for testing clips.
        """
        np.random.seed(self.seed)
        all_inds = []
        for i in range(self.num_clips):
            if num_frames < clip_len:
                start_ind = i if num_frames < self.num_clips \
                    else i * num_frames // self.num_clips
                inds = np.arange(start_ind, start_ind + clip_len)
            elif clip_len <= num_frames < clip_len * 2:
                basic = np.arange(clip_len)
                inds = np.random.choice(
                    clip_len + 1, num_frames - clip_len, replace=False)
                offset = np.zeros(clip_len + 1, dtype=np.int64)
                offset[inds] = 1
                offset = np.cumsum(offset)
                inds = basic + offset[:-1]
            else:
                bids = np.array(
                    [i * num_frames // clip_len for i in range(clip_len + 1)])
                bsize = np.diff(bids)
                bst = bids[:clip_len]
                offset = np.random.randint(bsize)
                inds = bst + offset
            all_inds.append(inds)

        return np.concatenate(all_inds)

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`UniformSampleFrames`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        num_frames = results['total_frames']

        if self.test_mode:
            inds = self._get_test_clips(num_frames, self.clip_len)
        else:
            inds = self._get_train_clips(num_frames, self.clip_len)

        inds = np.mod(inds, num_frames)
        start_index = results.get('start_index', 0)
        inds = inds + start_index

        if 'keypoint' in results:
            kp = results['keypoint']
            assert num_frames == kp.shape[1]
            num_person = kp.shape[0]
            num_persons = [num_person] * num_frames
            for i in range(num_frames):
                j = num_person - 1
                while j >= 0 and np.all(np.abs(kp[j, i]) < 1e-5):
                    j -= 1
                num_persons[i] = j + 1
            transitional = [False] * num_frames
            for i in range(1, num_frames - 1):
                if num_persons[i] != num_persons[i - 1]:
                    transitional[i] = transitional[i - 1] = True
                if num_persons[i] != num_persons[i + 1]:
                    transitional[i] = transitional[i + 1] = True
            inds_int = inds.astype(np.int64)
            coeff = np.array([transitional[i] for i in inds_int])
            inds = (coeff * inds_int + (1 - coeff) * inds).astype(np.float32)

        results['frame_inds'] = inds.astype(np.int32)
        results['clip_len'] = self.clip_len
        results['frame_interval'] = None
        results['num_clips'] = self.num_clips
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'clip_len={self.clip_len}, '
                    f'num_clips={self.num_clips}, '
                    f'test_mode={self.test_mode}, '
                    f'seed={self.seed})')
        return repr_str

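# Usage sketch: `_demo_uniform_sample_frames` is a hypothetical helper added
# for illustration on this page, not part of the upstream module. One
# 48-frame training clip sampled from a 100-frame video.
def _demo_uniform_sample_frames():
    results = dict(total_frames=100, start_index=0)
    results = UniformSampleFrames(clip_len=48, num_clips=1)(results)
    # results['frame_inds']: 48 indices, one drawn from each of 48
    # equal-length segments of the video.
    return results
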
@TRANSFORMS.register_module()
class PadTo(BaseTransform):
    """Sample frames from the video.

    To sample an n-frame clip from the video, PadTo samples the frames from
    index zero, and loops or zero-pads the frames if the number of video
    frames is less than the value of ``length``.

    Required Keys:

        - keypoint
        - total_frames
        - start_index (optional)

    Modified Keys:

        - keypoint
        - total_frames

    Args:
        length (int): The maximum length of the sampled output clip.
        mode (str): The padding mode. Defaults to ``'loop'``.
    """

    def __init__(self, length: int, mode: str = 'loop') -> None:
        self.length = length
        assert mode in ['loop', 'zero']
        self.mode = mode

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`PadTo`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        total_frames = results['total_frames']
        assert total_frames <= self.length
        start_index = results.get('start_index', 0)

        inds = np.arange(start_index, start_index + self.length)
        inds = np.mod(inds, total_frames)

        keypoint = results['keypoint'][:, inds].copy()
        if self.mode == 'zero':
            keypoint[:, total_frames:] = 0

        results['keypoint'] = keypoint
        results['total_frames'] = self.length
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'length={self.length}, '
                    f'mode={self.mode})')
        return repr_str

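# Usage sketch: `_demo_pad_to` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. A 40-frame
# skeleton looped out to 64 frames.
def _demo_pad_to():
    results = dict(
        keypoint=np.random.rand(1, 40, 17, 2).astype(np.float32),
        total_frames=40)
    results = PadTo(length=64, mode='loop')(results)
    # results['keypoint']: (1, 64, 17, 2); frames 40..63 repeat frames 0..23.
    return results
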
@TRANSFORMS.register_module()
class PoseDecode(BaseTransform):
    """Load and decode pose with given indices.

    Required Keys:

        - keypoint
        - total_frames (optional)
        - frame_inds (optional)
        - offset (optional)
        - keypoint_score (optional)

    Modified Keys:

        - keypoint
        - keypoint_score (optional)
    """

    @staticmethod
    def _load_kp(kp: np.ndarray, frame_inds: np.ndarray) -> np.ndarray:
        """Load keypoints according to sampled indexes."""
        return kp[:, frame_inds].astype(np.float32)

    @staticmethod
    def _load_kpscore(kpscore: np.ndarray,
                      frame_inds: np.ndarray) -> np.ndarray:
        """Load keypoint scores according to sampled indexes."""
        return kpscore[:, frame_inds].astype(np.float32)

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`PoseDecode`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        if 'total_frames' not in results:
            results['total_frames'] = results['keypoint'].shape[1]

        if 'frame_inds' not in results:
            results['frame_inds'] = np.arange(results['total_frames'])

        if results['frame_inds'].ndim != 1:
            results['frame_inds'] = np.squeeze(results['frame_inds'])

        offset = results.get('offset', 0)
        frame_inds = results['frame_inds'] + offset

        if 'keypoint_score' in results:
            results['keypoint_score'] = self._load_kpscore(
                results['keypoint_score'], frame_inds)

        results['keypoint'] = self._load_kp(results['keypoint'], frame_inds)

        return results

    def __repr__(self) -> str:
        repr_str = f'{self.__class__.__name__}()'
        return repr_str

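# Usage sketch: `_demo_pose_decode` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Gather
# keypoints and scores at four sampled frame indices.
def _demo_pose_decode():
    results = dict(
        keypoint=np.random.rand(1, 100, 17, 2).astype(np.float32),
        keypoint_score=np.ones((1, 100, 17), dtype=np.float32),
        frame_inds=np.array([0, 10, 20, 30]),
        total_frames=100)
    results = PoseDecode()(results)
    # results['keypoint']: (1, 4, 17, 2); results['keypoint_score']: (1, 4, 17)
    return results
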
@TRANSFORMS.register_module()
class MMUniformSampleFrames(UniformSampleFrames):
    """Uniformly sample frames from the multi-modal data."""

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`MMUniformSampleFrames`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        num_frames = results['total_frames']
        modalities = []
        for modality, clip_len in self.clip_len.items():
            if self.test_mode:
                inds = self._get_test_clips(num_frames, clip_len)
            else:
                inds = self._get_train_clips(num_frames, clip_len)
            inds = np.mod(inds, num_frames)
            results[f'{modality}_inds'] = inds.astype(np.int32)
            modalities.append(modality)
        results['clip_len'] = self.clip_len
        results['frame_interval'] = None
        results['num_clips'] = self.num_clips
        if not isinstance(results['modality'], list):
            # should override
            results['modality'] = modalities
        return results

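# Usage sketch: `_demo_mm_uniform_sample_frames` is a hypothetical helper
# added for illustration on this page, not part of the upstream module.
# Per-modality clip lengths; the sampler writes one index array per modality.
def _demo_mm_uniform_sample_frames():
    results = dict(total_frames=240, modality='RGB')
    sampler = MMUniformSampleFrames(
        clip_len=dict(RGB=8, Pose=32), num_clips=1)
    results = sampler(results)
    # adds results['RGB_inds'] (8 indices) and results['Pose_inds'] (32
    # indices); results['modality'] becomes ['RGB', 'Pose'].
    return results
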
@TRANSFORMS.register_module()
class MMDecode(DecordInit, DecordDecode, PoseDecode):
    """Decode RGB videos and skeletons."""

    def __init__(self, io_backend: str = 'disk', **kwargs) -> None:
        DecordInit.__init__(self, io_backend=io_backend, **kwargs)
        DecordDecode.__init__(self)
        self.io_backend = io_backend
        self.kwargs = kwargs
        self.file_client = None

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`MMDecode`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        for mod in results['modality']:
            if results[f'{mod}_inds'].ndim != 1:
                results[f'{mod}_inds'] = np.squeeze(results[f'{mod}_inds'])
            frame_inds = results[f'{mod}_inds']
            if mod == 'RGB':
                if 'filename' not in results:
                    results['filename'] = results['frame_dir'] + '.mp4'
                video_reader = self._get_video_reader(results['filename'])
                imgs = self._decord_load_frames(video_reader, frame_inds)
                del video_reader
                results['imgs'] = imgs
            elif mod == 'Pose':
                assert 'keypoint' in results
                if 'keypoint_score' not in results:
                    keypoint_score = [
                        np.ones(keypoint.shape[:-1], dtype=np.float32)
                        for keypoint in results['keypoint']
                    ]
                    results['keypoint_score'] = np.stack(keypoint_score)
                results['keypoint'] = self._load_kp(results['keypoint'],
                                                    frame_inds)
                results['keypoint_score'] = self._load_kpscore(
                    results['keypoint_score'], frame_inds)
            else:
                raise NotImplementedError(
                    f'MMDecode: Modality {mod} not supported')

        # We need to scale human keypoints to the new image size
        if 'imgs' in results and 'keypoint' in results:
            real_img_shape = results['imgs'][0].shape[:2]
            if real_img_shape != results['img_shape']:
                oh, ow = results['img_shape']
                nh, nw = real_img_shape

                assert results['keypoint'].shape[-1] in [2, 3]
                results['keypoint'][..., 0] *= (nw / ow)
                results['keypoint'][..., 1] *= (nh / oh)
                results['img_shape'] = real_img_shape
                results['original_shape'] = real_img_shape

        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}('
                    f'io_backend={self.io_backend})')
        return repr_str

@TRANSFORMS.register_module()
class MMCompact(BaseTransform):
    """Convert the coordinates of keypoints and crop the images to make
    them more compact.

    Required Keys:

        - imgs
        - keypoint
        - img_shape

    Modified Keys:

        - imgs
        - keypoint
        - img_shape

    Args:
        padding (float): The padding size. Defaults to 0.25.
        threshold (int): The threshold for the tight bounding box. If the
            width or height of the tight bounding box is smaller than the
            threshold, we do not perform the compact operation.
            Defaults to 10.
        hw_ratio (float | tuple[float]): The hw_ratio of the expanded box.
            Float indicates the specific ratio and tuple indicates a ratio
            range. If set as None, it means there is no requirement on
            hw_ratio. Defaults to 1.
        allow_imgpad (bool): Whether to allow expanding the box outside the
            image to meet the hw_ratio requirement. Defaults to True.
    """

    def __init__(self,
                 padding: float = 0.25,
                 threshold: int = 10,
                 hw_ratio: Union[float, Tuple[float]] = 1,
                 allow_imgpad: bool = True) -> None:
        self.padding = padding
        self.threshold = threshold
        if hw_ratio is not None:
            hw_ratio = _pair(hw_ratio)
        self.hw_ratio = hw_ratio
        self.allow_imgpad = allow_imgpad
        assert self.padding >= 0

    def _get_box(self, keypoint: np.ndarray, img_shape: Tuple[int]) -> Tuple:
        """Calculate the bounding box surrounding all joints in the
        frames."""
        h, w = img_shape

        kp_x = keypoint[..., 0]
        kp_y = keypoint[..., 1]

        min_x = np.min(kp_x[kp_x != 0], initial=np.Inf)
        min_y = np.min(kp_y[kp_y != 0], initial=np.Inf)
        max_x = np.max(kp_x[kp_x != 0], initial=-np.Inf)
        max_y = np.max(kp_y[kp_y != 0], initial=-np.Inf)

        # The compact area is too small
        if max_x - min_x < self.threshold or max_y - min_y < self.threshold:
            return 0, 0, w, h

        center = ((max_x + min_x) / 2, (max_y + min_y) / 2)
        half_width = (max_x - min_x) / 2 * (1 + self.padding)
        half_height = (max_y - min_y) / 2 * (1 + self.padding)

        if self.hw_ratio is not None:
            half_height = max(self.hw_ratio[0] * half_width, half_height)
            half_width = max(1 / self.hw_ratio[1] * half_height, half_width)

        min_x, max_x = center[0] - half_width, center[0] + half_width
        min_y, max_y = center[1] - half_height, center[1] + half_height

        # hot update
        if not self.allow_imgpad:
            min_x, min_y = int(max(0, min_x)), int(max(0, min_y))
            max_x, max_y = int(min(w, max_x)), int(min(h, max_y))
        else:
            min_x, min_y = int(min_x), int(min_y)
            max_x, max_y = int(max_x), int(max_y)
        return min_x, min_y, max_x, max_y

    def _compact_images(self, imgs: List[np.ndarray], img_shape: Tuple[int],
                        box: Tuple[int]) -> List:
        """Crop the images according to the bounding box."""
        h, w = img_shape
        min_x, min_y, max_x, max_y = box
        pad_l, pad_u, pad_r, pad_d = 0, 0, 0, 0
        if min_x < 0:
            pad_l = -min_x
            min_x, max_x = 0, max_x + pad_l
            w += pad_l
        if min_y < 0:
            pad_u = -min_y
            min_y, max_y = 0, max_y + pad_u
            h += pad_u
        if max_x > w:
            pad_r = max_x - w
            w = max_x
        if max_y > h:
            pad_d = max_y - h
            h = max_y
        if pad_l > 0 or pad_r > 0 or pad_u > 0 or pad_d > 0:
            imgs = [
                np.pad(img, ((pad_u, pad_d), (pad_l, pad_r), (0, 0)))
                for img in imgs
            ]
        imgs = [img[min_y:max_y, min_x:max_x] for img in imgs]
        return imgs

    def transform(self, results: Dict) -> Dict:
        """The transform function of :class:`MMCompact`.

        Args:
            results (dict): The result dict.

        Returns:
            dict: The result dict.
        """
        img_shape = results['img_shape']
        kp = results['keypoint']
        # Make NaN zero
        kp[np.isnan(kp)] = 0.
        min_x, min_y, max_x, max_y = self._get_box(kp, img_shape)

        kp_x, kp_y = kp[..., 0], kp[..., 1]
        kp_x[kp_x != 0] -= min_x
        kp_y[kp_y != 0] -= min_y

        new_shape = (max_y - min_y, max_x - min_x)
        results['img_shape'] = new_shape
        results['imgs'] = self._compact_images(results['imgs'], img_shape,
                                               (min_x, min_y, max_x, max_y))
        return results

    def __repr__(self) -> str:
        repr_str = (f'{self.__class__.__name__}(padding={self.padding}, '
                    f'threshold={self.threshold}, '
                    f'hw_ratio={self.hw_ratio}, '
                    f'allow_imgpad={self.allow_imgpad})')
        return repr_str

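# Usage sketch: `_demo_mm_compact` is a hypothetical helper added for
# illustration on this page, not part of the upstream module. Crop four RGB
# frames to the compact box around the keypoints.
def _demo_mm_compact():
    results = dict(
        imgs=[np.zeros((480, 640, 3), dtype=np.uint8) for _ in range(4)],
        keypoint=np.random.rand(1, 4, 17, 2) * [300, 200] + 100,
        img_shape=(480, 640))
    results = MMCompact(padding=0.25, hw_ratio=1.)(results)
    # results['imgs'] are cropped (and zero-padded if the box exceeds the
    # frame) to the same box the keypoints were shifted into.
    return results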