# EasyMocap/myeasymocap/backbone/basetopdown.py

import os
from os.path import join
import numpy as np
import cv2
import torch
import torch.nn as nn
import pickle
import math

def rotate_2d(pt_2d, rot_rad):
    x = pt_2d[0]
    y = pt_2d[1]
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)
    xx = x * cs - y * sn
    yy = x * sn + y * cs
    return np.array([xx, yy], dtype=np.float32)

def gen_trans_from_patch_cv(c_x, c_y, src_width, src_height, dst_width, dst_height, scale, rot, inv=False):
    # augment size with scale
    src_w = src_width * scale
    src_h = src_height * scale
    src_center = np.array([c_x, c_y], dtype=np.float32)
    # augment rotation
    rot_rad = np.pi * rot / 180
    src_downdir = rotate_2d(np.array([0, src_h * 0.5], dtype=np.float32), rot_rad)
    src_rightdir = rotate_2d(np.array([src_w * 0.5, 0], dtype=np.float32), rot_rad)
    dst_w = dst_width
    dst_h = dst_height
    dst_center = np.array([dst_w * 0.5, dst_h * 0.5], dtype=np.float32)
    dst_downdir = np.array([0, dst_h * 0.5], dtype=np.float32)
    dst_rightdir = np.array([dst_w * 0.5, 0], dtype=np.float32)
    src = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = src_center
    src[1, :] = src_center + src_downdir
    src[2, :] = src_center + src_rightdir
    dst = np.zeros((3, 2), dtype=np.float32)
    dst[0, :] = dst_center
    dst[1, :] = dst_center + dst_downdir
    dst[2, :] = dst_center + dst_rightdir
    # `inv` is kept for API compatibility but unused: both transforms are returned
    inv_trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    return trans, inv_trans
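
# A minimal usage sketch (the box values are hypothetical): build the transforms
# for a 200x200 box centered at (320, 240), warped to a 256x256 patch with 25%
# padding; the source center lands on the patch center (128, 128).
def _example_patch_transform():
    trans, inv_trans = gen_trans_from_patch_cv(
        c_x=320, c_y=240, src_width=200, src_height=200,
        dst_width=256, dst_height=256, scale=1.25, rot=0)
    # trans maps image coordinates into the patch, inv_trans maps them back
    center = np.array([320, 240, 1], dtype=np.float32)
    return trans @ center, inv_trans  # first value ~ [128, 128]
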
# TODO: add UDP
def get_warp_matrix(theta, size_input, size_dst, size_target):
    """Calculate the transformation matrix under the constraint of unbiased.

    Paper ref: Huang et al. The Devil is in the Details: Delving into Unbiased
    Data Processing for Human Pose Estimation (CVPR 2020).

    Args:
        theta (float): Rotation angle in degrees.
        size_input (np.ndarray): Size of input image [w, h].
        size_dst (np.ndarray): Size of output image [w, h].
        size_target (np.ndarray): Size of ROI in input plane [w, h].

    Returns:
        np.ndarray: A matrix for transformation.
    """
    theta = np.deg2rad(theta)
    matrix = np.zeros((2, 3), dtype=np.float32)
    scale_x = size_dst[0] / size_target[0]
    scale_y = size_dst[1] / size_target[1]
    matrix[0, 0] = math.cos(theta) * scale_x
    matrix[0, 1] = -math.sin(theta) * scale_x
    matrix[0, 2] = scale_x * (-0.5 * size_input[0] * math.cos(theta) +
                              0.5 * size_input[1] * math.sin(theta) +
                              0.5 * size_target[0])
    matrix[1, 0] = math.sin(theta) * scale_y
    matrix[1, 1] = math.cos(theta) * scale_y
    matrix[1, 2] = scale_y * (-0.5 * size_input[0] * math.sin(theta) -
                              0.5 * size_input[1] * math.cos(theta) +
                              0.5 * size_target[1])
    return matrix
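
# A quick sanity check (sizes are illustrative): with theta=0 and the target ROI
# equal to the input, the UDP matrix reduces to pure scaling by the output size.
def _example_udp_matrix():
    m = get_warp_matrix(theta=0.0,
                        size_input=np.array([192, 256]),
                        size_dst=np.array([48, 64]),
                        size_target=np.array([192, 256]))
    assert abs(m[0, 0] - 48 / 192) < 1e-6  # scale_x only, no rotation terms
    return m
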
def generate_patch_image_cv(cvimg, c_x, c_y, bb_width, bb_height, patch_width, patch_height, do_flip, scale, rot):
    # `do_flip` is unused here; flipping is handled by the caller
    trans, inv_trans = gen_trans_from_patch_cv(c_x, c_y, bb_width, bb_height, patch_width, patch_height, scale, rot, inv=False)
    img_patch = cv2.warpAffine(cvimg, trans, (int(patch_width), int(patch_height)),
                               flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
    return img_patch, trans, inv_trans

def get_single_image_crop_demo(image, bbox, scale=1.2, crop_size=(224, 224),
                               mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], fliplr=False):
    # bbox is (cx, cy, w, h); crop_size is (width, height)
    crop_image, trans, inv_trans = generate_patch_image_cv(
        cvimg=image.copy(),
        c_x=bbox[0],
        c_y=bbox[1],
        bb_width=bbox[2],
        bb_height=bbox[3],
        patch_width=crop_size[0],
        patch_height=crop_size[1],
        do_flip=False,
        scale=scale,
        rot=0,
    )
    if fliplr:
        crop_image = cv2.flip(crop_image, 1)
    # cv2.imwrite('debug_crop.jpg', crop_image[:,:,::-1])
    # cv2.imwrite('debug_crop_full.jpg', image[:,:,::-1])
    crop_image = crop_image.transpose(2, 0, 1)
    mean1 = np.array(mean, dtype=np.float32).reshape(3, 1, 1)
    std1 = np.array(std, dtype=np.float32).reshape(3, 1, 1)
    crop_image = crop_image.astype(np.float32) / 255.
    crop_image = (crop_image - mean1) / std1
    return crop_image, inv_trans
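
# A minimal sketch on a synthetic image: crop a (cx, cy, w, h) box and get back
# a normalized (3, 256, 256) array plus the patch-to-image transform.
def _example_crop():
    img = np.zeros((480, 640, 3), dtype=np.uint8)
    bbox = np.array([320., 240., 200., 200.])  # hypothetical detection
    crop, inv_trans = get_single_image_crop_demo(img, bbox, scale=1.2,
                                                 crop_size=(256, 256))
    return crop, inv_trans
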
def xyxy2ccwh(bbox):
    # (x_min, y_min, x_max, y_max) -> (center_x, center_y, width, height)
    w = bbox[:, 2] - bbox[:, 0]
    h = bbox[:, 3] - bbox[:, 1]
    cx = (bbox[:, 2] + bbox[:, 0]) / 2
    cy = (bbox[:, 3] + bbox[:, 1]) / 2
    return np.stack([cx, cy, w, h], axis=1)
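
# Example: a 200x400 box with corners (100, 100) and (300, 500) becomes
# center (200, 300) with width 200 and height 400:
#   xyxy2ccwh(np.array([[100., 100., 300., 500.]]))  # [[200. 300. 200. 400.]]
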
class BaseTopDownModel(nn.Module):
    def __init__(self, bbox_scale, res_input,
                 mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
        super().__init__()
        self.bbox_scale = bbox_scale
        if not isinstance(res_input, list):
            res_input = [res_input, res_input]
        self.crop_size = res_input
        self.mean = mean
        self.std = std

    def load_checkpoint(self, model, state_dict, prefix, strict):
        # strip `prefix` from the checkpoint keys before loading
        state_dict_new = {}
        for key, val in state_dict.items():
            if key.startswith(prefix):
                key_new = key.replace(prefix, '')
                state_dict_new[key_new] = val
        model.load_state_dict(state_dict_new, strict=strict)

    def infer(self, image, bbox, to_numpy=False, flips=None):
        # self.model and self.device are expected to be set by subclasses
        if isinstance(image, str):
            image = cv2.imread(image)
        img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        squeeze = False
        if len(bbox.shape) == 1:
            bbox = bbox[None]
            squeeze = True
        # TODO: support batches of multiple images
        bbox = xyxy2ccwh(bbox)
        # convert the bbox to the aspect ratio of the network input
        aspect_ratio = self.crop_size[1] / self.crop_size[0]
        w, h = bbox[:, 2], bbox[:, 3]
        # if height > width * ratio, enlarge the width
        flag = h > aspect_ratio * w
        bbox[flag, 2] = h[flag] / aspect_ratio
        # otherwise enlarge the height
        bbox[~flag, 3] = w[~flag] * aspect_ratio
        inputs = []
        inv_trans_ = []
        for i in range(bbox.shape[0]):
            fliplr = False if flips is None else flips[i]
            norm_img, inv_trans = get_single_image_crop_demo(
                img,
                bbox[i],
                scale=self.bbox_scale,
                crop_size=self.crop_size,
                mean=self.mean,
                std=self.std,
                fliplr=fliplr
            )
            inputs.append(norm_img)
            inv_trans_.append(inv_trans)
        if False:  # set to True to dump the normalized crops for debugging
            vis = np.hstack(inputs)
            mean, std = np.array(self.mean), np.array(self.std)
            mean = mean.reshape(3, 1, 1)
            std = std.reshape(3, 1, 1)
            vis = (vis * std) + mean
            vis = vis.transpose(1, 2, 0)
            vis = (vis[:, :, ::-1] * 255).astype(np.uint8)
            cv2.imwrite('debug_crop.jpg', vis)
        inputs = np.stack(inputs)
        inv_trans_ = np.stack(inv_trans_)
        inputs = torch.FloatTensor(inputs).to(self.device)
        with torch.no_grad():
            output = self.model(inputs)
        if squeeze:
            for key, val in output.items():
                output[key] = val[0]
        if to_numpy:
            for key, val in output.items():
                if torch.is_tensor(val):
                    output[key] = val.detach().cpu().numpy()
        output['inv_trans'] = inv_trans_
        return output

    @staticmethod
    def batch_affine_transform(points, trans):
        # points: (Bn, J, 2), trans: (Bn, 2, 3)
        points = np.dstack((points[..., :2], np.ones((*points.shape[:-1], 1))))
        out = np.matmul(points, trans.swapaxes(-1, -2))
        return out
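
# A minimal sketch: map patch-space keypoints back to image space with the
# (Bn, 2, 3) `inv_trans` returned by infer(). With the identity transform the
# points come back unchanged:
#   kpts_img = BaseTopDownModel.batch_affine_transform(
#       np.zeros((1, 17, 2)), np.eye(2, 3)[None])
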

class BaseTopDownModelCache(BaseTopDownModel):
    def __init__(self, name, **kwargs):
        super().__init__(**kwargs)
        self.name = name
        # self.output (the cache root directory) is expected to be set by the caller

    def cachename(self, imgname):
        basename = os.sep.join(imgname.split(os.sep)[-2:])
        cachename = join(self.output, self.name, basename.replace('.jpg', '.pkl'))
        return cachename

    def dump(self, cachename, output):
        os.makedirs(os.path.dirname(cachename), exist_ok=True)
        with open(cachename, 'wb') as f:
            pickle.dump(output, f)
        return output

    def load(self, cachename):
        with open(cachename, 'rb') as f:
            output = pickle.load(f)
        return output

    def __call__(self, bbox, images, imgname, flips=None):
        cachename = self.cachename(imgname)
        if os.path.exists(cachename):
            output = self.load(cachename)
        else:
            output = self.infer(images, bbox, to_numpy=True, flips=flips)
            output = self.dump(cachename, output)
        ret = {
            'params': output
        }
        return ret
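
# Usage sketch (the subclass name is hypothetical; a concrete subclass must also
# set self.model, self.device and self.output before calling):
#   backbone = MyKeypointBackbone(name='keypoints2d', bbox_scale=1.25,
#                                 res_input=[288, 384])
#   out = backbone(bbox, image, imgname)  # results cached under <output>/keypoints2d/
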
# post processing
def get_max_preds(batch_heatmaps):
    '''
    get predictions from score maps
    heatmaps: numpy.ndarray([batch_size, num_joints, height, width])
    '''
    assert isinstance(batch_heatmaps, np.ndarray), \
        'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_heatmaps should be 4-ndim'
    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    maxvals = np.amax(heatmaps_reshaped, 2)
    maxvals = maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))
    # recover (x, y) from the flat argmax index
    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = preds[:, :, 0] % width
    preds[:, :, 1] = np.floor(preds[:, :, 1] / width)
    # zero out joints whose maximum response is not positive
    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)
    preds *= pred_mask
    return preds, maxvals

def get_preds_from_heatmaps(batch_heatmaps):
    coords, maxvals = get_max_preds(batch_heatmaps)
    heatmap_height = batch_heatmaps.shape[2]
    heatmap_width = batch_heatmaps.shape[3]
    # post-processing: shift each peak a quarter pixel towards the
    # higher-valued neighbour to reduce quantization error
    for n in range(coords.shape[0]):
        for p in range(coords.shape[1]):
            hm = batch_heatmaps[n][p]
            px = int(math.floor(coords[n][p][0] + 0.5))
            py = int(math.floor(coords[n][p][1] + 0.5))
            if 1 < px < heatmap_width - 1 and 1 < py < heatmap_height - 1:
                diff = np.array([
                    hm[py][px + 1] - hm[py][px - 1],
                    hm[py + 1][px] - hm[py - 1][px]
                ])
                coords[n][p] += np.sign(diff) * .25
    # scale back to the input resolution, assuming a heatmap stride of 4
    coords = coords.astype(np.float32) * 4
    pred = np.dstack((coords, maxvals))
    return pred
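
# A small self-check on a synthetic heatmap: a single peak at (x=10, y=20) in a
# 48x64 map decodes to (40, 80) after the assumed x4 upscaling, with score 1.
def _example_decode_heatmaps():
    hm = np.zeros((1, 1, 64, 48), dtype=np.float32)
    hm[0, 0, 20, 10] = 1.
    return get_preds_from_heatmaps(hm)  # ~ [[[40., 80., 1.]]]
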

def gdown_models(ckpt, url):
    print('Try to download model from {} to {}'.format(url, ckpt))
    os.makedirs(os.path.dirname(ckpt), exist_ok=True)
    cmd = 'gdown "{}" -O {}'.format(url, ckpt)
    print('\n', cmd, '\n')
    os.system(cmd)
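
# Usage sketch (the path and URL are placeholders, not a real checkpoint):
#   gdown_models('models/pose_hrnet.pth', 'https://drive.google.com/uc?id=<file-id>')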