EasyMocap/easymocap/multistage/fitting.py
2023-02-10 08:10:48 -06:00

1772 lines
78 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
@ Date: 2022-03-22 16:11:44
@ Author: Qing Shuai
@ Mail: s_q@zju.edu.cn
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-07-25 11:51:50
@ FilePath: /EasyMocapPublic/easymocap/multistage/fitting.py
'''
# This function provides a realtime fitting interface
from collections import namedtuple
from time import time, sleep
import numpy as np
import cv2
import torch
import copy
from ..config.baseconfig import load_object_from_cmd
from ..mytools.debug_utils import log, mywarn
from ..mytools import Timer
from ..config import Config
from ..mytools.triangulator import iterative_triangulate
from ..bodymodel.base import Params
from .torchgeometry import axis_angle_to_euler, euler_to_axis_angle
def batch_rodrigues(rot_vecs, epsilon=1e-8, dtype=torch.float32):
''' Calculates the rotation matrices for a batch of rotation vectors
Parameters
----------
rot_vecs: torch.tensor Nx3
array of N axis-angle vectors
Returns
-------
R: torch.tensor Nx3x3
The rotation matrices for the given axis-angle parameters
'''
batch_size = rot_vecs.shape[0]
device = rot_vecs.device
angle = torch.norm(rot_vecs + 1e-8, dim=1, keepdim=True)
rot_dir = rot_vecs / angle
cos = torch.unsqueeze(torch.cos(angle), dim=1)
sin = torch.unsqueeze(torch.sin(angle), dim=1)
# Bx1 arrays
rx, ry, rz = torch.split(rot_dir, 1, dim=1)
K = torch.zeros((batch_size, 3, 3), dtype=dtype, device=device)
zeros = torch.zeros((batch_size, 1), dtype=dtype, device=device)
K = torch.cat([zeros, -rz, ry, rz, zeros, -rx, -ry, rx, zeros], dim=1) \
.view((batch_size, 3, 3))
ident = torch.eye(3, dtype=dtype, device=device).unsqueeze(dim=0)
rot_mat = ident + sin * K + (1 - cos) * torch.bmm(K, K)
return rot_mat
from scipy.spatial.transform import Rotation
def aa2euler(aa):
aa = np.array(aa)
R = cv2.Rodrigues(aa)[0]
# res = Rotation.from_dcm(R).as_euler('XYZ', degrees=True)
res = Rotation.from_matrix(R).as_euler('XYZ', degrees=False)
return np.round(res, 2).tolist()
def rotmat2euler(rot):
res = Rotation.from_matrix(rot).as_euler('XYZ', degrees=True)
return res
def euler2rotmat(euler):
res = Rotation.from_euler('XYZ', euler, degrees=True)
return res.as_matrix()
def batch_rodrigues_jacobi(rvec):
shape = rvec.shape
rvec = rvec.view(-1, 3)
device = rvec.device
dSkew = torch.zeros(3, 9, device=device)
dSkew[0, 5] = -1
dSkew[1, 6] = -1
dSkew[2, 1] = -1
dSkew[0, 7] = 1
dSkew[1, 2] = 1
dSkew[2, 3] = 1
dSkew = dSkew[None]
theta = torch.norm(rvec, dim=-1, keepdim=True) + 1e-5
c = torch.cos(theta)
s = torch.sin(theta)
c1 = 1 - c
itheta = 1 / theta
r = rvec / theta
zeros = torch.zeros_like(r[:, :1])
rx, ry, rz = torch.split(r, 1, dim=1)
rrt = torch.matmul(r[:, :, None], r[:, None, :])
skew = torch.cat([zeros, -rz, ry, rz, zeros, -rx, -ry, rx, zeros], dim=1) \
.view((r.shape[0], 3, 3))
I = torch.eye(3, device=rvec.device, dtype=rvec.dtype)[None]
rot_mat = I + s[:, None] * skew + c1[:, None] * torch.bmm(skew, skew)
drrt = torch.stack([
rx + rx, ry, rz, ry, zeros, zeros, rz, zeros, zeros,
zeros, rx, zeros, rx, ry + ry, rz, zeros, rz, zeros,
zeros, zeros, rx, zeros, zeros, ry, rx, ry, rz + rz
], dim=-1).view((r.shape[0], 3, 9))
jacobi = torch.zeros((r.shape[0], 3, 9), device=rvec.device, dtype=rvec.dtype)
for i in range(3):
ri = r[:, i:i+1]
a0 = -s * ri
a1 = (s - 2*c1*itheta)*ri
a2 = c1 * itheta
a3 = (c-s*itheta)*ri
a4 = s * itheta
jaco = a0[:, None] * I + a1[:, None] * rrt + a2[:, None] * drrt[:, i].view(-1, 3, 3) + a3[:, None] * skew + a4[:, None] * dSkew[:, i].view(-1, 3, 3)
jacobi[:, i] = jaco.view(-1, 9)
rot_mat = rot_mat.view(*shape[:-1], 3, 3)
jacobi = jacobi.view(*shape[:-1], 3, 9)
return rot_mat, jacobi
def getJacobianOfRT(rvec, tvec, joints):
# joints: (bn, nJ, 3)
dtype, device = rvec.dtype, rvec.device
bn, nJoints = joints.shape[:2]
# jacobiToRvec: (bn, 3, 9) // tested by OpenCV and PyTorch
Rot, jacobiToRvec = batch_rodrigues_jacobi(rvec)
I3 = torch.eye(3, dtype=dtype, device=device)[None]
# jacobiJ_R: (bn, nJ, 3, 3+3+3) => (bn, nJ, 3, 9)
# // flat by column:
# // x, 0, 0 | y, 0, 0 | z, 0, 0
# // 0, x, 0 | 0, y, 0 | 0, z, 0
# // 0, 0, x | 0, 0, y | 0, 0, z
jacobi_J_R = torch.zeros((bn, nJoints, 3, 9), dtype=dtype, device=device)
jacobi_J_R[:, :, 0, :3] = joints
jacobi_J_R[:, :, 1, 3:6] = joints
jacobi_J_R[:, :, 2, 6:9] = joints
# jacobi_J_rvec: (bn, nJ, 3, 3)
jacobi_J_rvec = torch.matmul(jacobi_J_R, jacobiToRvec[:, None].transpose(-1, -2))
# if True: # 测试自动梯度
# def test_func(rvec):
# Rot = batch_rodrigues(rvec[None])[0]
# joints_new = joints[0] @ Rot.t()
# return joints_new
# jac_J_rvec = torch.autograd.functional.jacobian(test_func, rvec[0])
# my_j = jacobi_joints_RT[0, ..., :3]
# jacobi_J_tvec: (bn, nJx3, 3)
jacobi_J_tvec = I3[None].expand(bn, nJoints, -1, -1)
jacobi_J_rt = torch.cat([jacobi_J_rvec, jacobi_J_tvec], dim=-1)
return Rot, jacobiToRvec, jacobi_J_rt
class Model:
rootIdx = 0
parents = []
INDEX_HALF = [0,1,2,3,4,5,6,7,15,16,17,18]
class LowPassFilter:
def __init__(self):
self.prev_raw_value = None
self.prev_filtered_value = None
def process(self, value, alpha):
if self.prev_raw_value is None:
s = value
else:
s = alpha * value + (1.0 - alpha) * self.prev_filtered_value
self.prev_raw_value = value
self.prev_filtered_value = s
return s
class OneEuroFilter:
def __init__(self, mincutoff=1.0, beta=0.0, dcutoff=1.0, freq=30):
self.freq = freq
self.mincutoff = mincutoff
self.beta = beta
self.dcutoff = dcutoff
self.x_filter = LowPassFilter()
self.dx_filter = LowPassFilter()
def compute_alpha(self, cutoff):
te = 1.0 / self.freq
tau = 1.0 / (2 * np.pi * cutoff)
return 1.0 / (1.0 + tau / te)
def process(self, x):
prev_x = self.x_filter.prev_raw_value
dx = 0.0 if prev_x is None else (x - prev_x) * self.freq
edx = self.dx_filter.process(dx, self.compute_alpha(self.dcutoff))
cutoff = self.mincutoff + self.beta * np.abs(edx)
return self.x_filter.process(x, self.compute_alpha(cutoff))
class BaseBody:
def __init__(self, cfg_triangulator, cfg_model, cfg) -> None:
self.triangulator = load_object_from_cmd(cfg_triangulator, [])
self.body_model = load_object_from_cmd(cfg_model, ['args.use_pose_blending', False, 'args.device', 'cpu'])
self.cfg = cfg
self.register_from_lbs(self.body_model)
def register_from_lbs(self, body_model):
kintree_shape = np.array(self.cfg.shape.kintree)
self.nJoints = body_model.J_regressor.shape[0]
self.k_shapeBlend = body_model.j_shapedirs[self.nJoints:]
self.j_shapeBlend = body_model.j_shapedirs[:self.nJoints]
self.jacobian_limb_shapes = self.k_shapeBlend[kintree_shape[:, 1]] - self.k_shapeBlend[kintree_shape[:, 0]]
self.k_template = body_model.j_v_template[self.nJoints:]
self.j_template = body_model.j_v_template[:self.nJoints]
self.k_weights = body_model.j_weights[self.nJoints:]
self.j_weights = body_model.j_weights[:self.nJoints]
parents = body_model.parents[1:].cpu().numpy()
child = np.arange(1, parents.shape[0]+1, dtype=np.int64)
self.kintree = np.stack([parents, child], axis=1)
self.parents = np.zeros(parents.shape[0]+1, dtype=int) - 1
self.parents[self.kintree[:, 1]] = self.kintree[:, 0]
self.rootIdx = 0
self.time = time()
def rotation_matrix_from_3x3(A):
U, s, Vt = np.linalg.svd(A, full_matrices=False)
V = Vt.T
T = np.dot(V, U.T)
# does the current solution use a reflection?
have_reflection = np.linalg.det(T) < 0
# if that's not what was specified, force another reflection
if have_reflection:
V[:,-1] *= -1
s[-1] *= -1
T = np.dot(V, U.T)
return T
def svd_rot(src, tgt, reflection=False, debug=True):
# optimum rotation matrix of Y
A = np.dot(src.T, tgt)
U, s, Vt = np.linalg.svd(A, full_matrices=False)
V = Vt.T
T = np.dot(V, U.T)
# does the current solution use a reflection?
have_reflection = np.linalg.det(T) < 0
# if that's not what was specified, force another reflection
if reflection != have_reflection:
V[:,-1] *= -1
s[-1] *= -1
T = np.dot(V, U.T)
if debug:
err = np.linalg.norm(tgt - src @ T.T, axis=1)
print('[svd] ', err)
return T
def normalize(vector):
return vector/np.linalg.norm(vector)
def rad_from_2vec(vec1, vec2):
return np.arccos((normalize(vec1)*normalize(vec2)).sum())
def smoothing_factor(t_e, cutoff):
r = 2 * 3.14 * cutoff * t_e
return r / (r + 1)
def exponential_smoothing(a, x, x_prev):
return a * x + (1 - a) * x_prev
FilterResult = namedtuple('FilterResult', ['x', 'dx', 'v', 't'])
class MyFilter:
def __init__(self, key, filled, min_cutoff=1.0, d_cutoff=1.0,
beta=0.1) -> None:
self.key = key
self.fill_result = filled
self.min_cutoff = min_cutoff
self.d_cutoff = d_cutoff
self.beta = beta
self.init = False
self.records = []
self.result = None
self.counter = 0
self.data = []
self.conf = []
self.smooth_record = []
def fill(self, value, conf):
filled = conf < 0.1
if filled.sum() >= 1:
value[filled] = self.fill_result[0][filled]
if self.key == 'Rh':
value = rotation_matrix_from_3x3(value.reshape(3, 3))
value = cv2.Rodrigues(value)[0].reshape(3,)
if (value < 0).all():
value = -value
return value[None]
def __call__(self, value, conf):
self.counter += 1
x, v = value[0], conf[0]
if self.key == 'Rh':
x = cv2.Rodrigues(x)[0].reshape(-1)
v = np.zeros((9,)) + v[0]
t = np.zeros_like(x)
t[v>0.1] = self.counter
self.data.append(x)
self.conf.append(v)
if len(self.smooth_record) == 0:
if self.key == 'Rh':
start = x
else:
# start = self.fill_result[0]
start = x
smoothed = FilterResult(start, np.zeros_like(x), np.zeros_like(x), t)
self.smooth_record.append(smoothed)
if len(self.data) < 3:
return self.fill(x, v)
data = np.array(self.data)
conf = np.array(self.conf)
smoothed = self.smooth_record[-1]
# 预计的速度
dx_new = x - smoothed.x
# 滤波器可见,当前可见
flag_vis = (smoothed.v > 0.1) & (v > 0.1)
# - 速度异常的,移除掉,认为当前帧不可见
flag_outlier = (np.abs(smoothed.dx) > 0.05) & (np.abs(dx_new - smoothed.dx)/(1e-5 + smoothed.dx) > 2.)
if self.key != 'Rh':
v[flag_vis&flag_outlier] = 0.
# 滤波器不可见,当前可见,速度打折,认为是新增的帧
flag_new = (smoothed.v < 0.1)&(v>0.1)
dx_new[flag_new] /= 3
# 滤波器可见,当前不可见,速度使用滤波器的速度
flag_unvis = (v<0.1) & (conf[-2] < 0.1)
dx_new[flag_unvis] = smoothed.dx[flag_unvis]
# 滤波器不可见当前也不可见速度清0
dx_new[(v<0.1)&(smoothed.v<0.1)] = 0.
# 实际估计出来的速度,这里要去掉不可见的地方
# 混合的权重使用 0.7, 0.3默认全部使用新的一半
weight_dx = np.zeros_like(dx_new) + 0.7
dx_smoothed = smoothed.dx*(1-weight_dx) + dx_new*weight_dx
smoothed_value = smoothed.x + dx_smoothed
v_new = smoothed.v.copy()
v_new = v_new * (1-weight_dx) + v*weight_dx
t_new = smoothed.t.copy()
t_new[v>0.1] = t[v>0.1]
smooth_new = FilterResult(smoothed_value, dx_smoothed, v_new, t_new)
self.smooth_record.append(smooth_new)
if self.counter == 1000:
if self.key == 'poses':
import matplotlib.pyplot as plt
xrange = np.arange(0, data.shape[0])
smoothed = np.array([d.x for d in self.smooth_record])
for nj in range(data.shape[1]):
valid = conf[:, nj] > 0.
plt.scatter(xrange[valid], data[valid, nj])
# yhat = savgol_filter(data[:, nj], data.shape[0], 3)
# plt.plot(yhat)
plt.plot(smoothed)
plt.show()
import ipdb;ipdb.set_trace()
# return self.fill(x, v)
return self.fill(smooth_new.x, smooth_new.v)
def __call__0(self, value, conf):
self.counter += 1
x, v = value[0], conf[0]
if self.key == 'Rh':
x = cv2.Rodrigues(x)[0].reshape(-1)
v = np.zeros((9,)) + v[0]
if self.result is None:
result = FilterResult(x, np.zeros_like(x), v, (v>0)*self.counter)
self.result = result
self.records.append(result)
return self.fill(result.x, result.v)
# return self.fill(x, v)
# 维护一个前一帧的去除outlier
prev = self.result
t = prev.t.copy()
t[v>0.] = self.counter
dx = x - prev.x # 这里直接使用与之前的结果的差了,避免多帧不可见,然后速度过大
MAX_DX = 1.
WINDOW = 31
not_valid = ((np.abs(dx) > MAX_DX) & (prev.v > 0.1))|\
(t-prev.t > WINDOW)
v[not_valid] = 0.
x_all = np.stack([r.x for r in self.records[-WINDOW:]])
v_all = np.stack([r.v for r in self.records[-WINDOW:]])
dx_all = np.stack([r.dx for r in self.records[-WINDOW:]])
v_sum = v_all.sum(axis=0)
dx_mean = (dx_all * v_all).sum(axis=0)/(1e-5 + v_all.sum(axis=0))
# if (x_all.shape[0] > 30) & (self.counter % 40 == 0):
if True:
x_mean = (x_all * v_all).sum(axis=0)/(1e-5 + v_all.sum(axis=0))
x_pred = x_mean
dx_pred = np.zeros_like(x_pred)
elif x_all.shape[0] >= 5:
# 进行smooth
axt = np.zeros((2, x_all.shape[1]))
xrange = np.arange(x_all.shape[0]).reshape(-1, 1)
A0 = np.hstack([xrange, np.ones((x_all.shape[0], 1))])
for nj in range(x_all.shape[1]):
conf = v_all[:, nj:nj+1]
if (conf>0.).sum() < 3:
continue
A = conf * A0
b = conf * (x_all[:, nj:nj+1])
est = np.linalg.inv(A.T @ A) @ A.T @ b
axt[:, nj] = est[:, 0]
x_all_smoothed = xrange * axt[0:1] + axt[1:]
x_pred = x_all_smoothed[x_all.shape[0]//2]
dx_pred = axt[0]
else:
x_pred = x_all[x_all.shape[0]//2]
dx_pred = dx_mean
if x_all.shape[0] == 1:
current = FilterResult(x, dx, v, t)
self.records.append(current)
self.result = current
else:
# dx_hat = (dx * v + dx_mean * v_mean)/(v+v_mean+1e-5)
# x_pred = x_mean + dx_hat
# current = FilterResult(x_pred, dx_hat, v, t)
current = FilterResult(x, dx, v, t)
self.records.append(current)
# 使用平均速度模型
self.result = FilterResult(x_pred, dx_pred, v_sum, t)
return self.fill(self.result.x, self.result.v)
def __call__2(self, value, conf):
self.counter += 1
x, v = value[0], conf[0]
if self.result is None:
result = FilterResult(x, np.zeros_like(x), v, (v>0)*self.counter)
self.result = result
return self.fill(result.x, result.v)
prev = self.result
t = prev.t.copy()
t[v>0.] = self.counter
# update t
# dx = (x - prev.x)/(np.maximum(t-prev.t, 1))
dx = x - prev.x # 这里直接使用与之前的结果的差了,避免多帧不可见,然后速度过大
dx_ = dx.copy()
# 判断dx的大小
large_dx = np.abs(dx) > 0.5
if large_dx.sum() > 0:
v[large_dx] = 0.
t[large_dx] = prev.t[large_dx]
dx[large_dx] = 0.
missing_index = ((prev.v > 0.1) & (v < 0.1)) | (t - prev.t > 10)
if missing_index.sum() > 0:
print('missing', missing_index)
new_index = (prev.v < 0.1) & (v > 0.1)
if new_index.sum() > 0:
print('new', new_index)
dx[new_index] = 0.
weight_dx = v/(1e-5+ 3*prev.v + 1*v)
weight_x = v/(1e-5+ 3*prev.v + 1*v)
# 移除速度过大的点
dx_hat = exponential_smoothing(weight_dx, dx, prev.dx)
x_pred = prev.x + dx_hat
x_hat = exponential_smoothing(weight_x, x, x_pred)
dx_real = x_hat - prev.x
# consider the unvisible v
print_val = {
't_pre': prev.t,
'x_inp': x,
'x_pre': prev.x,
'x_new': x_hat,
'dx_inp': dx_,
'dx_pre': prev.dx,
'dx_new': dx_hat,
'dx_real': dx_real,
'v': v,
'v_pre': prev.v,
'w_vel': weight_dx,
'w_x': weight_x
}
for key in print_val.keys():
print('{:7s}'.format(key), end=' ')
print('')
for i in range(x.shape[0]):
for key in print_val.keys():
print('{:7.2f}'.format(print_val[key][i]), end=' ')
print('')
v[missing_index] = prev.v[missing_index] / 1.2 # 衰减系数
result = FilterResult(x_hat, dx_hat, v, t)
self.result = result
return self.fill(result.x, result.v)
if len(self.records) < 10:
self.records.append([self.counter, value, conf])
return self.fill(value[0], conf[0])
if self.x is None:
time = np.vstack([x[0] for x in self.records])
value_pre = np.vstack([x[1] for x in self.records])
conf_pre = np.vstack([x[2] for x in self.records])
conf_sum = conf_pre.sum(axis=0)
value_mean = (value_pre * conf_pre).sum(axis=0)/(conf_sum + 1e-5)
self.x = value_mean
self.x_conf = conf_sum
t_prev = np.zeros_like(self.x, dtype=int) - 1
t_prev[conf_sum>0] = self.counter
self.t_prev = t_prev
# 零速度初始化
self.d_x = np.zeros_like(self.x)
return self.fill(self.x, self.x_conf)
# 假设每帧都传进来的吧
return self.fill(self.x, self.x_conf)
x_est, v_est, conf_est = self.x.copy(), self.d_x.copy(), self.x_conf.copy()
value = value[0]
conf = conf[0]
d_x = value - self.x
t_current = np.zeros_like(self.x, dtype=int) - 1
t_current[conf>0.] = self.counter
t_est = t_current - self.t_prev
# 前一帧有观测当前帧有观测两帧之差在10帧以内。正常更新
flag_vv = (t_current > 0) & (self.t_prev > 0) & \
(t_current - self.t_prev < 10)
# 前一帧无观测;当前帧有观测的;判断为新增的
flag_iv = (self.t_prev < 0) & (t_current > 0)
weight_vel = smoothing_factor(t_est, self.d_cutoff)
# 将观测的速度权重置0
weight_vel[flag_vv] = 0.
vel_hat = exponential_smoothing(weight_vel, d_x, self.d_x)
cutoff = self.min_cutoff + self.beta * np.abs(vel_hat)
weight_value = smoothing_factor(t_est, cutoff)
# 将观测的数值权重置0
weight_value[flag_vv] = 0.
weight_value[flag_iv] = 1. # 当前帧可见的,之前的帧不可见的,直接选择当前帧
vel_hat[flag_iv] = 0.
x_hat = exponential_smoothing(weight_value, value, self.x)
flag_vi = (self.t_prev > 0) & (~flag_vv)
flag_v = flag_vv | flag_vi | flag_iv
# 前一帧有观测;当前帧无观测的;判断为丢失的
x_est[flag_v] = x_hat[flag_v]
v_est[flag_v] = vel_hat[flag_v]
conf_est[flag_v] = (self.x_conf + conf)[flag_v]/2
self.t_prev[flag_v] = self.counter
self.x = x_est
self.d_x = v_est
self.x_conf = conf_est
return self.fill(x_est, conf_est)
class IKBody(BaseBody):
def __init__(self, key, cfg_triangulator, cfg_model, cfg, debug) -> None:
super().__init__(cfg_triangulator, cfg_model, cfg)
self.key = key
self.frame_index = 0
self.frame_latest = 0
self.init = False
self.records = []
self.results = []
self.blank_result = self.make_blank()
self.fill_result = self.make_fill()
self.results_newest = self.blank_result
if True:
self.lefthand = ManoFitterCPPCache('LEFT')
self.righthand = ManoFitterCPPCache('RIGHT')
self.up_vector = 'z'
self.filter = {}
for key in ['Rh', 'poses', 'handl', 'handr']:
self.filter[key] = MyFilter(key, self.fill_result[key])
def make_blank(self):
raise NotImplementedError
def make_fill(self):
raise NotImplementedError
def smooth_results(self, params=None):
results = {'id': 0, 'type': 'smplh_half'}
for key in ['Rh', 'poses', 'handl', 'handr']:
value = self.filter[key](params[key], params[key+'_conf'])
results[key] = value
for key in ['shapes', 'Th']:
results[key] = self.blank_result[key]
return results
def smooth_results_old(self, params=None):
if params is not None:
self.results.append(params)
if len(self.results) < 10:
return params
else:
if len(self.results) < 10:
return self.fill_result
else:
params = self.fill_result
results = {'id': 0}
if False:
for key in ['Rh', 'poses', 'handl', 'handr']:
if not self.filter[key].init:
import ipdb;ipdb.set_trace()
else:
value = self.filter[key](self.results[-1][key])
if True:
for key in ['Rh', 'poses', 'handl', 'handr']:
find = False
for WINDOW in [10, 20, 40]:
if WINDOW > len(self.results):
break
records = self.results[-WINDOW:]
value = np.vstack([r[key] for r in records])
conf = np.vstack([r[key+'_conf'] for r in records])
valid = conf[..., 0] > 0
if valid.sum() < WINDOW // 3:
import ipdb;ipdb.set_trace()
else:
value, conf = value[valid], conf[valid]
mean_value = value.mean(axis=0)
std_value = value.std(axis=0)
valid2 = (np.abs(value - mean_value) < WINDOW//3 * std_value).any(axis=-1)
if valid2.sum() < WINDOW // 4:
continue
find = True
value, conf = value[valid2], conf[valid2]
conf_sum = conf.sum(axis=0)
mean = (value*conf).sum(axis=0)/(conf_sum + 1e-5)
# 计算latest
break
if key in ['poses', 'handl', 'handr']:
conf_sum_p = conf.sum(axis=0)
mean_previous = (value*conf).sum(axis=0)/(conf_sum_p + 1e-5)
mean[conf_sum<0.01] = mean_previous[conf_sum<0.01]
conf_sum[conf_sum<0.01] = conf_sum_p[conf_sum<0.01]
# 使用fill的填值
mean[conf_sum<0.01] = self.fill_result[key][0][conf_sum<0.01]
break
if find:
results[key] = mean[None]
else:
results[key] = self.fill_result[key]
if False: # 均值滤波
for key in ['Rh', 'poses', 'handl', 'handr']:
if key not in params.keys():
continue
if key not in self.cfg.SMOOTH_SIZE.keys():
results[key] = params[key]
records = self.results[-self.cfg.SMOOTH_SIZE[key]:]
value = np.vstack([r[key] for r in records])
conf = np.vstack([r[key+'_conf'] for r in records])
conf_sum = conf.sum(axis=0)
mean = (value*conf).sum(axis=0)/(conf_sum + 1e-5)
# 计算latest
if key in ['poses', 'handl', 'handr']:
records = self.results[-5*self.cfg.SMOOTH_SIZE[key]:]
value = np.vstack([r[key] for r in records])
conf = np.vstack([r[key+'_conf'] for r in records])
conf_sum_p = conf.sum(axis=0)
mean_previous = (value*conf).sum(axis=0)/(conf_sum_p + 1e-5)
mean[conf_sum<0.01] = mean_previous[conf_sum<0.01]
conf_sum[conf_sum<0.01] = conf_sum_p[conf_sum<0.01]
# 使用fill的填值
mean[conf_sum<0.01] = self.fill_result[key][0][conf_sum<0.01]
results[key] = mean[None]
results['Th'] = self.blank_result['Th']
results['shapes'] = self.blank_result['shapes']
return results
def get_keypoints3d(self, records, key=None):
if key is None:
return np.stack([r[self.key] for r in records])
else:
return np.stack([r[key] for r in records])
def check_keypoints(self, keypoints3d):
flag = (keypoints3d[..., -1]>self.cfg.MIN_THRES).sum() > 5
if len(self.records) > 1:
pre = self.records[-1]
k_pre = self.get_keypoints3d([pre])
dist = np.linalg.norm(keypoints3d[..., :3] - k_pre[..., :3], axis=-1)
conf = np.sqrt(keypoints3d[..., 3] * k_pre[..., 3])
dist_mean = (dist * conf).sum()/conf.sum()
flag = flag and dist_mean < 0.1
return flag
def __call__(self, data):
self.frame_index += 1
k3d = self.triangulator(data)[0]
keypoints3d = self.get_keypoints3d([k3d])
flag = self.check_keypoints(keypoints3d)
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
# 1. 初始化过了,但是超出帧数了,清零
# 2. 没有初始化过,超出了,清零
if (self.frame_index - self.frame_latest > 10 and self.init) or not self.init:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
self.results = []
return [self.fill_result]
else:
return [self.smooth_results()]
elif not self.init: # 暂时还没有初始化,先等待
if len(self.records) < 10:
self.records.append(k3d)
return [self.fill_result]
self.records.append(k3d)
flag, params = self.fitting(keypoints3d, self.results_newest)
if not flag:
return [self.fill_result]
self.frame_latest = self.frame_index
# smooth results
results = self.smooth_results(params)
self.results_newest = results
k3d['type'] = 'body25'
return [results, k3d]
class HalfBodyIK(IKBody):
def get_keypoints3d(self, records):
THRES_WRIST = 0.2
keypoints3d = super().get_keypoints3d(records)
keypoints3d = keypoints3d[:, INDEX_HALF]
handl = super().get_keypoints3d(records, key='handl3d')
handr = super().get_keypoints3d(records, key='handr3d')
dist_ll = np.linalg.norm(keypoints3d[:, 7, :3] - handl[:, 0, :3], axis=-1)
dist_rr = np.linalg.norm(keypoints3d[:, 4, :3] - handr[:, 0, :3], axis=-1)
log('Dist left = {}, right = {}'.format(dist_ll, dist_rr))
handl[dist_ll>THRES_WRIST] = 0.
handr[dist_rr>THRES_WRIST] = 0.
keypoints3d = np.hstack([keypoints3d, handl, handr])
conf = keypoints3d[..., 3:]
keypoints3d = np.hstack([(keypoints3d[..., :3] * conf).sum(axis=0)/(1e-5 + conf.sum(axis=0)), conf.min(axis=0)])
keypoints3d = keypoints3d[None]
# if (keypoints3d.shape[0] == 10):
return keypoints3d
def _ik_shoulder(self, keypoints3d, params):
SHOULDER_IDX = [2, 5]
shoulder = keypoints3d[SHOULDER_IDX[1], :3] - keypoints3d[SHOULDER_IDX[0], :3]
if self.up_vector == 'x':
shoulder[..., 0] = 0.
up_vector = np.array([1., 0., 0.], dtype=np.float32)
elif self.up_vector == 'z':
shoulder[..., 2] = 0.
up_vector = np.array([0., 0., 1.], dtype=np.float32)
shoulder = shoulder/np.linalg.norm(shoulder, keepdims=True)
# 限定一下角度范围
theta = -np.rad2deg(np.arctan2(shoulder[1], shoulder[2]))
if (theta < 30 or theta > 150) and False:
return False, params
front = np.cross(shoulder, up_vector)
front = front/np.linalg.norm(front, keepdims=True)
R = np.stack([shoulder, up_vector, front]).T
Rh = cv2.Rodrigues(R)[0].reshape(1, 3)
log('Shoulder:{}'.format(Rh))
params['R'] = R
params['Rh'] = Rh
params['Rh_conf'] = np.zeros((1, 3)) + keypoints3d[SHOULDER_IDX, 3].min()
return True, params
def _ik_head(self, keypoints3d, params):
HEAD_IDX = [0, 8, 9, 10, 11]
HEAD_ROT_IDX = 0
est_points = keypoints3d[HEAD_IDX, :3]
valid = (keypoints3d[HEAD_IDX[0], 3] > self.cfg.MIN_THRES) and (keypoints3d[HEAD_IDX[1:], 3]>self.cfg.MIN_THRES).sum()>=2
if not valid:
params['poses_conf'][:, 3*HEAD_ROT_IDX:3*(HEAD_ROT_IDX+1)] = 0.
return params
params['poses_conf'][:, 3*HEAD_ROT_IDX:3*(HEAD_ROT_IDX+1)] = keypoints3d[HEAD_IDX, 3].sum()
gt_points = self.k_template[HEAD_IDX].numpy()
gt_points = gt_points - gt_points[:1]
est_points = est_points - est_points[:1]
# gt_points = gt_points / np.linalg.norm(gt_points, axis=-1, keepdims=True)
# est_points = est_points / np.linalg.norm(est_points, axis=-1, keepdims=True)
if True:
R_global = svd_rot(gt_points, est_points)
R_local = params['R'].T @ R_global
elif False:
est_points_inv = est_points @ params['R'].T.T
R_local = svd_rot(gt_points, est_points_inv)
else:
gt_points = gt_points @ params['R'].T
R_local = svd_rot(gt_points, est_points)
euler = rotmat2euler(R_local)
euler[0] = euler[0] - 25
# log('euler before filter: {}'.format(euler))
euler[0] = max(min(euler[0], 30), -30)
euler[1] = max(min(euler[1], 60), -60)
euler[2] = max(min(euler[2], 45), -45)
# log('euler after filter: {}'.format(euler))
R_local = euler2rotmat(euler)
R_head = cv2.Rodrigues(R_local)[0].reshape(1, 3)
params['poses'][:, 3*HEAD_ROT_IDX:3*(HEAD_ROT_IDX+1)] = R_head
return params
@staticmethod
def _rad_from_twovec(keypoints3d, start, mid, end, MIN_THRES):
start = keypoints3d[start]
mid = keypoints3d[mid]
end = keypoints3d[end]
if isinstance(end, list):
# dst is a list
if (end[:, 3] > MIN_THRES).sum() < 2:
return 0, 0.
end = np.sum(end * end[:, 3:], axis=0)/(end[:, 3:].sum())
# use its mean to represent the points
conf = [start[3], mid[3], end[3]]
valid = (min(conf) > MIN_THRES).all()
if not valid:
return 0, 0.
conf = sum(conf)
dir_src = normalize(mid[:3] - start[:3])
dir_dst = normalize(end[:3] - mid[:3])
rad = rad_from_2vec(dir_src, dir_dst)
return conf, rad
def _ik_elbow(self, keypoints3d, params):
for name, info in self.cfg.LEAF.items():
conf, rad = self._rad_from_twovec(keypoints3d, info.start, info.mid, info.end, self.cfg.MIN_THRES)
if conf <= 0.: continue
params['poses_conf'][:, 3*info['index']:3*(info['index']+1)] = conf
rad = np.clip(rad, *info['ranges'])
rot = rad*np.array(info['axis']).reshape(1, 3)
params['poses'][:, 3*info['index']:3*(info['index']+1)] = rot
return params
def _ik_arm(self, keypoints3d, params):
# forward一遍获得关键点
# 这里需要确保求解的关节点没有父节点了
template = self.body_model.keypoints({'poses': params['poses'], 'shapes': params['shapes']}, return_tensor=False)[0]
for name, info in self.cfg.NODE.items():
idx = info['children']
conf = keypoints3d[info['children'], 3]
if not (conf >self.cfg.MIN_THRES).all():
continue
params['poses_conf'][:, 3*info['index']:3*(info['index']+1)] = conf.sum()
est_points = keypoints3d[idx, :3]
gt_points = template[idx]
est_points = est_points - est_points[:1]
gt_points = gt_points - gt_points[:1]
R_children = svd_rot(gt_points, est_points)
R_local = params['R'].T @ R_children
euler = rotmat2euler(R_local)
# log('euler {} before filter: {}'.format(name, euler))
# euler[0] = max(min(euler[0], 90), -90)
# euler[1] = max(min(euler[1], 90), -90)
# euler[2] = max(min(euler[2], 90), -90)
# log('euler {} after filter: {}'.format(name, euler))
R_local = euler2rotmat(euler)
params['poses'][:, 3*info['index']:3*(info['index']+1)] = cv2.Rodrigues(R_local)[0].reshape(1, 3)
return params
def _ik_palm(self, keypoints3d, params):
# template = self.body_model.keypoints({'poses': params['poses'], 'shapes': params['shapes']}, return_tensor=False)[0]
T_joints, _ = self.body_model.transform({'poses': params['poses'], 'shapes': params['shapes']}, return_vertices=False)
T_joints = T_joints[0].cpu().numpy()
for name, info in self.cfg.PALM.items():
# 计算手掌的朝向
est_points = keypoints3d[:, :3]
est_conf = keypoints3d[:, 3]
if est_conf[info.children].min() < self.cfg.MIN_THRES:
continue
# 计算朝向
dir0 = normalize(est_points[info.children[1]] - est_points[info.children[0]])
dir1 = normalize(est_points[info.children[-1]] - est_points[info.children[0]])
normal = normalize(np.cross(dir0, dir1))
dir_parent = normalize(est_points[info.parent[1]] - est_points[info.parent[0]])
# 计算夹角
rad = np.arccos((normal * dir_parent).sum()) - np.pi/2
rad = np.clip(rad, *info['ranges'])
rot = rad*np.array(info['axis']).reshape(1, 3)
params['poses'][:, 3*info['index']:3*(info['index']+1)] = rot
# 考虑手肘的朝向;这个时候还差一个绕手肘的朝向的方向的旋转;这个旋转是在手肘扭曲之前的
# 先计算出这个朝向;再转化
R_parents = params['R'] @ T_joints[info.index, :3, :3]
normal_canonical = R_parents.T @ normal.reshape(3, 1)
normal_canonical[0, 0] = 0
normal_canonical = normalize(normal_canonical)
# 在canonical下的投影
# normal_T = np.array([0., -1., 0.])
# trick: 旋转角度的正弦值等于在z轴上的坐标
rad = np.arcsin(normal_canonical[2, 0])
rot_x = np.array([-rad, 0., 0.])
R_x = cv2.Rodrigues(rot_x)[0]
R_elbow = cv2.Rodrigues(params['poses'][:, 3*info.index_elbow:3*info.index_elbow+3])[0]
R_elbow = R_elbow @ R_x
params['poses'][:, 3*info.index_elbow:3*info.index_elbow+3] = cv2.Rodrigues(R_elbow)[0].reshape(1, 3)
return params
def _ik_hand(self, template, keypoints3d, poses, conf, is_left):
# 计算手的每一段的置信度
poses_full = np.zeros((1, 45))
conf_full = np.zeros((1, 45))
y_axis = np.array([0., 1., 0.])
log('_ik for left: {}'.format(is_left))
for name, info in self.cfg.HAND.LEAF.items():
conf, rad = self._rad_from_twovec(keypoints3d, *info.ranges, self.cfg.MIN_THRES)
if conf <= 0.:
log('- skip: {}'.format(name))
continue
# trick: 手的朝向是反的
rad = - rad
if info.axis == 'auto':
template_dir = template[info.ranges[2]] - template[info.ranges[1]]
# y轴方向设成0
template_dir[1] = 0.
template_dir = normalize(template_dir)
# 计算旋转轴在与z轴的cross方向上
rot_vec = normalize(np.cross(template_dir, y_axis)).reshape(1, 3)
elif info.axis == 'root':
template_dir0 = template[info.ranges[1]] - template[info.ranges[0]]
template_dir1 = template[info.ranges[2]] - template[info.ranges[1]]
template_dir0 = normalize(template_dir0)
template_dir1 = normalize(template_dir1)
costheta0 = (template_dir0 *template_dir1).sum()
# 计算当前的夹角
est_dir0 = keypoints3d[info.ranges[1], :3] - keypoints3d[info.ranges[0], :3]
est_dir1 = keypoints3d[info.ranges[2], :3] - keypoints3d[info.ranges[1], :3]
est_dir0 = normalize(est_dir0)
est_dir1 = normalize(est_dir1)
costheta1 = (est_dir0 * est_dir1).sum()
# trick: 手的旋转角度都是相反的
rad = - np.arccos(np.clip(costheta1/costheta0, 0., 1.))
rot_vec = normalize(np.cross(template_dir1, y_axis)).reshape(1, 3)
log('- get: {}: {:.1f}, {}'.format(name, np.rad2deg(rad), rot_vec))
poses_full[:, 3*info.index:3*info.index+3] = rot_vec * rad
conf_full[:, 3*info.index:3*info.index+3] = conf
# 求解
usePCA = False
if usePCA:
ncomp = 24
lamb = 0.05
if is_left:
A_full = self.body_model.components_full_l[:ncomp].T
mean_full = self.body_model.mean_full_l
else:
A_full = self.body_model.components_full_r[:ncomp].T
mean_full = self.body_model.mean_full_r
valid = conf_full[0] > 0.
A = A_full[valid, :]
res = (poses_full[:, valid] - mean_full[:, valid]).T
x = np.linalg.inv(A.T @ A + lamb * np.eye(ncomp)) @ A.T @ res
poses_full = (A_full @ x).reshape(1, -1) + mean_full
conf_full = np.zeros_like(poses_full) + valid.sum()
return poses_full, conf_full
def make_blank(self):
params = self.body_model.init_params(1, ret_tensor=False)
params['id'] = 0
params['type'] = 'smplh_half'
params['Th'][0, 0] = 1.
params['Th'][0, 1] = -1
params['Th'][0, 2] = 1.
return params
def make_fill(self):
params = self.body_model.init_params(1, ret_tensor=False)
params['id'] = 0
params['type'] = 'smplh_half'
params['Rh'][0, 2] = -np.pi/2
params['handl'] = self.body_model.mean_full_l
params['handr'] = self.body_model.mean_full_r
params['poses'][0, 3*4+2] = -np.pi/4
params['poses'][0, 3*5+2] = np.pi/4
return params
def fitting(self, keypoints3d, results_pre):
# keypoints3d: (nFrames, nJoints, 4)
# 根据肩膀计算身体朝向
if len(keypoints3d.shape) == 3:
keypoints3d = keypoints3d[0]
params = self.body_model.init_params(1, ret_tensor=False)
params['poses_conf'] = np.zeros_like(params['poses'])
params['handl_conf'] = np.zeros_like(params['handl'])
params['handr_conf'] = np.zeros_like(params['handr'])
params['Rh_conf'] = 0.
params['id'] = 0
flag, params = self._ik_shoulder(keypoints3d, params)
if (params['Rh_conf'] <= 0.01).all():
return False, params
params = self._ik_head(keypoints3d, params)
params = self._ik_elbow(keypoints3d, params)
params = self._ik_arm(keypoints3d, params)
params = self._ik_palm(keypoints3d, params)
if False:
params['handl'], params['handl_conf'] = self._ik_hand(self.k_template[12:12+21].numpy(), keypoints3d[12:12+21], params['handl'], params['handl_conf'], is_left=True)
params['handr'], params['handr_conf'] = self._ik_hand(self.k_template[12+21:12+21+21].numpy(), keypoints3d[12+21:12+21+21], params['handr'], params['handr_conf'], is_left=False)
else:
params_l = self.lefthand(keypoints3d[12:12+21])[0]
params_r = self.righthand(keypoints3d[12+21:12+21+21])[0]
# log('[{:06d}] {}'.format(self.frame_index, params_l['poses'][0]))
# log('[{:06d}] {}'.format(self.frame_index, params_r['poses'][0]))
ncomp = params_l['poses'].shape[1]
A_full = self.body_model.components_full_l[:ncomp].T
mean_full = self.body_model.mean_full_l
poses_full = (A_full @ params_l['poses'].T).T + mean_full
params['handl'] = poses_full
A_full = self.body_model.components_full_r[:ncomp].T
mean_full = self.body_model.mean_full_r
poses_full = (A_full @ params_r['poses'].T).T + mean_full
params['handr'] = poses_full
params['handl_conf'] = np.ones((1, 45))
params['handr_conf'] = np.ones((1, 45))
return True, params
class BaseFitter(BaseBody):
def __init__(self, cfg_triangulator, cfg_model,
INIT_SIZE, WINDOW_SIZE, FITTING_SIZE, SMOOTH_SIZE,
init_dict, fix_dict,
cfg) -> None:
super().__init__(cfg_triangulator, cfg_model, cfg)
self.records = []
self.results = []
self.INIT_SIZE = INIT_SIZE
self.WINDOW_SIZE = WINDOW_SIZE
self.FITTING_SIZE = FITTING_SIZE
self.SMOOTH_SIZE = SMOOTH_SIZE
self.time = 0
self.frame_latest = 0
self.frame_index = 0
self.init = False
self.init_dict = init_dict
self.fix_dict = fix_dict
self.identity_cache = {}
def get_keypoints3d(self, records):
raise NotImplementedError
def get_init_params(self, nFrames):
params = self.body_model.init_params(nFrames, ret_tensor=True)
for key, val in self.init_dict.items():
if key == 'Rh':
import cv2
R = cv2.Rodrigues(params['Rh'][0].cpu().numpy())[0]
for vec in self.init_dict['Rh']:
Rrel = cv2.Rodrigues(np.deg2rad(np.array(vec)))[0]
R = Rrel @ R
Rh = cv2.Rodrigues(R)[0]
params['Rh'] = torch.Tensor(Rh).reshape(-1, 3).repeat(nFrames, 1)
else:
params[key] = torch.Tensor(val).repeat(nFrames, 1)
params['id'] = 0
return params
def add_any_reg(self, val, val0, JTJ, JTr, w):
# loss: (val - val0)
nVals = val.shape[0]
if nVals not in self.identity_cache.keys():
self.identity_cache[nVals] = torch.eye(nVals, device=val.device, dtype=val.dtype)
identity = self.identity_cache[nVals]
JTJ += w * identity
JTr += -w*(val - val0).view(-1, 1)
return 0
def log(self, name, step, delta, res, keys_range=None):
toc = (time() - self.time)*1000
norm_d = torch.norm(delta).item()
norm_f = torch.norm(res).item()
text = '[{}:{:6.2f}]: step = {:3d}, ||delta|| = {:.4f}, ||res|| = {:.4f}'.format(name, toc, step, norm_d, norm_f)
print(text)
self.time = time()
return norm_d, norm_f
def fitShape(self, keypoints3d, params, weight, option):
kintree = np.array(self.cfg.shape.kintree)
nShapes = params['shapes'].shape[-1]
# shapes: (1, 10)
shapes = params['shapes'].T
shapes0 = shapes.clone()
device, dtype = shapes.device, shapes.dtype
lengths3d_est = torch.norm(keypoints3d[:, kintree[:, 1], :3] - keypoints3d[:, kintree[:, 0], :3], dim=-1)
conf = torch.sqrt(keypoints3d[:, kintree[:, 1], 3:] * keypoints3d[:, kintree[:, 0], 3:])
conf = conf.repeat(1, 1, 3).reshape(-1, 1)
nFrames = keypoints3d.shape[0]
# jacobian: (nFrames, nLimbs, 3, nShapes)
jacob_limb_shapes = self.jacobian_limb_shapes[None].repeat(nFrames, 1, 1, 1)
jacob_limb_shapes = jacob_limb_shapes.reshape(-1, nShapes)
# 注意:这里乘到雅克比的应该是 sqrt(conf),这里把两个合并了
JTJ_limb_shapes = jacob_limb_shapes.t() @ (jacob_limb_shapes * conf)
lossnorm = 0
self.time = time()
for iter_ in range(option.max_iter):
# perform shape blending
shape_offset = self.k_shapeBlend @ shapes
keyShaped = self.k_template + shape_offset[..., 0]
JTJ = JTJ_limb_shapes
JTr = torch.zeros((nShapes, 1), device=device, dtype=dtype)
# 并行添加所有的骨架
dir = keyShaped[kintree[:, 1]] - keyShaped[kintree[:, 0]]
dir_normalized = dir / torch.norm(dir, dim=-1, keepdim=True)
# res: (nFrames, nLimbs, 3)
res = lengths3d_est[..., None] * dir_normalized[None] - dir[None]
res = conf * res.reshape(-1, 1)
JTr += jacob_limb_shapes.t() @ res
self.add_any_reg(shapes, shapes0, JTJ, JTr, w=weight.init_shape)
delta = torch.linalg.solve(JTJ, JTr)
shapes += delta
norm_d, norm_f = self.log('shape', iter_, delta, res)
if torch.norm(delta) < option.gtol:
break
if iter_ > 0 and abs(norm_f - lossnorm)/norm_f < option.ftol:
break
lossnorm = norm_f
shapes = shapes.t()
params['shapes'] = shapes
return params, keyShaped
def fitRT(self, keypoints3d, params, weight, option, kpts_index=None):
keys_optimized = ['Rh', 'Th']
if kpts_index is not None:
keypoints3d = keypoints3d[:, kpts_index]
init_dict = {
'Rh': params['Rh'].clone(),
'Th': params['Th'].clone(),
}
init_dict['Rot'] = batch_rodrigues(init_dict['Rh'])
params_dict = {
'Rh': params['Rh'],
'Th': params['Th'],
}
keys_range = {}
for ikey, key in enumerate(keys_optimized):
if ikey == 0:
keys_range[key] = [0, init_dict[key].shape[-1]]
else:
start = keys_range[keys_optimized[ikey-1]][1]
keys_range[key] = [start, start+init_dict[key].shape[-1]]
NUM_FRAME = keys_range[keys_optimized[-1]][1]
kpts = self.body_model.keypoints({'poses': params['poses'], 'shapes': params['shapes']})
bn = keypoints3d.shape[0]
conf = keypoints3d[..., -1:].repeat(1, 1, 3).reshape(bn, -1)
dtype, device = kpts.dtype, kpts.device
w_joints = 1./keypoints3d.shape[-2] * weight.joints
self.time = time()
for iter_ in range(option.max_iter):
Rh, Th = params_dict['Rh'], params_dict['Th']
rot, jacobi_R_rvec, jacobi_joints_RT = getJacobianOfRT(Rh, Th, kpts)
kpts_rt = torch.matmul(kpts, rot.transpose(-1, -2)) + Th[:, None]
# // loss: J_obs - (R @ jest + T) => -dR/drvec - dT/dtvec - Rx(djest/dtheta)
jacobi_keypoints = -jacobi_joints_RT
if kpts_index is not None:
jacobi_keypoints = jacobi_keypoints[:, kpts_index]
kpts_rt = kpts_rt[:, kpts_index]
jacobi_keypoints_flat = jacobi_keypoints.view(bn, -1, jacobi_keypoints.shape[-1])
JTJ_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ (jacobi_keypoints_flat * conf[..., None])
res = conf[..., None] * ((keypoints3d[..., :3] - kpts_rt).view(bn, -1, 1))
JTr_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ res
#
JTJ = torch.eye(bn*NUM_FRAME, device=device, dtype=dtype) * 1e-4
JTr = torch.zeros((bn*NUM_FRAME, 1), device=device, dtype=dtype)
# accumulate loss
for nf in range(bn):
JTJ[nf*NUM_FRAME:(nf+1)*NUM_FRAME, nf*NUM_FRAME:(nf+1)*NUM_FRAME] += w_joints * JTJ_keypoints[nf]
# add regularization for each parameter
for nf in range(bn):
for key in keys_optimized:
start, end = nf*NUM_FRAME + keys_range[key][0], nf*NUM_FRAME + keys_range[key][1]
if key == 'Rh':
# 增加初始化的loss
res_init = rot[nf] - init_dict['Rot'][nf]
JTJ[start:end, start:end] += weight['init_'+key] * jacobi_R_rvec[nf] @ jacobi_R_rvec[nf].T
JTr[start:end] += -weight.init_Rh * jacobi_R_rvec[nf] @ res_init.reshape(-1, 1)
else:
res_init = Th[nf] - init_dict['Th'][nf]
JTJ[start:end, start:end] += weight['init_'+key] * torch.eye(3)
JTr[start:end] += -weight['init_'+key] * res_init.reshape(-1, 1)
JTr += - w_joints * JTr_keypoints.reshape(-1, 1)
# solve
delta = torch.linalg.solve(JTJ, JTr)
norm_d, norm_f = self.log('pose', iter_, delta, res)
if torch.norm(delta) < option.gtol:
break
if iter_ > 0 and abs(norm_f - lossnorm)/norm_f < option.ftol:
break
delta = delta.view(bn, NUM_FRAME)
lossnorm = norm_f
for key, _range in keys_range.items():
if key not in params_dict.keys():continue
params_dict[key] += delta[:, _range[0]:_range[1]]
norm_key = torch.norm(delta[:, _range[0]:_range[1]])
params.update(params_dict)
return params
@staticmethod
def localTransform(J_shaped, poses, rootIdx, kintree):
bn = poses.shape[0]
nThetas = poses.shape[1]//3
localTrans = torch.eye(4, device=poses.device)[None, None].repeat(bn, nThetas, 1, 1)
poses_flat = poses.reshape(-1, 3)
rot_flat = batch_rodrigues(poses_flat)
rot = rot_flat.view(bn, nThetas, 3, 3)
localTrans[:, :, :3, :3] = rot
# set the root
localTrans[:, rootIdx, :3, 3] = J_shaped[rootIdx].view(1, 3)
# relTrans: (nKintree, 3)
relTrans = J_shaped[kintree[:, 1]] - J_shaped[kintree[:, 0]]
localTrans[:, kintree[:, 1], :3, 3] = relTrans[None]
return localTrans
@staticmethod
def globalTransform(localTrans, rootIdx, kintree):
# localTrans: (bn, nJoints, 4, 4)
globalTrans = localTrans.clone()
# set the root
for (parent, child) in kintree:
globalTrans[:, child] = globalTrans[:, parent] @ localTrans[:, child]
return globalTrans
def jacobi_GlobalTrans_theta(self, poses, j_shaped, rootIdx, kintree,
device, dtype):
parents = self.parents
start = time()
tic = lambda x: print('-> [{:20s}]: {:.3f}ms'.format(x, 1000*(time()-start)))
localTrans = self.localTransform(j_shaped, poses, rootIdx, kintree)
# tic('local trans')
globalTrans = self.globalTransform(localTrans, rootIdx, kintree)
# tic('global trans')
# 计算localTransformation
poses_flat = poses.view(poses.shape[0], -1, 3)
# jacobi_R_rvec: (bn, nJ, 3, 9)
Rot, jacobi_R_rvec = batch_rodrigues_jacobi(poses_flat)
bn, nJoints = localTrans.shape[:2]
dGlobalTrans_template = torch.zeros((bn, nJoints, 4, 4), device=device, dtype=dtype)
# compute global transformation
# results: global transformation to each theta: (bn, nJ, 4, 4, nTheta)
jacobi_GlobalTrans_theta = torch.zeros((bn, nJoints, 4, 4, nJoints*3), device=device, dtype=dtype)
# tic('rodrigues')
for djIdx in range(1, nJoints):
if djIdx in self.cfg.IGNORE_JOINTS: continue
# // 第djIdx个轴角的第dAxis个维度
for dAxis in range(3):
if dAxis in self.cfg.IGNORE_AXIS.get(str(djIdx), []): continue
# if(model->frozenJoint[3*djIdx+dAxis])continue;
# // 从上至下堆叠起来
dGlobalTrans = dGlobalTrans_template.clone()
# // 将local的映射过来
dGlobalTrans[:, djIdx, :3, :3] = jacobi_R_rvec[:, djIdx, dAxis].view(bn, 3, 3)
if djIdx != rootIdx:
# // d(R0 @ R1)/dt1 = R0 @ dR1/dt1, 这里的R0是上一级所有的累积因此使用全局的
parent = parents[djIdx]
dGlobalTrans[:, djIdx] = globalTrans[:, parent] @ dGlobalTrans[:, djIdx]
valid = np.zeros(nJoints, dtype=np.bool)
valid[djIdx] = True
# tic('current {}'.format(djIdx))
# // 遍历骨架树: 将对当前theta的导数传递下去
for (src, dst) in kintree:
# // 当前处理的关节为子节点的不用考虑
if dst == djIdx: continue
# if dst in self.cfg.IGNORE_JOINTS:continue
valid[dst] = valid[src]
if valid[src]:
# // 如果父节点是有效的: d(R0 @ R1)/dt0 = dR0/dt0 @ R1这里的R1是当前的局部的因此使用local的
dGlobalTrans[:, dst] = dGlobalTrans[:, src] @ localTrans[:, dst]
# tic('forward {}'.format(djIdx))
jacobi_GlobalTrans_theta[..., 3*djIdx+dAxis] = dGlobalTrans
# tic('jacobia')
return globalTrans, jacobi_GlobalTrans_theta
def fitPose(self, keypoints3d, params, weight, option, kpts_index=None):
# preprocess input data
if kpts_index is not None:
keypoints3d = keypoints3d[:, kpts_index]
bn = keypoints3d.shape[0]
conf = keypoints3d[..., -1:].repeat(1, 1, 3).reshape(bn, -1)
if (conf > 0.3).sum() < 4:
print('skip')
return params
w_joints = 1./keypoints3d.shape[-2] * weight.joints
# pre-calculate the shape
Rh, Th, poses = params['Rh'], params['Th'], params['poses']
init_dict = {
'Rh': Rh.clone(),
'Th': Th.clone(),
'poses': poses.clone()
}
init_dict['Rot'] = batch_rodrigues(init_dict['Rh'])
zero_dict = {key:torch.zeros_like(val) for key, val in init_dict.items()}
keys_optimized = ['Rh', 'Th', 'poses']
keys_range = {}
for ikey, key in enumerate(keys_optimized):
if ikey == 0:
keys_range[key] = [0, init_dict[key].shape[-1]]
else:
start = keys_range[keys_optimized[ikey-1]][1]
keys_range[key] = [start, start+init_dict[key].shape[-1]]
NUM_FRAME = keys_range[keys_optimized[-1]][1]
# calculate J_shaped
shapes_t = params['shapes'].t()
shape_offset = self.j_shapeBlend @ shapes_t
# jshaped: (nJoints, 3)
j_shaped = self.j_template + shape_offset[..., 0]
shape_offset = self.k_shapeBlend @ shapes_t
# kshaped: (nJoints, 3)
k_shaped = self.k_template + shape_offset[..., 0]
# optimize parameters
nJoints = j_shaped.shape[0]
dtype, device = poses.dtype, poses.device
lossnorm = 0
self.time = time()
for iter_ in range(option.max_iter):
# forward the model
# 0. poses => full poses
def tic(name):
print('[{:20}] {:.3f}ms'.format(name, 1000*(time()-self.time)))
if 'handl' in params.keys():
poses_full = self.body_model.extend_poses(poses, params['handl'], params['handr'])
jacobi_posesful_poses = self.body_model.jacobian_posesfull_poses_
else:
poses_full = self.body_model.extend_poses(poses)
jacobi_posesful_poses = self.body_model.jacobian_posesfull_poses(poses, poses_full)
# tic('jacobian poses full')
# 1. poses => local transformation => global transformation(bn, nJ, 4, 4)
globalTrans, jacobi_GlobalTrans_theta = self.jacobi_GlobalTrans_theta(poses_full, j_shaped, self.rootIdx, self.kintree, device, dtype)
# tic('global transform')
# 2. global transformation => relative transformation => final transformation
relTrans = globalTrans.clone()
relTrans[..., :3, 3:] -= torch.matmul(globalTrans[..., :3, :3], j_shaped[None, :, :, None])
relTrans_weight = torch.einsum('kj,fjab->fkab', self.k_weights, relTrans)
jacobi_relTrans_theta = jacobi_GlobalTrans_theta.clone()
# // consider topRight: T - RT0: add -dRT0/dt
# rot: (bn, nJoints, 3, 3, nThetas) @ (bn, nJoints, 1, 3, 1) => (bn, nJoints, 3, nThetas)
rot = jacobi_GlobalTrans_theta[..., :3, :3, :]
j0 = j_shaped.reshape(1, nJoints, 1, 3, 1).expand(bn, -1, -1, -1, -1)
jacobi_relTrans_theta[..., :3, 3, :] -= torch.sum(rot*j0, dim=-2)
jacobi_blendtrans_theta = torch.einsum('kj,fjabt->fkabt', self.k_weights, jacobi_relTrans_theta)
kpts = torch.einsum('fkab,kb->fka', relTrans_weight[..., :3, :3], k_shaped) + relTrans_weight[..., :3, 3]
# d(RJ0 + J1)/dtheta = d(R)/dtheta @ J0 + dJ1/dtheta
rot = jacobi_blendtrans_theta[..., :3, :3, :]
k0 = k_shaped.reshape(1, k_shaped.shape[0], 1, 3, 1).expand(bn, -1, -1, -1, -1)
# jacobi_keypoints_theta: (bn, nKeypoints, 3, nThetas)
jacobi_keypoints_theta = torch.sum(rot*k0, dim=-2) + jacobi_blendtrans_theta[..., :3, 3, :]
# tic('keypoints')
# // compute the jacobian of R T
# // loss: J_obs - (R @ jest + T)
rot, jacobi_R_rvec, jacobi_joints_RT = getJacobianOfRT(Rh, Th, kpts)
kpts_rt = torch.matmul(kpts, rot.transpose(-1, -2)) + Th[:, None]
rot_nk = rot[:, None].expand(-1, k_shaped.shape[0], -1, -1)
jacobi_keypoints_theta = torch.matmul(rot_nk, jacobi_keypoints_theta)
# compute jacobian of poses
shape_jacobi = jacobi_keypoints_theta.shape[:-1]
NUM_THETAS = jacobi_posesful_poses.shape[0]
jacobi_keypoints_poses = (jacobi_keypoints_theta[..., :NUM_THETAS].view(-1, NUM_THETAS) @ jacobi_posesful_poses).reshape(*shape_jacobi, -1)
# // loss: J_obs - (R @ jest + T) => -dR/drvec - dT/dtvec - Rx(djest/dtheta)
jacobi_keypoints = torch.cat([-jacobi_joints_RT, -jacobi_keypoints_poses], dim=-1)
if kpts_index is not None:
jacobi_keypoints = jacobi_keypoints[:, kpts_index]
kpts_rt = kpts_rt[:, kpts_index]
jacobi_keypoints_flat = jacobi_keypoints.view(bn, -1, jacobi_keypoints.shape[-1])
# tic('jacobian keypoints')
JTJ_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ (jacobi_keypoints_flat * conf[..., None])
res = conf[..., None] * ((keypoints3d[..., :3] - kpts_rt).view(bn, -1, 1))
JTr_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ res
cache_dict = {
'Th': Th,
'Rh': Rh,
'poses': poses,
}
# 计算loss
# JTJ = torch.zeros((bn*NUM_FRAME, bn*NUM_FRAME), device=device, dtype=dtype)
JTJ = torch.eye(bn*NUM_FRAME, device=device, dtype=dtype) * 1e-4
JTr = torch.zeros((bn*NUM_FRAME, 1), device=device, dtype=dtype)
# add regularization for each parameter
for nf in range(bn):
for key in keys_optimized:
start, end = nf*NUM_FRAME + keys_range[key][0], nf*NUM_FRAME + keys_range[key][1]
JTJ[start:end, start:end] += weight['reg_{}'.format(key)] * torch.eye(end-start)
JTr[start:end] += -weight['reg_{}'.format(key)] * cache_dict[key][nf].view(-1, 1)
# add init for Rh
if key == 'Rh':
# 增加初始化的loss
res_init = rot[nf] - init_dict['Rot'][nf]
JTJ[start:end, start:end] += weight['init_'+key] * jacobi_R_rvec[nf] @ jacobi_R_rvec[nf].T
JTr[start:end] += -weight.init_Rh * jacobi_R_rvec[nf] @ res_init.reshape(-1, 1)
else:
res_init = cache_dict[key][nf] - init_dict[key][nf]
JTJ[start:end, start:end] += weight['init_'+key] * torch.eye(end-start)
JTr[start:end] += -weight['init_'+key] * res_init.reshape(-1, 1)
# add keypoints loss
for nf in range(bn):
JTJ[nf*NUM_FRAME:(nf+1)*NUM_FRAME, nf*NUM_FRAME:(nf+1)*NUM_FRAME] += w_joints * JTJ_keypoints[nf]
JTr += - w_joints * JTr_keypoints.reshape(-1, 1)
# tic('add loss')
delta = torch.linalg.solve(JTJ, JTr)
# tic('solve')
norm_d, norm_f = self.log('pose', iter_, delta, res, keys_range)
if torch.norm(delta) < option.gtol:
break
if iter_ > 0 and abs(norm_f - lossnorm)/norm_f < option.ftol:
break
delta = delta.view(bn, NUM_FRAME)
lossnorm = norm_f
for key, _range in keys_range.items():
if key not in cache_dict.keys():continue
cache_dict[key] += delta[:, _range[0]:_range[1]]
res = {
'id': params['id'],
'poses': poses,
'shapes': params['shapes'],
'Rh': Rh,
'Th': Th,
}
for key, val in params.items():
if key not in res.keys():
res[key] = val
return res
def try_to_init(self, records):
if self.init:
return copy.deepcopy(self.params_newest)
mywarn('>> Initialize')
keypoints3d = self.get_keypoints3d(records)
params, keypoints_template = self.fitShape(keypoints3d, self.get_init_params(keypoints3d.shape[0]), self.cfg.shape.weight, self.cfg.shape.option)
Rot = batch_rodrigues(params['Rh'])
keypoints_template = torch.matmul(keypoints_template, Rot[0].t())
if self.cfg.initRT.mean_T:
conf = keypoints3d[..., 3:]
T = ((keypoints3d[..., :3] - keypoints_template[None])*conf).sum(dim=-2)/(conf.sum(dim=-2))
params['Th'] = T
params = self.fitRT(keypoints3d, params, self.cfg.initRT.weight, self.cfg.initRT.option,
kpts_index=self.cfg.TORSO_INDEX)
params = self.fitPose(keypoints3d, params, self.cfg.init_pose.weight, self.cfg.init_pose.option,
kpts_index=self.cfg.TORSO_INDEX)
params = self.fitPose(keypoints3d, params, self.cfg.init_pose.weight, self.cfg.init_pose.option,
kpts_index=self.cfg.BODY_INDEX)
mywarn('>> Initialize Rh = {}, Th = {}'.format(params['Rh'][0], params['Th'][0]))
params = Params(params)[-1]
self.init = True
return params
def fitting(self, params_init, records):
keypoints3d = self.get_keypoints3d(records[-self.WINDOW_SIZE:])
params = params_init
params = self.fitRT(keypoints3d[-self.FITTING_SIZE:], params, self.cfg.RT.weight, self.cfg.RT.option)
params = self.fitPose(keypoints3d[-self.FITTING_SIZE:], params, self.cfg.pose.weight, self.cfg.pose.option)
return params
def filter_result(self, result):
poses = result['poses'].reshape(-1, 3)
# TODO: 这里的xyz是scipy中的XYZ
euler = axis_angle_to_euler(poses, order='xyz')
# euler[euler>np.pi] = 0.
poses = euler_to_axis_angle(euler, order='xyz')
result['euler'] = euler
return result
def smooth_results(self):
results_ = {}
for key in self.results[0].keys():
if key == 'id': continue
if key not in self.SMOOTH_SIZE.keys():continue
results_[key] = np.vstack([r[key] for r in self.results[-self.SMOOTH_SIZE[key]:]])
results_[key] = np.mean(results_[key], axis=0, keepdims=True)
results_['id'] = 0
for key, val in self.fix_dict.items():
results_[key] = np.array(val)
# results_['Th'][:] = 0.
return [results_]
def check_keypoints(self, keypoints3d):
flag = (keypoints3d[..., -1]>0.3).sum() > 5
if len(self.records) > 1:
pre = self.records[-1]
k_pre = self.get_keypoints3d([pre])
dist = torch.norm(keypoints3d[..., :3] - k_pre[..., :3], dim=-1)
conf = torch.sqrt(keypoints3d[..., 3] * k_pre[..., 3])
dist_mean = (dist * conf).sum()/conf.sum()
flag = flag and dist_mean < 0.1
return flag
def __call__(self, data):
self.frame_index += 1
k3d = self.triangulator(data)[0]
keypoints3d = self.get_keypoints3d([k3d])
flag = self.check_keypoints(keypoints3d)
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
if self.frame_index - self.frame_latest > 10 and self.init:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
self.results = []
return -1
self.records.append(k3d)
if len(self.records) < self.INIT_SIZE:
return -1
with Timer('fitting', True):
params = self.try_to_init(self.records)
params = self.fitting(params, self.records)
params = self.filter_result(params)
self.results.append(params)
self.params_newest = params
self.frame_latest = self.frame_index
return self.smooth_results()
class FitterCPPCache:
def __init__(self) -> None:
self.init = False
self.records = []
self.frame_index = 0
self.frame_latest = 0
def try_to_init(self, keypoints3d):
if self.init:
return copy.deepcopy(self.params_newest)
mywarn('>> Initialize')
params = self.handmodel.init_params(1)
if not (keypoints3d[..., -1] > 0.).all(): # 只有一半的点可见
return params
params = self.handmodel.fit3DShape(keypoints3d, params)
params = self.handmodel.init3DRT(keypoints3d[-1:], params)
params = self.handmodel.fit3DPose(keypoints3d[-1:], params)
mywarn('>> Initialize Rh = {}, Th = {}'.format(params['Rh'][0], params['Th'][0]))
self.init = True
return params
def check_keypoints(self, keypoints3d):
flag = (keypoints3d[..., -1]>0.3).sum() > 5
if len(self.records) > 1:
k_pre = self.records[-1][None]
dist = np.linalg.norm(keypoints3d[..., :3] - k_pre[..., :3], axis=-1)
conf = np.sqrt(keypoints3d[..., 3] * k_pre[..., 3])
dist_mean = (dist * conf).sum()/(1e-5+conf.sum())
print(dist_mean)
flag = flag and dist_mean < 0.1
return flag
def smooth_results(self, params=None):
if params is None:
params = self.params_newest.copy()
params['poses'] = params['poses'][:, 3:]
params['id'] = 0
return [params]
def __call__(self, keypoints3d):
self.frame_index += 1
flag = self.check_keypoints(keypoints3d)
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
if self.frame_index - self.frame_latest > 10:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
return self.smooth_results(self.handmodel.init_params(1))
self.records.append(keypoints3d)
with Timer('fitting'):
params = self.try_to_init(keypoints3d[None])
params = self.handmodel.fit3DPose(keypoints3d[None], params)
self.params_newest = params
self.frame_latest = self.frame_index
return self.smooth_results()
class ManoFitterCPPCache(FitterCPPCache):
def __init__(self, name='LEFT') -> None:
super().__init__()
self.handmodel = load_object_from_cmd('config/model/mano_full_cpp.yml', [
'args.model_path', 'data/bodymodels/manov1.2/MANO_{}.pkl'.format(name),
'args.regressor_path', 'data/smplx/J_regressor_mano_{}.txt'.format(name),
])
class SMPLFitterCPPCache(FitterCPPCache):
def __init__(self, name='half') -> None:
super().__init__()
self.handmodel = load_object_from_cmd('config/model/mano_full_cpp.yml', [
'args.model_path', 'data/bodymodels/manov1.2/MANO_{}.pkl'.format(name),
'args.regressor_path', 'data/smplx/J_regressor_mano_{}.txt'.format(name),
])
class ManoFitterCPP:
def __init__(self, cfg_triangulator, key) -> None:
self.handmodel = load_object_from_cmd('config/model/mano_full_cpp.yml', [])
self.triangulator = load_object_from_cmd(cfg_triangulator, [])
self.time = 0
self.frame_latest = 0
self.frame_index = 0
self.key = 'handl3d'
self.init = False
self.params_newest = None
self.records, self.results = [], []
self.INIT_SIZE = 10
def get_keypoints3d(self, records, key=None):
if key is None:
return np.stack([r[self.key] for r in records])
else:
return np.stack([r[key] for r in records])
def try_to_init(self, records):
if self.init:
return copy.deepcopy(self.params_newest)
mywarn('>> Initialize')
keypoints3d = self.get_keypoints3d(records)
params = self.handmodel.init_params(1)
params = self.handmodel.fit3DShape(keypoints3d, params)
params = self.handmodel.init3DRT(keypoints3d[-1:], params)
params = self.handmodel.fit3DPose(keypoints3d[-1:], params)
mywarn('>> Initialize Rh = {}, Th = {}'.format(params['Rh'][0], params['Th'][0]))
self.init = True
return params
def smooth_results(self):
params = self.params_newest.copy()
params['poses'] = params['poses'][:, 3:]
params['id'] = 0
return [params]
def __call__(self, data):
self.frame_index += 1
k3d = self.triangulator(data)[0]
keypoints3d = self.get_keypoints3d([k3d])
# flag = self.check_keypoints(keypoints3d)
flag = True
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
if self.frame_index - self.frame_latest > 10 and self.init:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
self.results = []
return -1
self.records.append(k3d)
if len(self.records) < self.INIT_SIZE:
return -1
with Timer('fitting'):
params = self.try_to_init(self.records)
k3d = self.get_keypoints3d(self.records[-1:])
params = self.handmodel.fit3DPose(k3d, params)
# params['poses'] = torch.Tensor(params['poses'][:, 3:])
# params['shapes'] = torch.Tensor(params['shapes'])
# params['Rh'] = torch.Tensor(params['Rh'])
# params['Th'] = torch.Tensor(params['Th'])
# params = self.filter_result(params)
self.results.append(params)
self.params_newest = params
self.frame_latest = self.frame_index
return self.smooth_results()
class BodyFitter(BaseFitter):
def __init__(self, key, **kwargs):
super().__init__(**kwargs)
self.key = key
def get_keypoints3d(self, records, key=None):
if key is None:
return torch.Tensor(np.stack([r[self.key] for r in records]))
else:
return torch.Tensor(np.stack([r[key] for r in records]))
class ManoFitter(BodyFitter):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
class HalfFitter(BodyFitter):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.INDEX_HALF = [0,1,2,3,4,5,6,7,15,16,17,18]
def get_keypoints3d(self, records):
THRES_WRIST = 0.05
keypoints3d = super().get_keypoints3d(records)
keypoints3d = keypoints3d[:, self.INDEX_HALF]
handl = super().get_keypoints3d(records, key='handl3d')
handr = super().get_keypoints3d(records, key='handr3d')
dist_ll = torch.norm(keypoints3d[:, 7, :3] - handl[:, 0, :3], dim=-1)
dist_rr = torch.norm(keypoints3d[:, 4, :3] - handr[:, 0, :3], dim=-1)
handl[dist_ll>THRES_WRIST] = 0.
handr[dist_rr>THRES_WRIST] = 0.
keypoints3d = np.hstack([keypoints3d, handl, handr])
conf = keypoints3d[..., 3:]
keypoints3d = np.hstack([(keypoints3d[..., :3] * conf).sum(axis=0)/(1e-5 + conf.sum(axis=0)), conf.min(axis=0)])
keypoints3d = keypoints3d[None]
# if (keypoints3d.shape[0] == 10):
return torch.Tensor(keypoints3d)
def filter_result(self, result):
result = super().filter_result(result)
# 限定一下关节旋转
# 手肘
# result['poses'][:, 5*3+1] = np.clip(result['poses'][:, 5*3+1], -2.5, 0.1)
# result['poses'][:, 6*3+1] = np.clip(result['poses'][:, 5*3+1], -0.1, 2.5)
# 手腕
return result
class HalfHandFitter(HalfFitter):
def __init__(self, cfg_handl, cfg_handr, **kwargs) -> None:
super().__init__(**kwargs)
self.handl = load_object_from_cmd(cfg_handl, [])
self.handr = load_object_from_cmd(cfg_handr, [])
def get_init_params(self, nFrames):
params = super().get_init_params(nFrames)
params_ = self.handl.get_init_params(nFrames)
params['shapes_handl'] = params_['shapes']
params['shapes_handr'] = params_['shapes'].clone()
params['Rh_handl'] = torch.zeros((nFrames, 3))
params['Rh_handr'] = torch.zeros((nFrames, 3))
params['Th_handl'] = torch.zeros((nFrames, 3))
params['Th_handr'] = torch.zeros((nFrames, 3))
return params
def fitPose(self, keypoints3d, params, weight, option):
keypoints = {
'handl': keypoints3d[:, -21-21:-21, :],
'handr': keypoints3d[:, -21:, :]
}
for key in ['handl', 'handr']:
kpts = keypoints[key]
params_ = {
'id': 0,
'Rh': params['Rh_'+key],
'Th': params['Th_'+key],
'shapes': params['shapes_'+key],
'poses': params[key],
}
if key == 'handl':
params_ = self.handl.fitPose(kpts, params_, self.handl.cfg.pose.weight, self.handl.cfg.pose.option)
else:
params_ = self.handr.fitPose(kpts, params_, self.handr.cfg.pose.weight, self.handr.cfg.pose.option)
params['Rh_'+key] = params_['Rh']
params['Th_'+key] = params_['Th']
params['shapes_'+key] = params_['shapes']
params[key] = params_['poses']
return super().fitPose(keypoints3d, params, weight, option,
kpts_index=[0,1,2,3,4,5,6,7,8,9,10,11,
12, 17, 21, 25, 29,
24, 29, 33, 37, 41])
def try_to_init(self, records):
if self.init:
return self.params_newest
params = super().try_to_init(records)
self.handl.init = False
self.handr.init = False
key = 'handl'
params_ = self.handl.try_to_init(records)
params['handl'] = params_['poses']
params['Rh_'+key] = params_['Rh']
params['Th_'+key] = params_['Th']
params['shapes_'+key] = params_['shapes']
key = 'handr'
params_ = self.handr.try_to_init(records)
params[key] = params_['poses']
params['Rh_'+key] = params_['Rh']
params['Th_'+key] = params_['Th']
params['shapes_'+key] = params_['shapes']
return params
if __name__ == '__main__':
from glob import glob
from os.path import join
from tqdm import tqdm
from ..mytools.file_utils import read_json
from ..config.baseconfig import load_object_from_cmd
data= '/nas/datasets/EasyMocap/302'
# data = '/home/qing/Dataset/handl'
mode = 'half'
# data = '/home/qing/DGPU/home/shuaiqing/zju-mocap-mp/female-jump'
data = '/home/qing/Dataset/desktop/0402/test3'
k3dnames = sorted(glob(join(data, 'output-keypoints3d', 'keypoints3d', '*.json')))
if mode == 'handl':
fitter = load_object_from_cmd('config/recon/fit_manol.yml', [])
elif mode == 'half':
fitter = load_object_from_cmd('config/recon/fit_half.yml', [])
elif mode == 'smpl':
fitter = load_object_from_cmd('config/recon/fit_smpl.yml', [])
from easymocap.socket.base_client import BaseSocketClient
client = BaseSocketClient('0.0.0.0', 9999)
for k3dname in tqdm(k3dnames):
k3ds = read_json(k3dname)
if mode == 'handl':
k3ds = np.array(k3ds[0]['handl3d'])
data = {fitter.key3d: k3ds}
elif mode == 'half':
data = {
'keypoints3d': np.array(k3ds[0]['keypoints3d']),
'handl3d': np.array(k3ds[0]['handl3d']),
'handr3d': np.array(k3ds[0]['handr3d'])
}
elif mode == 'smpl':
k3ds = np.array(k3ds[0]['keypoints3d'])
data = {fitter.key3d: k3ds}
results = fitter(data)
if results != -1:
client.send_smpl(results)