# EasyMocap/easymocap/estimator/HRNet/hrnet_api.py
'''
@ Date: 2020-06-04 12:47:04
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-04-19 17:02:57
@ Author: Qing Shuai
@ Mail: s_q@zju.edu.cn
'''
import math
from os.path import join

import cv2
import numpy as np
import torch
from torchvision.transforms import transforms

from .hrnet import HRNet
COCO17_IN_BODY25 = [0,16,15,18,17,5,2,6,3,7,4,12,9,13,10,14,11]
pairs = [[1, 8], [1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [8, 9], [9, 10], [10, 11], [8, 12], [12, 13], [13, 14], [1, 0], [0,15], [15,17], [0,16], [16,18], [14,19], [19,20], [14,21], [11,22], [22,23], [11,24]]
def coco17tobody25(points2d):
    kpts = np.zeros((points2d.shape[0], 25, 3))
    kpts[:, COCO17_IN_BODY25, :2] = points2d[:, :, :2]
    kpts[:, COCO17_IN_BODY25, 2:3] = points2d[:, :, 2:3]
    # synthesize MidHip (8) from the hips and Neck (1) from the shoulders
    kpts[:, 8, :2] = kpts[:, [9, 12], :2].mean(axis=1)
    kpts[:, 8, 2] = kpts[:, [9, 12], 2].min(axis=1)
    kpts[:, 1, :2] = kpts[:, [2, 5], :2].mean(axis=1)
    kpts[:, 1, 2] = kpts[:, [2, 5], 2].min(axis=1)
    # swap the coordinate order if needed
    # kpts = kpts[:, :, [1,0,2]]
    return kpts
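# Usage sketch (hypothetical data): convert per-person COCO-17 detections of
# shape (nPersons, 17, 3), stored as (x, y, confidence), into BODY-25 order:
#   pts17 = np.random.rand(2, 17, 3)
#   pts25 = coco17tobody25(pts17)
#   assert pts25.shape == (2, 25, 3)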
# generate a Gaussian kernel
def generate_gauss(sigma):
    tmp_size = sigma * 3
    size = 2 * tmp_size + 1
    x = np.arange(0, size, 1, np.float32)
    y = x[:, np.newaxis]
    x0 = y0 = size // 2
    # The gaussian is not normalized, we want the center value to equal 1
    g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * sigma ** 2))
    return g, tmp_size
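# For example, generate_gauss(2) returns a 13x13 kernel (size = 2*3*sigma + 1)
# whose center value g[6, 6] is exactly 1, along with the radius 3*sigma = 6.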
# precompute kernels for sigma = 1..4; note that after this loop the module-level
# gauss_kernel/gauss_radius keep their last (SIGMA = 4) values, which `predict` uses below
gauss = {}
for SIGMA in range(1, 5):
    gauss_kernel, gauss_radius = generate_gauss(SIGMA)
    gauss[SIGMA] = {
        'kernel': gauss_kernel,
        'radius': gauss_radius,
    }
def box_to_center_scale(box, model_image_width, model_image_height, scale_factor=1.25):
    """convert a box to the (center, scale) information required for pose transformation
    Parameters
    ----------
    box : array-like of length 4
        (x1, y1, x2, y2) corners of the box
    model_image_width : int
    model_image_height : int
    Returns
    -------
    (numpy array, numpy array)
        Two numpy arrays, coordinates for the center of the box and the scale of the box
    """
    center = np.zeros((2), dtype=np.float32)
    bottom_left_corner = (box[0], box[1])
    top_right_corner = (box[2], box[3])
    box_width = top_right_corner[0] - bottom_left_corner[0]
    box_height = top_right_corner[1] - bottom_left_corner[1]
    bottom_left_x = bottom_left_corner[0]
    bottom_left_y = bottom_left_corner[1]
    center[0] = bottom_left_x + box_width * 0.5
    center[1] = bottom_left_y + box_height * 0.5
    # pad the box to match the aspect ratio expected by the network
    aspect_ratio = model_image_width * 1.0 / model_image_height
    pixel_std = 200
    if box_width > aspect_ratio * box_height:
        box_height = box_width * 1.0 / aspect_ratio
    elif box_width < aspect_ratio * box_height:
        box_width = box_height * aspect_ratio
    scale = np.array(
        [box_width * 1.0 / pixel_std, box_height * 1.0 / pixel_std],
        dtype=np.float32)
    scale = scale * scale_factor
    return center, scale
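# Worked example (hypothetical numbers): for box = [100, 200, 300, 600] and a
# 288x384 (width x height) model input, the box is 200x400 and the target
# aspect ratio is 288/384 = 0.75, so the width is padded to 300; the result is
# center = [200, 400] and scale = [300/200, 400/200] * 1.25 = [1.875, 2.5].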
def get_dir(src_point, rot_rad):
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)
    src_result = [0, 0]
    src_result[0] = src_point[0] * cs - src_point[1] * sn
    src_result[1] = src_point[0] * sn + src_point[1] * cs
    return src_result

def get_3rd_point(a, b):
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)
def get_affine_transform(
        center, scale, rot, output_size,
        shift=np.array([0, 0], dtype=np.float32), inv=0
):
    if not isinstance(scale, np.ndarray) and not isinstance(scale, list):
        print(scale)
        scale = np.array([scale, scale])
    scale_tmp = scale * 200.0
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]
    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)
    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])
    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    return trans
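# Usage sketch (continuing the example above): map the padded person box onto a
# 288x384 crop, then warp the full image with it:
#   trans = get_affine_transform(center, scale, rot=0, output_size=(288, 384))
#   crop = cv2.warpAffine(image, trans, (288, 384), flags=cv2.INTER_LINEAR)
# Passing inv=1 returns the inverse mapping from crop coordinates back to the
# original image, which is what transform_preds uses below.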
def get_max_preds(batch_heatmaps):
    '''
    get predictions from score maps
    heatmaps: numpy.ndarray([batch_size, num_joints, height, width])
    '''
    assert isinstance(batch_heatmaps, np.ndarray), \
        'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_images should be 4-ndim'
    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    maxvals = np.amax(heatmaps_reshaped, 2)
    maxvals = maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))
    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = (preds[:, :, 0]) % width
    preds[:, :, 1] = np.floor((preds[:, :, 1]) / width)
    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)
    preds *= pred_mask
    return preds, maxvals
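# For example, on a (1, 1, 4, 4) heatmap whose flat argmax index is 6, the
# decoded location is x = 6 % 4 = 2, y = 6 // 4 = 1; joints whose maximum
# score is <= 0 are zeroed out by the mask.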
def affine_transform(pt, t):
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]

def batch_affine_transform(points, trans):
    points = np.hstack((points[:, :2], np.ones((points.shape[0], 1))))
    out = points @ trans.T
    return out

def transform_preds(coords, center, scale, rot, output_size):
    target_coords = np.zeros(coords.shape)
    trans = get_affine_transform(center, scale, rot, output_size, inv=1)
    target_coords[:, :2] = batch_affine_transform(coords, trans)
    return target_coords
config_ = {
    'kintree': [[1, 0], [2, 0], [3, 1], [4, 2], [5, 0], [6, 0], [7, 5], [8, 6], [9, 7],
                [10, 8], [11, 5], [12, 6], [13, 11], [14, 12], [15, 13], [16, 14], [6, 5], [12, 11]],
    'color': ['g', 'r', 'g', 'r', 'g', 'r', 'g', 'r', 'g', 'r', 'g', 'r', 'g', 'r', 'g', 'r', 'k', 'k'],
}
colors_table = {
    # colorblind/print/copy safe:
    '_blue': [0.65098039, 0.74117647, 0.85882353],
    '_pink': [.9, .7, .7],
    '_mint': [166/255., 229/255., 204/255.],
    '_mint2': [202/255., 229/255., 223/255.],
    '_green': [153/255., 216/255., 201/255.],
    '_green2': [171/255., 221/255., 164/255.],
    '_red': [251/255., 128/255., 114/255.],
    '_orange': [253/255., 174/255., 97/255.],
    '_yellow': [250/255., 230/255., 154/255.],
    'r': [255/255, 0, 0],
    'g': [0, 255/255, 0],
    'b': [0, 0, 255/255],
    'k': [0, 0, 0],
    'y': [255/255, 255/255, 0],
    'purple': [128/255, 0, 128/255],
}
# convert RGB in [0, 1] to OpenCV's BGR in [0, 255]
for key, val in colors_table.items():
    colors_table[key] = tuple([int(val[2]*255), int(val[1]*255), int(val[0]*255)])
def save_batch_heatmaps(batch_image, batch_heatmaps, file_name,
                        normalize=True):
    '''
    batch_image: [batch_size, channel, height, width]
    batch_heatmaps: [batch_size, num_joints, height, width]
    file_name: saved file name
    '''
    if normalize:
        batch_image = batch_image.clone()
        min = float(batch_image.min())
        max = float(batch_image.max())
        batch_image.add_(-min).div_(max - min + 1e-5)
    batch_size = batch_heatmaps.size(0)
    num_joints = batch_heatmaps.size(1)
    heatmap_height = batch_heatmaps.size(2)
    heatmap_width = batch_heatmaps.size(3)
    grid_image = np.zeros((batch_size*heatmap_height,
                           (num_joints+2)*heatmap_width,
                           3),
                          dtype=np.uint8)
    preds, maxvals = get_max_preds(batch_heatmaps.detach().cpu().numpy())
    for i in range(batch_size):
        image = batch_image[i].mul(255)\
                              .clamp(0, 255)\
                              .byte()\
                              .permute(1, 2, 0)\
                              .cpu().numpy()
        heatmaps = batch_heatmaps[i].mul(255)\
                                    .clamp(0, 255)\
                                    .byte()\
                                    .cpu().numpy()
        resized_image = cv2.resize(image,
                                   (int(heatmap_width), int(heatmap_height)))
        resized_image_copy = resized_image.copy()
        height_begin = heatmap_height * i
        height_end = heatmap_height * (i + 1)
        for ip in range(len(config_['kintree'])):
            src, dst = config_['kintree'][ip]
            c = config_['color'][ip]
            if maxvals[i][src] < 0.1 or maxvals[i][dst] < 0.1:
                continue
            # plot_line is expected to be provided by EasyMocap's plotting
            # utilities; it is not imported in this snippet
            plot_line(resized_image_copy, preds[i][src], preds[i][dst], colors_table[c], 1)
        for j in range(num_joints):
            cv2.circle(resized_image,
                       (int(preds[i][j][0]), int(preds[i][j][1])),
                       1, [0, 0, 255], 1)
            heatmap = heatmaps[j, :, :]
            mask = (heatmap > 0.1)[:, :, None]
            colored_heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
            masked_image = (colored_heatmap*0.7 + resized_image*0.3)*mask + resized_image*(1-mask)
            cv2.circle(masked_image,
                       (int(preds[i][j][0]), int(preds[i][j][1])),
                       1, [0, 0, 255], 1)
            width_begin = heatmap_width * (j+2)
            width_end = heatmap_width * (j+2+1)
            grid_image[height_begin:height_end, width_begin:width_end, :] = \
                masked_image
            # grid_image[height_begin:height_end, width_begin:width_end, :] = \
            #     colored_heatmap*0.7 + resized_image*0.3
        grid_image[height_begin:height_end, 0:heatmap_width, :] = resized_image
        grid_image[height_begin:height_end, heatmap_width:heatmap_width+heatmap_width, :] = resized_image_copy
    cv2.imwrite(file_name, grid_image)
def get_final_preds(batch_heatmaps, center, scale, rot=None, flip=None):
    coords, maxvals = get_max_preds(batch_heatmaps)
    heatmap_height = batch_heatmaps.shape[2]
    heatmap_width = batch_heatmaps.shape[3]
    # post-processing: shift each peak a quarter pixel toward its higher neighbor
    for n in range(coords.shape[0]):
        for p in range(coords.shape[1]):
            hm = batch_heatmaps[n][p]
            px = int(math.floor(coords[n][p][0] + 0.5))
            py = int(math.floor(coords[n][p][1] + 0.5))
            if 1 < px < heatmap_width - 1 and 1 < py < heatmap_height - 1:
                diff = np.array(
                    [
                        hm[py][px+1] - hm[py][px-1],
                        hm[py+1][px] - hm[py-1][px]
                    ]
                )
                coords[n][p] += np.sign(diff) * .25
    preds = coords.copy()
    # Transform back to original-image coordinates
    for i in range(coords.shape[0]):
        if flip is not None:
            if flip[i]:
                coords[i, :, 0] = heatmap_width - 1 - coords[i, :, 0]
        if rot is None:
            _rot = 0
        else:
            _rot = rot[i]
        preds[i] = transform_preds(
            coords[i], center[i], scale[i], _rot, [heatmap_width, heatmap_height]
        )
    return preds, maxvals
def get_gaussian_maps(net_out, keypoints, sigma):
    # Build per-joint weight maps: a Gaussian window around each previous
    # keypoint, and zeros for joints with (near-)zero confidence.
    radius, kernel = gauss[sigma]['radius'], gauss[sigma]['kernel']
    weights = np.ones(net_out.shape, dtype=np.float32)
    for i in range(weights.shape[0]):
        for nj in range(weights.shape[1]):
            if keypoints[i][nj][2] < 0.01:
                weights[i][nj] = 0
                continue
            weights[i][nj] = 0
            mu_x, mu_y = keypoints[i][nj][:2]
            mu_x, mu_y = int(mu_x + 0.5), int(mu_y + 0.5)
            # Usable gaussian range
            ul = [mu_x - radius, mu_y - radius]
            br = [mu_x + radius + 1, mu_y + radius + 1]
            # kernel range clipped to the heatmap
            g_x = max(0, -ul[0]), min(br[0], weights.shape[3]) - ul[0]
            g_y = max(0, -ul[1]), min(br[1], weights.shape[2]) - ul[1]
            # Image range
            img_x = max(0, ul[0]), min(br[0], weights.shape[3])
            img_y = max(0, ul[1]), min(br[1], weights.shape[2])
            weights[i][nj][img_y[0]:img_y[1], img_x[0]:img_x[1]] = \
                kernel[g_y[0]:g_y[1], g_x[0]:g_x[1]]
    return weights
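# Usage sketch (hypothetical shapes): given heatmaps of shape (batch, 17, 96, 72)
# and previous keypoints already mapped to heatmap coordinates, suppress
# responses far from the previous pose:
#   weights = get_gaussian_maps(net_out, keypoints_rescale, sigma=2)
#   out = net_out * weights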
humanId = 0

class SimpleHRNet:
    def __init__(self, c, nof_joints, checkpoint_path, device, resolution=(288, 384)):
        self.device = device
        self.c = c
        self.nof_joints = nof_joints
        self.checkpoint_path = checkpoint_path
        self.max_batch_size = 64
        self.resolution = resolution  # (width, height): resolution[0] is the crop width, resolution[1] the height
        self.transform = transforms.Compose([
            # transforms.ToPILImage(),
            # transforms.Resize((self.resolution[0], self.resolution[1])),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.model = HRNet(c=c, nof_joints=nof_joints).to(device)
        self.model.load_state_dict(torch.load(checkpoint_path, map_location=self.device))
        self.model.eval()
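    # Construction sketch (the checkpoint path below is hypothetical): an
    # HRNet-W48 with 17 COCO joints and a 288x384 (width x height) input crop:
    #   estimator = SimpleHRNet(48, 17, 'models/pose_hrnet_w48_384x288.pth',
    #                           device=torch.device('cuda'))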
    def __call__(self, image, bboxes, rot=0, net_out=False):
        # image: the full frame; bboxes: one (x1, y1, x2, y2) box per person
        images = torch.zeros((len(bboxes), 3, self.resolution[1], self.resolution[0]), device=self.device)  # (height, width)
        if len(bboxes) > 0:
            # pose estimation : for multiple people
            centers, scales, trans_all = [], [], []
            for box in bboxes:
                center, scale = box_to_center_scale(box, self.resolution[0], self.resolution[1])
                centers.append(center)
                scales.append(scale)
                trans = get_affine_transform(center, scale, rot=rot, output_size=self.resolution)
                trans_all.append(trans)
            for i, trans in enumerate(trans_all):
                # Crop smaller image of people
                model_input = cv2.warpAffine(
                    image, trans,
                    (int(self.resolution[0]), int(self.resolution[1])),
                    flags=cv2.INTER_LINEAR)
                # cv2.imshow('input', model_input)
                # cv2.waitKey(0)
                # hwc -> 1chw
                model_input = self.transform(model_input)  # .unsqueeze(0)
                images[i] = model_input
            images = images.to(self.device)
            with torch.no_grad():
                out = self.model(images)
            out = out.cpu().detach().numpy()
            if net_out:
                return out, trans_all, centers, scales, rot
            coords, max_val = get_final_preds(
                out,
                np.asarray(centers),
                np.asarray(scales),
                [rot for _ in range(out.shape[0])])
            pts = np.concatenate((coords, max_val), axis=2)
            return coco17tobody25(pts)
        else:
            return np.empty((0, 25, 3))
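    # Usage sketch (hypothetical frame and box): run the network on one person
    # box and get BODY-25 keypoints in original-image coordinates:
    #   frame = cv2.imread('frame.jpg')
    #   kpts = estimator(frame, [[100, 200, 300, 600]])   # -> (1, 25, 3)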
    def predict_with_previous(self, image, bboxes, keypoints, sigma):
        # net_out: (batch, nJoints, height, width)
        net_out, trans_all, centers, scales, rot = self.__call__(image, bboxes, net_out=True)
        keypoints = keypoints[:, COCO17_IN_BODY25]
        keypoints_rescale = keypoints.copy()
        for i in range(keypoints.shape[0]):
            # map each person's previous keypoints into its crop, then down to heatmap scale (1/4)
            keypoints_rescale[i, :, :2] = batch_affine_transform(keypoints[i], trans_all[i]) / 4
        weights = get_gaussian_maps(net_out, keypoints_rescale, sigma)
        out = net_out * weights
        coords, max_val = get_final_preds(
            out,
            np.asarray(centers),
            np.asarray(scales),
            [rot for _ in range(out.shape[0])])
        pts = np.concatenate((coords, max_val), axis=2)
        return coco17tobody25(pts)
    def predict(self, image, detections, keypoints=None, ret_crop=False):
        if keypoints is not None:
            keypoints = keypoints[:, COCO17_IN_BODY25]
            kpts_rescale = [None for _ in range(len(keypoints))]
        boxes = []
        rotation = 0
        image_pose = image
        # image_pose = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if detections is not None:
            images = torch.zeros((len(detections), 3, self.resolution[1], self.resolution[0]), device=self.device)  # (height, width)
            # pose estimation : for multiple people
            centers = []
            scales = []
            for box in detections:
                center, scale = box_to_center_scale(box, self.resolution[0], self.resolution[1])
                centers.append(center)
                scales.append(scale)
            model_inputs = []
            for i, (center, scale) in enumerate(zip(centers, scales)):
                trans = get_affine_transform(center, scale, rotation, self.resolution)
                # Crop smaller image of people
                model_input = cv2.warpAffine(
                    image_pose,
                    trans,
                    (int(self.resolution[0]), int(self.resolution[1])),
                    flags=cv2.INTER_LINEAR)
                if keypoints is not None:
                    # map previous keypoints into the crop, then to heatmap scale (1/4)
                    kpts_homo = keypoints[i].copy()
                    kpts_homo[:, 2] = 1
                    kpts_rescale[i] = (kpts_homo @ trans.T)/4
                # global humanId
                # cv2.imwrite('../output/debughrnet/person_{}.jpg'.format(humanId), model_input[:,:,[2,1,0]])
                # humanId += 1
                # hwc -> 1chw
                model_input = self.transform(model_input)  # .unsqueeze(0)
                images[i] = model_input
            # torch.cuda.synchronize(self.device)
            # print(' - spending {:.2f}ms in preprocess.'.format(1000*(time.time() - start)))
            if images.shape[0] == 0:
                return np.empty((0, 25, 3))
            else:
                # start = time.time()
                images = images.to(self.device)
                # torch.cuda.synchronize(self.device)
                # print(' - spending {:.2f}ms in copy to cuda.'.format(1000*(time.time() - start)))
                # start = time.time()
                with torch.no_grad():
                    if len(images) <= self.max_batch_size:
                        out = self.model(images)
                    else:
                        # run the batch in chunks to bound GPU memory
                        out = torch.empty(
                            (images.shape[0], self.nof_joints, self.resolution[1] // 4, self.resolution[0] // 4)
                        ).to(self.device)
                        for i in range(0, len(images), self.max_batch_size):
                            out[i:i + self.max_batch_size] = self.model(images[i:i + self.max_batch_size])
                # torch.cuda.synchronize(self.device)
                global humanId
                if keypoints is not None:
                    filename = join('../output/debughrnet', '{:06d}.jpg'.format(humanId))
                    humanId += 1
                    # save_batch_heatmaps(images, out, filename)
                    # build the Gaussian weight maps (default weight is 1); note that
                    # gauss_kernel/gauss_radius are the module-level values left over
                    # from the SIGMA loop above
                    weights = np.ones(out.shape, dtype=np.float32)
                    for i in range(weights.shape[0]):
                        for nj in range(weights.shape[1]):
                            if keypoints[i][nj][2] < 0:
                                weights[i][nj] = 0
                                continue
                            elif keypoints[i][nj][2] < 0.01:
                                # low-confidence joints keep the default weight of 1
                                continue
                            weights[i][nj] = 0
                            mu_x, mu_y = kpts_rescale[i][nj]
                            mu_x, mu_y = int(mu_x + 0.5), int(mu_y + 0.5)
                            # Usable gaussian range
                            ul = [mu_x - gauss_radius, mu_y - gauss_radius]
                            br = [mu_x + gauss_radius + 1, mu_y + gauss_radius + 1]
                            # kernel range clipped to the heatmap
                            g_x = max(0, -ul[0]), min(br[0], weights.shape[3]) - ul[0]
                            g_y = max(0, -ul[1]), min(br[1], weights.shape[2]) - ul[1]
                            # Image range
                            img_x = max(0, ul[0]), min(br[0], weights.shape[3])
                            img_y = max(0, ul[1]), min(br[1], weights.shape[2])
                            weights[i][nj][img_y[0]:img_y[1], img_x[0]:img_x[1]] = \
                                gauss_kernel[g_y[0]:g_y[1], g_x[0]:g_x[1]]
                    filename = join('../output/debughrnet', '{:06d}.jpg'.format(humanId))
                    humanId += 1
                    # save_batch_heatmaps(images, torch.Tensor(weights), filename)
                    out = out.cpu().detach().numpy()
                    out = out * weights
                    filename = join('../output/debughrnet', '{:06d}.jpg'.format(humanId))
                    humanId += 1
                    # save_batch_heatmaps(images, torch.Tensor(out), filename)
                else:
                    out = out.cpu().detach().numpy()
                coords, max_val = get_final_preds(
                    out,
                    np.asarray(centers),
                    np.asarray(scales))
                pts = np.concatenate((coords, max_val), axis=2)
                # torch.cuda.synchronize(self.device)
                # print(' - spending {:.2f}ms in postprocess.'.format(1000*(time.time() - start)))
                if ret_crop:
                    return coco17tobody25(pts), images
                else:
                    return coco17tobody25(pts)
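    # Usage sketch (hypothetical data): `detections` is a list of (x1, y1, x2, y2)
    # person boxes; optionally pass the previous frame's BODY-25 keypoints to
    # bias the heatmaps toward the last known pose:
    #   kpts = estimator.predict(frame, detections)
    #   kpts = estimator.predict(frame, detections, keypoints=kpts_prev)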