EasyMocap/easymocap/multistage/fitting.py

1772 lines
78 KiB
Python
Raw Normal View History

'''
@ Date: 2022-03-22 16:11:44
@ Author: Qing Shuai
@ Mail: s_q@zju.edu.cn
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-07-25 11:51:50
@ FilePath: /EasyMocapPublic/easymocap/multistage/fitting.py
'''
# This function provides a realtime fitting interface
from collections import namedtuple
from time import time, sleep
import numpy as np
import cv2
import torch
import copy
from ..config.baseconfig import load_object_from_cmd
from ..mytools.debug_utils import log, mywarn
from ..mytools import Timer
from ..config import Config
from ..mytools.triangulator import iterative_triangulate
from ..bodymodel.base import Params
from .torchgeometry import axis_angle_to_euler, euler_to_axis_angle
def batch_rodrigues(rot_vecs, epsilon=1e-8, dtype=torch.float32):
''' Calculates the rotation matrices for a batch of rotation vectors
Parameters
----------
rot_vecs: torch.tensor Nx3
array of N axis-angle vectors
Returns
-------
R: torch.tensor Nx3x3
The rotation matrices for the given axis-angle parameters
'''
batch_size = rot_vecs.shape[0]
device = rot_vecs.device
angle = torch.norm(rot_vecs + 1e-8, dim=1, keepdim=True)
rot_dir = rot_vecs / angle
cos = torch.unsqueeze(torch.cos(angle), dim=1)
sin = torch.unsqueeze(torch.sin(angle), dim=1)
# Bx1 arrays
rx, ry, rz = torch.split(rot_dir, 1, dim=1)
K = torch.zeros((batch_size, 3, 3), dtype=dtype, device=device)
zeros = torch.zeros((batch_size, 1), dtype=dtype, device=device)
K = torch.cat([zeros, -rz, ry, rz, zeros, -rx, -ry, rx, zeros], dim=1) \
.view((batch_size, 3, 3))
ident = torch.eye(3, dtype=dtype, device=device).unsqueeze(dim=0)
rot_mat = ident + sin * K + (1 - cos) * torch.bmm(K, K)
return rot_mat
from scipy.spatial.transform import Rotation
def aa2euler(aa):
aa = np.array(aa)
R = cv2.Rodrigues(aa)[0]
# res = Rotation.from_dcm(R).as_euler('XYZ', degrees=True)
res = Rotation.from_matrix(R).as_euler('XYZ', degrees=False)
return np.round(res, 2).tolist()
def rotmat2euler(rot):
res = Rotation.from_matrix(rot).as_euler('XYZ', degrees=True)
return res
def euler2rotmat(euler):
res = Rotation.from_euler('XYZ', euler, degrees=True)
return res.as_matrix()
def batch_rodrigues_jacobi(rvec):
shape = rvec.shape
rvec = rvec.view(-1, 3)
device = rvec.device
dSkew = torch.zeros(3, 9, device=device)
dSkew[0, 5] = -1
dSkew[1, 6] = -1
dSkew[2, 1] = -1
dSkew[0, 7] = 1
dSkew[1, 2] = 1
dSkew[2, 3] = 1
dSkew = dSkew[None]
theta = torch.norm(rvec, dim=-1, keepdim=True) + 1e-5
c = torch.cos(theta)
s = torch.sin(theta)
c1 = 1 - c
itheta = 1 / theta
r = rvec / theta
zeros = torch.zeros_like(r[:, :1])
rx, ry, rz = torch.split(r, 1, dim=1)
rrt = torch.matmul(r[:, :, None], r[:, None, :])
skew = torch.cat([zeros, -rz, ry, rz, zeros, -rx, -ry, rx, zeros], dim=1) \
.view((r.shape[0], 3, 3))
I = torch.eye(3, device=rvec.device, dtype=rvec.dtype)[None]
rot_mat = I + s[:, None] * skew + c1[:, None] * torch.bmm(skew, skew)
drrt = torch.stack([
rx + rx, ry, rz, ry, zeros, zeros, rz, zeros, zeros,
zeros, rx, zeros, rx, ry + ry, rz, zeros, rz, zeros,
zeros, zeros, rx, zeros, zeros, ry, rx, ry, rz + rz
], dim=-1).view((r.shape[0], 3, 9))
jacobi = torch.zeros((r.shape[0], 3, 9), device=rvec.device, dtype=rvec.dtype)
for i in range(3):
ri = r[:, i:i+1]
a0 = -s * ri
a1 = (s - 2*c1*itheta)*ri
a2 = c1 * itheta
a3 = (c-s*itheta)*ri
a4 = s * itheta
jaco = a0[:, None] * I + a1[:, None] * rrt + a2[:, None] * drrt[:, i].view(-1, 3, 3) + a3[:, None] * skew + a4[:, None] * dSkew[:, i].view(-1, 3, 3)
jacobi[:, i] = jaco.view(-1, 9)
rot_mat = rot_mat.view(*shape[:-1], 3, 3)
jacobi = jacobi.view(*shape[:-1], 3, 9)
return rot_mat, jacobi
def getJacobianOfRT(rvec, tvec, joints):
# joints: (bn, nJ, 3)
dtype, device = rvec.dtype, rvec.device
bn, nJoints = joints.shape[:2]
# jacobiToRvec: (bn, 3, 9) // tested by OpenCV and PyTorch
Rot, jacobiToRvec = batch_rodrigues_jacobi(rvec)
I3 = torch.eye(3, dtype=dtype, device=device)[None]
# jacobiJ_R: (bn, nJ, 3, 3+3+3) => (bn, nJ, 3, 9)
# // flat by column:
# // x, 0, 0 | y, 0, 0 | z, 0, 0
# // 0, x, 0 | 0, y, 0 | 0, z, 0
# // 0, 0, x | 0, 0, y | 0, 0, z
jacobi_J_R = torch.zeros((bn, nJoints, 3, 9), dtype=dtype, device=device)
jacobi_J_R[:, :, 0, :3] = joints
jacobi_J_R[:, :, 1, 3:6] = joints
jacobi_J_R[:, :, 2, 6:9] = joints
# jacobi_J_rvec: (bn, nJ, 3, 3)
jacobi_J_rvec = torch.matmul(jacobi_J_R, jacobiToRvec[:, None].transpose(-1, -2))
# if True: # 测试自动梯度
# def test_func(rvec):
# Rot = batch_rodrigues(rvec[None])[0]
# joints_new = joints[0] @ Rot.t()
# return joints_new
# jac_J_rvec = torch.autograd.functional.jacobian(test_func, rvec[0])
# my_j = jacobi_joints_RT[0, ..., :3]
# jacobi_J_tvec: (bn, nJx3, 3)
jacobi_J_tvec = I3[None].expand(bn, nJoints, -1, -1)
jacobi_J_rt = torch.cat([jacobi_J_rvec, jacobi_J_tvec], dim=-1)
return Rot, jacobiToRvec, jacobi_J_rt
class Model:
rootIdx = 0
parents = []
INDEX_HALF = [0,1,2,3,4,5,6,7,15,16,17,18]
class LowPassFilter:
def __init__(self):
self.prev_raw_value = None
self.prev_filtered_value = None
def process(self, value, alpha):
if self.prev_raw_value is None:
s = value
else:
s = alpha * value + (1.0 - alpha) * self.prev_filtered_value
self.prev_raw_value = value
self.prev_filtered_value = s
return s
class OneEuroFilter:
def __init__(self, mincutoff=1.0, beta=0.0, dcutoff=1.0, freq=30):
self.freq = freq
self.mincutoff = mincutoff
self.beta = beta
self.dcutoff = dcutoff
self.x_filter = LowPassFilter()
self.dx_filter = LowPassFilter()
def compute_alpha(self, cutoff):
te = 1.0 / self.freq
tau = 1.0 / (2 * np.pi * cutoff)
return 1.0 / (1.0 + tau / te)
def process(self, x):
prev_x = self.x_filter.prev_raw_value
dx = 0.0 if prev_x is None else (x - prev_x) * self.freq
edx = self.dx_filter.process(dx, self.compute_alpha(self.dcutoff))
cutoff = self.mincutoff + self.beta * np.abs(edx)
return self.x_filter.process(x, self.compute_alpha(cutoff))
class BaseBody:
def __init__(self, cfg_triangulator, cfg_model, cfg) -> None:
self.triangulator = load_object_from_cmd(cfg_triangulator, [])
self.body_model = load_object_from_cmd(cfg_model, ['args.use_pose_blending', False, 'args.device', 'cpu'])
self.cfg = cfg
self.register_from_lbs(self.body_model)
def register_from_lbs(self, body_model):
kintree_shape = np.array(self.cfg.shape.kintree)
self.nJoints = body_model.J_regressor.shape[0]
self.k_shapeBlend = body_model.j_shapedirs[self.nJoints:]
self.j_shapeBlend = body_model.j_shapedirs[:self.nJoints]
self.jacobian_limb_shapes = self.k_shapeBlend[kintree_shape[:, 1]] - self.k_shapeBlend[kintree_shape[:, 0]]
self.k_template = body_model.j_v_template[self.nJoints:]
self.j_template = body_model.j_v_template[:self.nJoints]
self.k_weights = body_model.j_weights[self.nJoints:]
self.j_weights = body_model.j_weights[:self.nJoints]
parents = body_model.parents[1:].cpu().numpy()
child = np.arange(1, parents.shape[0]+1, dtype=np.int64)
self.kintree = np.stack([parents, child], axis=1)
self.parents = np.zeros(parents.shape[0]+1, dtype=int) - 1
self.parents[self.kintree[:, 1]] = self.kintree[:, 0]
self.rootIdx = 0
self.time = time()
def rotation_matrix_from_3x3(A):
U, s, Vt = np.linalg.svd(A, full_matrices=False)
V = Vt.T
T = np.dot(V, U.T)
# does the current solution use a reflection?
have_reflection = np.linalg.det(T) < 0
# if that's not what was specified, force another reflection
if have_reflection:
V[:,-1] *= -1
s[-1] *= -1
T = np.dot(V, U.T)
return T
def svd_rot(src, tgt, reflection=False, debug=True):
# optimum rotation matrix of Y
A = np.dot(src.T, tgt)
U, s, Vt = np.linalg.svd(A, full_matrices=False)
V = Vt.T
T = np.dot(V, U.T)
# does the current solution use a reflection?
have_reflection = np.linalg.det(T) < 0
# if that's not what was specified, force another reflection
if reflection != have_reflection:
V[:,-1] *= -1
s[-1] *= -1
T = np.dot(V, U.T)
if debug:
err = np.linalg.norm(tgt - src @ T.T, axis=1)
print('[svd] ', err)
return T
def normalize(vector):
return vector/np.linalg.norm(vector)
def rad_from_2vec(vec1, vec2):
return np.arccos((normalize(vec1)*normalize(vec2)).sum())
def smoothing_factor(t_e, cutoff):
r = 2 * 3.14 * cutoff * t_e
return r / (r + 1)
def exponential_smoothing(a, x, x_prev):
return a * x + (1 - a) * x_prev
FilterResult = namedtuple('FilterResult', ['x', 'dx', 'v', 't'])
class MyFilter:
def __init__(self, key, filled, min_cutoff=1.0, d_cutoff=1.0,
beta=0.1) -> None:
self.key = key
self.fill_result = filled
self.min_cutoff = min_cutoff
self.d_cutoff = d_cutoff
self.beta = beta
self.init = False
self.records = []
self.result = None
self.counter = 0
self.data = []
self.conf = []
self.smooth_record = []
def fill(self, value, conf):
filled = conf < 0.1
if filled.sum() >= 1:
value[filled] = self.fill_result[0][filled]
if self.key == 'Rh':
value = rotation_matrix_from_3x3(value.reshape(3, 3))
value = cv2.Rodrigues(value)[0].reshape(3,)
if (value < 0).all():
value = -value
return value[None]
def __call__(self, value, conf):
self.counter += 1
x, v = value[0], conf[0]
if self.key == 'Rh':
x = cv2.Rodrigues(x)[0].reshape(-1)
v = np.zeros((9,)) + v[0]
t = np.zeros_like(x)
t[v>0.1] = self.counter
self.data.append(x)
self.conf.append(v)
if len(self.smooth_record) == 0:
if self.key == 'Rh':
start = x
else:
# start = self.fill_result[0]
start = x
smoothed = FilterResult(start, np.zeros_like(x), np.zeros_like(x), t)
self.smooth_record.append(smoothed)
if len(self.data) < 3:
return self.fill(x, v)
data = np.array(self.data)
conf = np.array(self.conf)
smoothed = self.smooth_record[-1]
# 预计的速度
dx_new = x - smoothed.x
# 滤波器可见,当前可见
flag_vis = (smoothed.v > 0.1) & (v > 0.1)
# - 速度异常的,移除掉,认为当前帧不可见
flag_outlier = (np.abs(smoothed.dx) > 0.05) & (np.abs(dx_new - smoothed.dx)/(1e-5 + smoothed.dx) > 2.)
if self.key != 'Rh':
v[flag_vis&flag_outlier] = 0.
# 滤波器不可见,当前可见,速度打折,认为是新增的帧
flag_new = (smoothed.v < 0.1)&(v>0.1)
dx_new[flag_new] /= 3
# 滤波器可见,当前不可见,速度使用滤波器的速度
flag_unvis = (v<0.1) & (conf[-2] < 0.1)
dx_new[flag_unvis] = smoothed.dx[flag_unvis]
# 滤波器不可见当前也不可见速度清0
dx_new[(v<0.1)&(smoothed.v<0.1)] = 0.
# 实际估计出来的速度,这里要去掉不可见的地方
# 混合的权重使用 0.7, 0.3默认全部使用新的一半
weight_dx = np.zeros_like(dx_new) + 0.7
dx_smoothed = smoothed.dx*(1-weight_dx) + dx_new*weight_dx
smoothed_value = smoothed.x + dx_smoothed
v_new = smoothed.v.copy()
v_new = v_new * (1-weight_dx) + v*weight_dx
t_new = smoothed.t.copy()
t_new[v>0.1] = t[v>0.1]
smooth_new = FilterResult(smoothed_value, dx_smoothed, v_new, t_new)
self.smooth_record.append(smooth_new)
if self.counter == 1000:
if self.key == 'poses':
import matplotlib.pyplot as plt
xrange = np.arange(0, data.shape[0])
smoothed = np.array([d.x for d in self.smooth_record])
for nj in range(data.shape[1]):
valid = conf[:, nj] > 0.
plt.scatter(xrange[valid], data[valid, nj])
# yhat = savgol_filter(data[:, nj], data.shape[0], 3)
# plt.plot(yhat)
plt.plot(smoothed)
plt.show()
import ipdb;ipdb.set_trace()
# return self.fill(x, v)
return self.fill(smooth_new.x, smooth_new.v)
def __call__0(self, value, conf):
self.counter += 1
x, v = value[0], conf[0]
if self.key == 'Rh':
x = cv2.Rodrigues(x)[0].reshape(-1)
v = np.zeros((9,)) + v[0]
if self.result is None:
result = FilterResult(x, np.zeros_like(x), v, (v>0)*self.counter)
self.result = result
self.records.append(result)
return self.fill(result.x, result.v)
# return self.fill(x, v)
# 维护一个前一帧的去除outlier
prev = self.result
t = prev.t.copy()
t[v>0.] = self.counter
dx = x - prev.x # 这里直接使用与之前的结果的差了,避免多帧不可见,然后速度过大
MAX_DX = 1.
WINDOW = 31
not_valid = ((np.abs(dx) > MAX_DX) & (prev.v > 0.1))|\
(t-prev.t > WINDOW)
v[not_valid] = 0.
x_all = np.stack([r.x for r in self.records[-WINDOW:]])
v_all = np.stack([r.v for r in self.records[-WINDOW:]])
dx_all = np.stack([r.dx for r in self.records[-WINDOW:]])
v_sum = v_all.sum(axis=0)
dx_mean = (dx_all * v_all).sum(axis=0)/(1e-5 + v_all.sum(axis=0))
# if (x_all.shape[0] > 30) & (self.counter % 40 == 0):
if True:
x_mean = (x_all * v_all).sum(axis=0)/(1e-5 + v_all.sum(axis=0))
x_pred = x_mean
dx_pred = np.zeros_like(x_pred)
elif x_all.shape[0] >= 5:
# 进行smooth
axt = np.zeros((2, x_all.shape[1]))
xrange = np.arange(x_all.shape[0]).reshape(-1, 1)
A0 = np.hstack([xrange, np.ones((x_all.shape[0], 1))])
for nj in range(x_all.shape[1]):
conf = v_all[:, nj:nj+1]
if (conf>0.).sum() < 3:
continue
A = conf * A0
b = conf * (x_all[:, nj:nj+1])
est = np.linalg.inv(A.T @ A) @ A.T @ b
axt[:, nj] = est[:, 0]
x_all_smoothed = xrange * axt[0:1] + axt[1:]
x_pred = x_all_smoothed[x_all.shape[0]//2]
dx_pred = axt[0]
else:
x_pred = x_all[x_all.shape[0]//2]
dx_pred = dx_mean
if x_all.shape[0] == 1:
current = FilterResult(x, dx, v, t)
self.records.append(current)
self.result = current
else:
# dx_hat = (dx * v + dx_mean * v_mean)/(v+v_mean+1e-5)
# x_pred = x_mean + dx_hat
# current = FilterResult(x_pred, dx_hat, v, t)
current = FilterResult(x, dx, v, t)
self.records.append(current)
# 使用平均速度模型
self.result = FilterResult(x_pred, dx_pred, v_sum, t)
return self.fill(self.result.x, self.result.v)
def __call__2(self, value, conf):
self.counter += 1
x, v = value[0], conf[0]
if self.result is None:
result = FilterResult(x, np.zeros_like(x), v, (v>0)*self.counter)
self.result = result
return self.fill(result.x, result.v)
prev = self.result
t = prev.t.copy()
t[v>0.] = self.counter
# update t
# dx = (x - prev.x)/(np.maximum(t-prev.t, 1))
dx = x - prev.x # 这里直接使用与之前的结果的差了,避免多帧不可见,然后速度过大
dx_ = dx.copy()
# 判断dx的大小
large_dx = np.abs(dx) > 0.5
if large_dx.sum() > 0:
v[large_dx] = 0.
t[large_dx] = prev.t[large_dx]
dx[large_dx] = 0.
missing_index = ((prev.v > 0.1) & (v < 0.1)) | (t - prev.t > 10)
if missing_index.sum() > 0:
print('missing', missing_index)
new_index = (prev.v < 0.1) & (v > 0.1)
if new_index.sum() > 0:
print('new', new_index)
dx[new_index] = 0.
weight_dx = v/(1e-5+ 3*prev.v + 1*v)
weight_x = v/(1e-5+ 3*prev.v + 1*v)
# 移除速度过大的点
dx_hat = exponential_smoothing(weight_dx, dx, prev.dx)
x_pred = prev.x + dx_hat
x_hat = exponential_smoothing(weight_x, x, x_pred)
dx_real = x_hat - prev.x
# consider the unvisible v
print_val = {
't_pre': prev.t,
'x_inp': x,
'x_pre': prev.x,
'x_new': x_hat,
'dx_inp': dx_,
'dx_pre': prev.dx,
'dx_new': dx_hat,
'dx_real': dx_real,
'v': v,
'v_pre': prev.v,
'w_vel': weight_dx,
'w_x': weight_x
}
for key in print_val.keys():
print('{:7s}'.format(key), end=' ')
print('')
for i in range(x.shape[0]):
for key in print_val.keys():
print('{:7.2f}'.format(print_val[key][i]), end=' ')
print('')
v[missing_index] = prev.v[missing_index] / 1.2 # 衰减系数
result = FilterResult(x_hat, dx_hat, v, t)
self.result = result
return self.fill(result.x, result.v)
if len(self.records) < 10:
self.records.append([self.counter, value, conf])
return self.fill(value[0], conf[0])
if self.x is None:
time = np.vstack([x[0] for x in self.records])
value_pre = np.vstack([x[1] for x in self.records])
conf_pre = np.vstack([x[2] for x in self.records])
conf_sum = conf_pre.sum(axis=0)
value_mean = (value_pre * conf_pre).sum(axis=0)/(conf_sum + 1e-5)
self.x = value_mean
self.x_conf = conf_sum
t_prev = np.zeros_like(self.x, dtype=int) - 1
t_prev[conf_sum>0] = self.counter
self.t_prev = t_prev
# 零速度初始化
self.d_x = np.zeros_like(self.x)
return self.fill(self.x, self.x_conf)
# 假设每帧都传进来的吧
return self.fill(self.x, self.x_conf)
x_est, v_est, conf_est = self.x.copy(), self.d_x.copy(), self.x_conf.copy()
value = value[0]
conf = conf[0]
d_x = value - self.x
t_current = np.zeros_like(self.x, dtype=int) - 1
t_current[conf>0.] = self.counter
t_est = t_current - self.t_prev
# 前一帧有观测当前帧有观测两帧之差在10帧以内。正常更新
flag_vv = (t_current > 0) & (self.t_prev > 0) & \
(t_current - self.t_prev < 10)
# 前一帧无观测;当前帧有观测的;判断为新增的
flag_iv = (self.t_prev < 0) & (t_current > 0)
weight_vel = smoothing_factor(t_est, self.d_cutoff)
# 将观测的速度权重置0
weight_vel[flag_vv] = 0.
vel_hat = exponential_smoothing(weight_vel, d_x, self.d_x)
cutoff = self.min_cutoff + self.beta * np.abs(vel_hat)
weight_value = smoothing_factor(t_est, cutoff)
# 将观测的数值权重置0
weight_value[flag_vv] = 0.
weight_value[flag_iv] = 1. # 当前帧可见的,之前的帧不可见的,直接选择当前帧
vel_hat[flag_iv] = 0.
x_hat = exponential_smoothing(weight_value, value, self.x)
flag_vi = (self.t_prev > 0) & (~flag_vv)
flag_v = flag_vv | flag_vi | flag_iv
# 前一帧有观测;当前帧无观测的;判断为丢失的
x_est[flag_v] = x_hat[flag_v]
v_est[flag_v] = vel_hat[flag_v]
conf_est[flag_v] = (self.x_conf + conf)[flag_v]/2
self.t_prev[flag_v] = self.counter
self.x = x_est
self.d_x = v_est
self.x_conf = conf_est
return self.fill(x_est, conf_est)
class IKBody(BaseBody):
def __init__(self, key, cfg_triangulator, cfg_model, cfg, debug) -> None:
super().__init__(cfg_triangulator, cfg_model, cfg)
self.key = key
self.frame_index = 0
self.frame_latest = 0
self.init = False
self.records = []
self.results = []
self.blank_result = self.make_blank()
self.fill_result = self.make_fill()
self.results_newest = self.blank_result
if True:
self.lefthand = ManoFitterCPPCache('LEFT')
self.righthand = ManoFitterCPPCache('RIGHT')
self.up_vector = 'z'
self.filter = {}
for key in ['Rh', 'poses', 'handl', 'handr']:
self.filter[key] = MyFilter(key, self.fill_result[key])
def make_blank(self):
raise NotImplementedError
def make_fill(self):
raise NotImplementedError
def smooth_results(self, params=None):
results = {'id': 0, 'type': 'smplh_half'}
for key in ['Rh', 'poses', 'handl', 'handr']:
value = self.filter[key](params[key], params[key+'_conf'])
results[key] = value
for key in ['shapes', 'Th']:
results[key] = self.blank_result[key]
return results
def smooth_results_old(self, params=None):
if params is not None:
self.results.append(params)
if len(self.results) < 10:
return params
else:
if len(self.results) < 10:
return self.fill_result
else:
params = self.fill_result
results = {'id': 0}
if False:
for key in ['Rh', 'poses', 'handl', 'handr']:
if not self.filter[key].init:
import ipdb;ipdb.set_trace()
else:
value = self.filter[key](self.results[-1][key])
if True:
for key in ['Rh', 'poses', 'handl', 'handr']:
find = False
for WINDOW in [10, 20, 40]:
if WINDOW > len(self.results):
break
records = self.results[-WINDOW:]
value = np.vstack([r[key] for r in records])
conf = np.vstack([r[key+'_conf'] for r in records])
valid = conf[..., 0] > 0
if valid.sum() < WINDOW // 3:
import ipdb;ipdb.set_trace()
else:
value, conf = value[valid], conf[valid]
mean_value = value.mean(axis=0)
std_value = value.std(axis=0)
valid2 = (np.abs(value - mean_value) < WINDOW//3 * std_value).any(axis=-1)
if valid2.sum() < WINDOW // 4:
continue
find = True
value, conf = value[valid2], conf[valid2]
conf_sum = conf.sum(axis=0)
mean = (value*conf).sum(axis=0)/(conf_sum + 1e-5)
# 计算latest
break
if key in ['poses', 'handl', 'handr']:
conf_sum_p = conf.sum(axis=0)
mean_previous = (value*conf).sum(axis=0)/(conf_sum_p + 1e-5)
mean[conf_sum<0.01] = mean_previous[conf_sum<0.01]
conf_sum[conf_sum<0.01] = conf_sum_p[conf_sum<0.01]
# 使用fill的填值
mean[conf_sum<0.01] = self.fill_result[key][0][conf_sum<0.01]
break
if find:
results[key] = mean[None]
else:
results[key] = self.fill_result[key]
if False: # 均值滤波
for key in ['Rh', 'poses', 'handl', 'handr']:
if key not in params.keys():
continue
if key not in self.cfg.SMOOTH_SIZE.keys():
results[key] = params[key]
records = self.results[-self.cfg.SMOOTH_SIZE[key]:]
value = np.vstack([r[key] for r in records])
conf = np.vstack([r[key+'_conf'] for r in records])
conf_sum = conf.sum(axis=0)
mean = (value*conf).sum(axis=0)/(conf_sum + 1e-5)
# 计算latest
if key in ['poses', 'handl', 'handr']:
records = self.results[-5*self.cfg.SMOOTH_SIZE[key]:]
value = np.vstack([r[key] for r in records])
conf = np.vstack([r[key+'_conf'] for r in records])
conf_sum_p = conf.sum(axis=0)
mean_previous = (value*conf).sum(axis=0)/(conf_sum_p + 1e-5)
mean[conf_sum<0.01] = mean_previous[conf_sum<0.01]
conf_sum[conf_sum<0.01] = conf_sum_p[conf_sum<0.01]
# 使用fill的填值
mean[conf_sum<0.01] = self.fill_result[key][0][conf_sum<0.01]
results[key] = mean[None]
results['Th'] = self.blank_result['Th']
results['shapes'] = self.blank_result['shapes']
return results
def get_keypoints3d(self, records, key=None):
if key is None:
return np.stack([r[self.key] for r in records])
else:
return np.stack([r[key] for r in records])
def check_keypoints(self, keypoints3d):
flag = (keypoints3d[..., -1]>self.cfg.MIN_THRES).sum() > 5
if len(self.records) > 1:
pre = self.records[-1]
k_pre = self.get_keypoints3d([pre])
dist = np.linalg.norm(keypoints3d[..., :3] - k_pre[..., :3], axis=-1)
conf = np.sqrt(keypoints3d[..., 3] * k_pre[..., 3])
dist_mean = (dist * conf).sum()/conf.sum()
flag = flag and dist_mean < 0.1
return flag
def __call__(self, data):
self.frame_index += 1
k3d = self.triangulator(data)[0]
keypoints3d = self.get_keypoints3d([k3d])
flag = self.check_keypoints(keypoints3d)
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
# 1. 初始化过了,但是超出帧数了,清零
# 2. 没有初始化过,超出了,清零
if (self.frame_index - self.frame_latest > 10 and self.init) or not self.init:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
self.results = []
return [self.fill_result]
else:
return [self.smooth_results()]
elif not self.init: # 暂时还没有初始化,先等待
if len(self.records) < 10:
self.records.append(k3d)
return [self.fill_result]
self.records.append(k3d)
flag, params = self.fitting(keypoints3d, self.results_newest)
if not flag:
return [self.fill_result]
self.frame_latest = self.frame_index
# smooth results
results = self.smooth_results(params)
self.results_newest = results
k3d['type'] = 'body25'
return [results, k3d]
class HalfBodyIK(IKBody):
def get_keypoints3d(self, records):
THRES_WRIST = 0.2
keypoints3d = super().get_keypoints3d(records)
keypoints3d = keypoints3d[:, INDEX_HALF]
handl = super().get_keypoints3d(records, key='handl3d')
handr = super().get_keypoints3d(records, key='handr3d')
dist_ll = np.linalg.norm(keypoints3d[:, 7, :3] - handl[:, 0, :3], axis=-1)
dist_rr = np.linalg.norm(keypoints3d[:, 4, :3] - handr[:, 0, :3], axis=-1)
log('Dist left = {}, right = {}'.format(dist_ll, dist_rr))
handl[dist_ll>THRES_WRIST] = 0.
handr[dist_rr>THRES_WRIST] = 0.
keypoints3d = np.hstack([keypoints3d, handl, handr])
conf = keypoints3d[..., 3:]
keypoints3d = np.hstack([(keypoints3d[..., :3] * conf).sum(axis=0)/(1e-5 + conf.sum(axis=0)), conf.min(axis=0)])
keypoints3d = keypoints3d[None]
# if (keypoints3d.shape[0] == 10):
return keypoints3d
def _ik_shoulder(self, keypoints3d, params):
SHOULDER_IDX = [2, 5]
shoulder = keypoints3d[SHOULDER_IDX[1], :3] - keypoints3d[SHOULDER_IDX[0], :3]
if self.up_vector == 'x':
shoulder[..., 0] = 0.
up_vector = np.array([1., 0., 0.], dtype=np.float32)
elif self.up_vector == 'z':
shoulder[..., 2] = 0.
up_vector = np.array([0., 0., 1.], dtype=np.float32)
shoulder = shoulder/np.linalg.norm(shoulder, keepdims=True)
# 限定一下角度范围
theta = -np.rad2deg(np.arctan2(shoulder[1], shoulder[2]))
if (theta < 30 or theta > 150) and False:
return False, params
front = np.cross(shoulder, up_vector)
front = front/np.linalg.norm(front, keepdims=True)
R = np.stack([shoulder, up_vector, front]).T
Rh = cv2.Rodrigues(R)[0].reshape(1, 3)
log('Shoulder:{}'.format(Rh))
params['R'] = R
params['Rh'] = Rh
params['Rh_conf'] = np.zeros((1, 3)) + keypoints3d[SHOULDER_IDX, 3].min()
return True, params
def _ik_head(self, keypoints3d, params):
HEAD_IDX = [0, 8, 9, 10, 11]
HEAD_ROT_IDX = 0
est_points = keypoints3d[HEAD_IDX, :3]
valid = (keypoints3d[HEAD_IDX[0], 3] > self.cfg.MIN_THRES) and (keypoints3d[HEAD_IDX[1:], 3]>self.cfg.MIN_THRES).sum()>=2
if not valid:
params['poses_conf'][:, 3*HEAD_ROT_IDX:3*(HEAD_ROT_IDX+1)] = 0.
return params
params['poses_conf'][:, 3*HEAD_ROT_IDX:3*(HEAD_ROT_IDX+1)] = keypoints3d[HEAD_IDX, 3].sum()
gt_points = self.k_template[HEAD_IDX].numpy()
gt_points = gt_points - gt_points[:1]
est_points = est_points - est_points[:1]
# gt_points = gt_points / np.linalg.norm(gt_points, axis=-1, keepdims=True)
# est_points = est_points / np.linalg.norm(est_points, axis=-1, keepdims=True)
if True:
R_global = svd_rot(gt_points, est_points)
R_local = params['R'].T @ R_global
elif False:
est_points_inv = est_points @ params['R'].T.T
R_local = svd_rot(gt_points, est_points_inv)
else:
gt_points = gt_points @ params['R'].T
R_local = svd_rot(gt_points, est_points)
euler = rotmat2euler(R_local)
euler[0] = euler[0] - 25
# log('euler before filter: {}'.format(euler))
euler[0] = max(min(euler[0], 30), -30)
euler[1] = max(min(euler[1], 60), -60)
euler[2] = max(min(euler[2], 45), -45)
# log('euler after filter: {}'.format(euler))
R_local = euler2rotmat(euler)
R_head = cv2.Rodrigues(R_local)[0].reshape(1, 3)
params['poses'][:, 3*HEAD_ROT_IDX:3*(HEAD_ROT_IDX+1)] = R_head
return params
@staticmethod
def _rad_from_twovec(keypoints3d, start, mid, end, MIN_THRES):
start = keypoints3d[start]
mid = keypoints3d[mid]
end = keypoints3d[end]
if isinstance(end, list):
# dst is a list
if (end[:, 3] > MIN_THRES).sum() < 2:
return 0, 0.
end = np.sum(end * end[:, 3:], axis=0)/(end[:, 3:].sum())
# use its mean to represent the points
conf = [start[3], mid[3], end[3]]
valid = (min(conf) > MIN_THRES).all()
if not valid:
return 0, 0.
conf = sum(conf)
dir_src = normalize(mid[:3] - start[:3])
dir_dst = normalize(end[:3] - mid[:3])
rad = rad_from_2vec(dir_src, dir_dst)
return conf, rad
def _ik_elbow(self, keypoints3d, params):
for name, info in self.cfg.LEAF.items():
conf, rad = self._rad_from_twovec(keypoints3d, info.start, info.mid, info.end, self.cfg.MIN_THRES)
if conf <= 0.: continue
params['poses_conf'][:, 3*info['index']:3*(info['index']+1)] = conf
rad = np.clip(rad, *info['ranges'])
rot = rad*np.array(info['axis']).reshape(1, 3)
params['poses'][:, 3*info['index']:3*(info['index']+1)] = rot
return params
def _ik_arm(self, keypoints3d, params):
# forward一遍获得关键点
# 这里需要确保求解的关节点没有父节点了
template = self.body_model.keypoints({'poses': params['poses'], 'shapes': params['shapes']}, return_tensor=False)[0]
for name, info in self.cfg.NODE.items():
idx = info['children']
conf = keypoints3d[info['children'], 3]
if not (conf >self.cfg.MIN_THRES).all():
continue
params['poses_conf'][:, 3*info['index']:3*(info['index']+1)] = conf.sum()
est_points = keypoints3d[idx, :3]
gt_points = template[idx]
est_points = est_points - est_points[:1]
gt_points = gt_points - gt_points[:1]
R_children = svd_rot(gt_points, est_points)
R_local = params['R'].T @ R_children
euler = rotmat2euler(R_local)
# log('euler {} before filter: {}'.format(name, euler))
# euler[0] = max(min(euler[0], 90), -90)
# euler[1] = max(min(euler[1], 90), -90)
# euler[2] = max(min(euler[2], 90), -90)
# log('euler {} after filter: {}'.format(name, euler))
R_local = euler2rotmat(euler)
params['poses'][:, 3*info['index']:3*(info['index']+1)] = cv2.Rodrigues(R_local)[0].reshape(1, 3)
return params
def _ik_palm(self, keypoints3d, params):
# template = self.body_model.keypoints({'poses': params['poses'], 'shapes': params['shapes']}, return_tensor=False)[0]
T_joints, _ = self.body_model.transform({'poses': params['poses'], 'shapes': params['shapes']}, return_vertices=False)
T_joints = T_joints[0].cpu().numpy()
for name, info in self.cfg.PALM.items():
# 计算手掌的朝向
est_points = keypoints3d[:, :3]
est_conf = keypoints3d[:, 3]
if est_conf[info.children].min() < self.cfg.MIN_THRES:
continue
# 计算朝向
dir0 = normalize(est_points[info.children[1]] - est_points[info.children[0]])
dir1 = normalize(est_points[info.children[-1]] - est_points[info.children[0]])
normal = normalize(np.cross(dir0, dir1))
dir_parent = normalize(est_points[info.parent[1]] - est_points[info.parent[0]])
# 计算夹角
rad = np.arccos((normal * dir_parent).sum()) - np.pi/2
rad = np.clip(rad, *info['ranges'])
rot = rad*np.array(info['axis']).reshape(1, 3)
params['poses'][:, 3*info['index']:3*(info['index']+1)] = rot
# 考虑手肘的朝向;这个时候还差一个绕手肘的朝向的方向的旋转;这个旋转是在手肘扭曲之前的
# 先计算出这个朝向;再转化
R_parents = params['R'] @ T_joints[info.index, :3, :3]
normal_canonical = R_parents.T @ normal.reshape(3, 1)
normal_canonical[0, 0] = 0
normal_canonical = normalize(normal_canonical)
# 在canonical下的投影
# normal_T = np.array([0., -1., 0.])
# trick: 旋转角度的正弦值等于在z轴上的坐标
rad = np.arcsin(normal_canonical[2, 0])
rot_x = np.array([-rad, 0., 0.])
R_x = cv2.Rodrigues(rot_x)[0]
R_elbow = cv2.Rodrigues(params['poses'][:, 3*info.index_elbow:3*info.index_elbow+3])[0]
R_elbow = R_elbow @ R_x
params['poses'][:, 3*info.index_elbow:3*info.index_elbow+3] = cv2.Rodrigues(R_elbow)[0].reshape(1, 3)
return params
def _ik_hand(self, template, keypoints3d, poses, conf, is_left):
# 计算手的每一段的置信度
poses_full = np.zeros((1, 45))
conf_full = np.zeros((1, 45))
y_axis = np.array([0., 1., 0.])
log('_ik for left: {}'.format(is_left))
for name, info in self.cfg.HAND.LEAF.items():
conf, rad = self._rad_from_twovec(keypoints3d, *info.ranges, self.cfg.MIN_THRES)
if conf <= 0.:
log('- skip: {}'.format(name))
continue
# trick: 手的朝向是反的
rad = - rad
if info.axis == 'auto':
template_dir = template[info.ranges[2]] - template[info.ranges[1]]
# y轴方向设成0
template_dir[1] = 0.
template_dir = normalize(template_dir)
# 计算旋转轴在与z轴的cross方向上
rot_vec = normalize(np.cross(template_dir, y_axis)).reshape(1, 3)
elif info.axis == 'root':
template_dir0 = template[info.ranges[1]] - template[info.ranges[0]]
template_dir1 = template[info.ranges[2]] - template[info.ranges[1]]
template_dir0 = normalize(template_dir0)
template_dir1 = normalize(template_dir1)
costheta0 = (template_dir0 *template_dir1).sum()
# 计算当前的夹角
est_dir0 = keypoints3d[info.ranges[1], :3] - keypoints3d[info.ranges[0], :3]
est_dir1 = keypoints3d[info.ranges[2], :3] - keypoints3d[info.ranges[1], :3]
est_dir0 = normalize(est_dir0)
est_dir1 = normalize(est_dir1)
costheta1 = (est_dir0 * est_dir1).sum()
# trick: 手的旋转角度都是相反的
rad = - np.arccos(np.clip(costheta1/costheta0, 0., 1.))
rot_vec = normalize(np.cross(template_dir1, y_axis)).reshape(1, 3)
log('- get: {}: {:.1f}, {}'.format(name, np.rad2deg(rad), rot_vec))
poses_full[:, 3*info.index:3*info.index+3] = rot_vec * rad
conf_full[:, 3*info.index:3*info.index+3] = conf
# 求解
usePCA = False
if usePCA:
ncomp = 24
lamb = 0.05
if is_left:
A_full = self.body_model.components_full_l[:ncomp].T
mean_full = self.body_model.mean_full_l
else:
A_full = self.body_model.components_full_r[:ncomp].T
mean_full = self.body_model.mean_full_r
valid = conf_full[0] > 0.
A = A_full[valid, :]
res = (poses_full[:, valid] - mean_full[:, valid]).T
x = np.linalg.inv(A.T @ A + lamb * np.eye(ncomp)) @ A.T @ res
poses_full = (A_full @ x).reshape(1, -1) + mean_full
conf_full = np.zeros_like(poses_full) + valid.sum()
return poses_full, conf_full
def make_blank(self):
params = self.body_model.init_params(1, ret_tensor=False)
params['id'] = 0
params['type'] = 'smplh_half'
params['Th'][0, 0] = 1.
params['Th'][0, 1] = -1
params['Th'][0, 2] = 1.
return params
def make_fill(self):
params = self.body_model.init_params(1, ret_tensor=False)
params['id'] = 0
params['type'] = 'smplh_half'
params['Rh'][0, 2] = -np.pi/2
params['handl'] = self.body_model.mean_full_l
params['handr'] = self.body_model.mean_full_r
params['poses'][0, 3*4+2] = -np.pi/4
params['poses'][0, 3*5+2] = np.pi/4
return params
def fitting(self, keypoints3d, results_pre):
# keypoints3d: (nFrames, nJoints, 4)
# 根据肩膀计算身体朝向
if len(keypoints3d.shape) == 3:
keypoints3d = keypoints3d[0]
params = self.body_model.init_params(1, ret_tensor=False)
params['poses_conf'] = np.zeros_like(params['poses'])
params['handl_conf'] = np.zeros_like(params['handl'])
params['handr_conf'] = np.zeros_like(params['handr'])
params['Rh_conf'] = 0.
params['id'] = 0
flag, params = self._ik_shoulder(keypoints3d, params)
if (params['Rh_conf'] <= 0.01).all():
return False, params
params = self._ik_head(keypoints3d, params)
params = self._ik_elbow(keypoints3d, params)
params = self._ik_arm(keypoints3d, params)
params = self._ik_palm(keypoints3d, params)
if False:
params['handl'], params['handl_conf'] = self._ik_hand(self.k_template[12:12+21].numpy(), keypoints3d[12:12+21], params['handl'], params['handl_conf'], is_left=True)
params['handr'], params['handr_conf'] = self._ik_hand(self.k_template[12+21:12+21+21].numpy(), keypoints3d[12+21:12+21+21], params['handr'], params['handr_conf'], is_left=False)
else:
params_l = self.lefthand(keypoints3d[12:12+21])[0]
params_r = self.righthand(keypoints3d[12+21:12+21+21])[0]
# log('[{:06d}] {}'.format(self.frame_index, params_l['poses'][0]))
# log('[{:06d}] {}'.format(self.frame_index, params_r['poses'][0]))
ncomp = params_l['poses'].shape[1]
A_full = self.body_model.components_full_l[:ncomp].T
mean_full = self.body_model.mean_full_l
poses_full = (A_full @ params_l['poses'].T).T + mean_full
params['handl'] = poses_full
A_full = self.body_model.components_full_r[:ncomp].T
mean_full = self.body_model.mean_full_r
poses_full = (A_full @ params_r['poses'].T).T + mean_full
params['handr'] = poses_full
params['handl_conf'] = np.ones((1, 45))
params['handr_conf'] = np.ones((1, 45))
return True, params
class BaseFitter(BaseBody):
def __init__(self, cfg_triangulator, cfg_model,
INIT_SIZE, WINDOW_SIZE, FITTING_SIZE, SMOOTH_SIZE,
init_dict, fix_dict,
cfg) -> None:
super().__init__(cfg_triangulator, cfg_model, cfg)
self.records = []
self.results = []
self.INIT_SIZE = INIT_SIZE
self.WINDOW_SIZE = WINDOW_SIZE
self.FITTING_SIZE = FITTING_SIZE
self.SMOOTH_SIZE = SMOOTH_SIZE
self.time = 0
self.frame_latest = 0
self.frame_index = 0
self.init = False
self.init_dict = init_dict
self.fix_dict = fix_dict
self.identity_cache = {}
def get_keypoints3d(self, records):
raise NotImplementedError
def get_init_params(self, nFrames):
params = self.body_model.init_params(nFrames, ret_tensor=True)
for key, val in self.init_dict.items():
if key == 'Rh':
import cv2
R = cv2.Rodrigues(params['Rh'][0].cpu().numpy())[0]
for vec in self.init_dict['Rh']:
Rrel = cv2.Rodrigues(np.deg2rad(np.array(vec)))[0]
R = Rrel @ R
Rh = cv2.Rodrigues(R)[0]
params['Rh'] = torch.Tensor(Rh).reshape(-1, 3).repeat(nFrames, 1)
else:
params[key] = torch.Tensor(val).repeat(nFrames, 1)
params['id'] = 0
return params
def add_any_reg(self, val, val0, JTJ, JTr, w):
# loss: (val - val0)
nVals = val.shape[0]
if nVals not in self.identity_cache.keys():
self.identity_cache[nVals] = torch.eye(nVals, device=val.device, dtype=val.dtype)
identity = self.identity_cache[nVals]
JTJ += w * identity
JTr += -w*(val - val0).view(-1, 1)
return 0
def log(self, name, step, delta, res, keys_range=None):
toc = (time() - self.time)*1000
norm_d = torch.norm(delta).item()
norm_f = torch.norm(res).item()
text = '[{}:{:6.2f}]: step = {:3d}, ||delta|| = {:.4f}, ||res|| = {:.4f}'.format(name, toc, step, norm_d, norm_f)
print(text)
self.time = time()
return norm_d, norm_f
def fitShape(self, keypoints3d, params, weight, option):
kintree = np.array(self.cfg.shape.kintree)
nShapes = params['shapes'].shape[-1]
# shapes: (1, 10)
shapes = params['shapes'].T
shapes0 = shapes.clone()
device, dtype = shapes.device, shapes.dtype
lengths3d_est = torch.norm(keypoints3d[:, kintree[:, 1], :3] - keypoints3d[:, kintree[:, 0], :3], dim=-1)
conf = torch.sqrt(keypoints3d[:, kintree[:, 1], 3:] * keypoints3d[:, kintree[:, 0], 3:])
conf = conf.repeat(1, 1, 3).reshape(-1, 1)
nFrames = keypoints3d.shape[0]
# jacobian: (nFrames, nLimbs, 3, nShapes)
jacob_limb_shapes = self.jacobian_limb_shapes[None].repeat(nFrames, 1, 1, 1)
jacob_limb_shapes = jacob_limb_shapes.reshape(-1, nShapes)
# 注意:这里乘到雅克比的应该是 sqrt(conf),这里把两个合并了
JTJ_limb_shapes = jacob_limb_shapes.t() @ (jacob_limb_shapes * conf)
lossnorm = 0
self.time = time()
for iter_ in range(option.max_iter):
# perform shape blending
shape_offset = self.k_shapeBlend @ shapes
keyShaped = self.k_template + shape_offset[..., 0]
JTJ = JTJ_limb_shapes
JTr = torch.zeros((nShapes, 1), device=device, dtype=dtype)
# 并行添加所有的骨架
dir = keyShaped[kintree[:, 1]] - keyShaped[kintree[:, 0]]
dir_normalized = dir / torch.norm(dir, dim=-1, keepdim=True)
# res: (nFrames, nLimbs, 3)
res = lengths3d_est[..., None] * dir_normalized[None] - dir[None]
res = conf * res.reshape(-1, 1)
JTr += jacob_limb_shapes.t() @ res
self.add_any_reg(shapes, shapes0, JTJ, JTr, w=weight.init_shape)
delta = torch.linalg.solve(JTJ, JTr)
shapes += delta
norm_d, norm_f = self.log('shape', iter_, delta, res)
if torch.norm(delta) < option.gtol:
break
if iter_ > 0 and abs(norm_f - lossnorm)/norm_f < option.ftol:
break
lossnorm = norm_f
shapes = shapes.t()
params['shapes'] = shapes
return params, keyShaped
def fitRT(self, keypoints3d, params, weight, option, kpts_index=None):
keys_optimized = ['Rh', 'Th']
if kpts_index is not None:
keypoints3d = keypoints3d[:, kpts_index]
init_dict = {
'Rh': params['Rh'].clone(),
'Th': params['Th'].clone(),
}
init_dict['Rot'] = batch_rodrigues(init_dict['Rh'])
params_dict = {
'Rh': params['Rh'],
'Th': params['Th'],
}
keys_range = {}
for ikey, key in enumerate(keys_optimized):
if ikey == 0:
keys_range[key] = [0, init_dict[key].shape[-1]]
else:
start = keys_range[keys_optimized[ikey-1]][1]
keys_range[key] = [start, start+init_dict[key].shape[-1]]
NUM_FRAME = keys_range[keys_optimized[-1]][1]
kpts = self.body_model.keypoints({'poses': params['poses'], 'shapes': params['shapes']})
bn = keypoints3d.shape[0]
conf = keypoints3d[..., -1:].repeat(1, 1, 3).reshape(bn, -1)
dtype, device = kpts.dtype, kpts.device
w_joints = 1./keypoints3d.shape[-2] * weight.joints
self.time = time()
for iter_ in range(option.max_iter):
Rh, Th = params_dict['Rh'], params_dict['Th']
rot, jacobi_R_rvec, jacobi_joints_RT = getJacobianOfRT(Rh, Th, kpts)
kpts_rt = torch.matmul(kpts, rot.transpose(-1, -2)) + Th[:, None]
# // loss: J_obs - (R @ jest + T) => -dR/drvec - dT/dtvec - Rx(djest/dtheta)
jacobi_keypoints = -jacobi_joints_RT
if kpts_index is not None:
jacobi_keypoints = jacobi_keypoints[:, kpts_index]
kpts_rt = kpts_rt[:, kpts_index]
jacobi_keypoints_flat = jacobi_keypoints.view(bn, -1, jacobi_keypoints.shape[-1])
JTJ_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ (jacobi_keypoints_flat * conf[..., None])
res = conf[..., None] * ((keypoints3d[..., :3] - kpts_rt).view(bn, -1, 1))
JTr_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ res
#
JTJ = torch.eye(bn*NUM_FRAME, device=device, dtype=dtype) * 1e-4
JTr = torch.zeros((bn*NUM_FRAME, 1), device=device, dtype=dtype)
# accumulate loss
for nf in range(bn):
JTJ[nf*NUM_FRAME:(nf+1)*NUM_FRAME, nf*NUM_FRAME:(nf+1)*NUM_FRAME] += w_joints * JTJ_keypoints[nf]
# add regularization for each parameter
for nf in range(bn):
for key in keys_optimized:
start, end = nf*NUM_FRAME + keys_range[key][0], nf*NUM_FRAME + keys_range[key][1]
if key == 'Rh':
# 增加初始化的loss
res_init = rot[nf] - init_dict['Rot'][nf]
JTJ[start:end, start:end] += weight['init_'+key] * jacobi_R_rvec[nf] @ jacobi_R_rvec[nf].T
JTr[start:end] += -weight.init_Rh * jacobi_R_rvec[nf] @ res_init.reshape(-1, 1)
else:
res_init = Th[nf] - init_dict['Th'][nf]
JTJ[start:end, start:end] += weight['init_'+key] * torch.eye(3)
JTr[start:end] += -weight['init_'+key] * res_init.reshape(-1, 1)
JTr += - w_joints * JTr_keypoints.reshape(-1, 1)
# solve
delta = torch.linalg.solve(JTJ, JTr)
norm_d, norm_f = self.log('pose', iter_, delta, res)
if torch.norm(delta) < option.gtol:
break
if iter_ > 0 and abs(norm_f - lossnorm)/norm_f < option.ftol:
break
delta = delta.view(bn, NUM_FRAME)
lossnorm = norm_f
for key, _range in keys_range.items():
if key not in params_dict.keys():continue
params_dict[key] += delta[:, _range[0]:_range[1]]
norm_key = torch.norm(delta[:, _range[0]:_range[1]])
params.update(params_dict)
return params
@staticmethod
def localTransform(J_shaped, poses, rootIdx, kintree):
bn = poses.shape[0]
nThetas = poses.shape[1]//3
localTrans = torch.eye(4, device=poses.device)[None, None].repeat(bn, nThetas, 1, 1)
poses_flat = poses.reshape(-1, 3)
rot_flat = batch_rodrigues(poses_flat)
rot = rot_flat.view(bn, nThetas, 3, 3)
localTrans[:, :, :3, :3] = rot
# set the root
localTrans[:, rootIdx, :3, 3] = J_shaped[rootIdx].view(1, 3)
# relTrans: (nKintree, 3)
relTrans = J_shaped[kintree[:, 1]] - J_shaped[kintree[:, 0]]
localTrans[:, kintree[:, 1], :3, 3] = relTrans[None]
return localTrans
@staticmethod
def globalTransform(localTrans, rootIdx, kintree):
# localTrans: (bn, nJoints, 4, 4)
globalTrans = localTrans.clone()
# set the root
for (parent, child) in kintree:
globalTrans[:, child] = globalTrans[:, parent] @ localTrans[:, child]
return globalTrans
def jacobi_GlobalTrans_theta(self, poses, j_shaped, rootIdx, kintree,
device, dtype):
parents = self.parents
start = time()
tic = lambda x: print('-> [{:20s}]: {:.3f}ms'.format(x, 1000*(time()-start)))
localTrans = self.localTransform(j_shaped, poses, rootIdx, kintree)
# tic('local trans')
globalTrans = self.globalTransform(localTrans, rootIdx, kintree)
# tic('global trans')
# 计算localTransformation
poses_flat = poses.view(poses.shape[0], -1, 3)
# jacobi_R_rvec: (bn, nJ, 3, 9)
Rot, jacobi_R_rvec = batch_rodrigues_jacobi(poses_flat)
bn, nJoints = localTrans.shape[:2]
dGlobalTrans_template = torch.zeros((bn, nJoints, 4, 4), device=device, dtype=dtype)
# compute global transformation
# results: global transformation to each theta: (bn, nJ, 4, 4, nTheta)
jacobi_GlobalTrans_theta = torch.zeros((bn, nJoints, 4, 4, nJoints*3), device=device, dtype=dtype)
# tic('rodrigues')
for djIdx in range(1, nJoints):
if djIdx in self.cfg.IGNORE_JOINTS: continue
# // 第djIdx个轴角的第dAxis个维度
for dAxis in range(3):
if dAxis in self.cfg.IGNORE_AXIS.get(str(djIdx), []): continue
# if(model->frozenJoint[3*djIdx+dAxis])continue;
# // 从上至下堆叠起来
dGlobalTrans = dGlobalTrans_template.clone()
# // 将local的映射过来
dGlobalTrans[:, djIdx, :3, :3] = jacobi_R_rvec[:, djIdx, dAxis].view(bn, 3, 3)
if djIdx != rootIdx:
# // d(R0 @ R1)/dt1 = R0 @ dR1/dt1, 这里的R0是上一级所有的累积因此使用全局的
parent = parents[djIdx]
dGlobalTrans[:, djIdx] = globalTrans[:, parent] @ dGlobalTrans[:, djIdx]
valid = np.zeros(nJoints, dtype=np.bool)
valid[djIdx] = True
# tic('current {}'.format(djIdx))
# // 遍历骨架树: 将对当前theta的导数传递下去
for (src, dst) in kintree:
# // 当前处理的关节为子节点的不用考虑
if dst == djIdx: continue
# if dst in self.cfg.IGNORE_JOINTS:continue
valid[dst] = valid[src]
if valid[src]:
# // 如果父节点是有效的: d(R0 @ R1)/dt0 = dR0/dt0 @ R1这里的R1是当前的局部的因此使用local的
dGlobalTrans[:, dst] = dGlobalTrans[:, src] @ localTrans[:, dst]
# tic('forward {}'.format(djIdx))
jacobi_GlobalTrans_theta[..., 3*djIdx+dAxis] = dGlobalTrans
# tic('jacobia')
return globalTrans, jacobi_GlobalTrans_theta
def fitPose(self, keypoints3d, params, weight, option, kpts_index=None):
# preprocess input data
if kpts_index is not None:
keypoints3d = keypoints3d[:, kpts_index]
bn = keypoints3d.shape[0]
conf = keypoints3d[..., -1:].repeat(1, 1, 3).reshape(bn, -1)
if (conf > 0.3).sum() < 4:
print('skip')
return params
w_joints = 1./keypoints3d.shape[-2] * weight.joints
# pre-calculate the shape
Rh, Th, poses = params['Rh'], params['Th'], params['poses']
init_dict = {
'Rh': Rh.clone(),
'Th': Th.clone(),
'poses': poses.clone()
}
init_dict['Rot'] = batch_rodrigues(init_dict['Rh'])
zero_dict = {key:torch.zeros_like(val) for key, val in init_dict.items()}
keys_optimized = ['Rh', 'Th', 'poses']
keys_range = {}
for ikey, key in enumerate(keys_optimized):
if ikey == 0:
keys_range[key] = [0, init_dict[key].shape[-1]]
else:
start = keys_range[keys_optimized[ikey-1]][1]
keys_range[key] = [start, start+init_dict[key].shape[-1]]
NUM_FRAME = keys_range[keys_optimized[-1]][1]
# calculate J_shaped
shapes_t = params['shapes'].t()
shape_offset = self.j_shapeBlend @ shapes_t
# jshaped: (nJoints, 3)
j_shaped = self.j_template + shape_offset[..., 0]
shape_offset = self.k_shapeBlend @ shapes_t
# kshaped: (nJoints, 3)
k_shaped = self.k_template + shape_offset[..., 0]
# optimize parameters
nJoints = j_shaped.shape[0]
dtype, device = poses.dtype, poses.device
lossnorm = 0
self.time = time()
for iter_ in range(option.max_iter):
# forward the model
# 0. poses => full poses
def tic(name):
print('[{:20}] {:.3f}ms'.format(name, 1000*(time()-self.time)))
if 'handl' in params.keys():
poses_full = self.body_model.extend_poses(poses, params['handl'], params['handr'])
jacobi_posesful_poses = self.body_model.jacobian_posesfull_poses_
else:
poses_full = self.body_model.extend_poses(poses)
jacobi_posesful_poses = self.body_model.jacobian_posesfull_poses(poses, poses_full)
# tic('jacobian poses full')
# 1. poses => local transformation => global transformation(bn, nJ, 4, 4)
globalTrans, jacobi_GlobalTrans_theta = self.jacobi_GlobalTrans_theta(poses_full, j_shaped, self.rootIdx, self.kintree, device, dtype)
# tic('global transform')
# 2. global transformation => relative transformation => final transformation
relTrans = globalTrans.clone()
relTrans[..., :3, 3:] -= torch.matmul(globalTrans[..., :3, :3], j_shaped[None, :, :, None])
relTrans_weight = torch.einsum('kj,fjab->fkab', self.k_weights, relTrans)
jacobi_relTrans_theta = jacobi_GlobalTrans_theta.clone()
# // consider topRight: T - RT0: add -dRT0/dt
# rot: (bn, nJoints, 3, 3, nThetas) @ (bn, nJoints, 1, 3, 1) => (bn, nJoints, 3, nThetas)
rot = jacobi_GlobalTrans_theta[..., :3, :3, :]
j0 = j_shaped.reshape(1, nJoints, 1, 3, 1).expand(bn, -1, -1, -1, -1)
jacobi_relTrans_theta[..., :3, 3, :] -= torch.sum(rot*j0, dim=-2)
jacobi_blendtrans_theta = torch.einsum('kj,fjabt->fkabt', self.k_weights, jacobi_relTrans_theta)
kpts = torch.einsum('fkab,kb->fka', relTrans_weight[..., :3, :3], k_shaped) + relTrans_weight[..., :3, 3]
# d(RJ0 + J1)/dtheta = d(R)/dtheta @ J0 + dJ1/dtheta
rot = jacobi_blendtrans_theta[..., :3, :3, :]
k0 = k_shaped.reshape(1, k_shaped.shape[0], 1, 3, 1).expand(bn, -1, -1, -1, -1)
# jacobi_keypoints_theta: (bn, nKeypoints, 3, nThetas)
jacobi_keypoints_theta = torch.sum(rot*k0, dim=-2) + jacobi_blendtrans_theta[..., :3, 3, :]
# tic('keypoints')
# // compute the jacobian of R T
# // loss: J_obs - (R @ jest + T)
rot, jacobi_R_rvec, jacobi_joints_RT = getJacobianOfRT(Rh, Th, kpts)
kpts_rt = torch.matmul(kpts, rot.transpose(-1, -2)) + Th[:, None]
rot_nk = rot[:, None].expand(-1, k_shaped.shape[0], -1, -1)
jacobi_keypoints_theta = torch.matmul(rot_nk, jacobi_keypoints_theta)
# compute jacobian of poses
shape_jacobi = jacobi_keypoints_theta.shape[:-1]
NUM_THETAS = jacobi_posesful_poses.shape[0]
jacobi_keypoints_poses = (jacobi_keypoints_theta[..., :NUM_THETAS].view(-1, NUM_THETAS) @ jacobi_posesful_poses).reshape(*shape_jacobi, -1)
# // loss: J_obs - (R @ jest + T) => -dR/drvec - dT/dtvec - Rx(djest/dtheta)
jacobi_keypoints = torch.cat([-jacobi_joints_RT, -jacobi_keypoints_poses], dim=-1)
if kpts_index is not None:
jacobi_keypoints = jacobi_keypoints[:, kpts_index]
kpts_rt = kpts_rt[:, kpts_index]
jacobi_keypoints_flat = jacobi_keypoints.view(bn, -1, jacobi_keypoints.shape[-1])
# tic('jacobian keypoints')
JTJ_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ (jacobi_keypoints_flat * conf[..., None])
res = conf[..., None] * ((keypoints3d[..., :3] - kpts_rt).view(bn, -1, 1))
JTr_keypoints = jacobi_keypoints_flat.transpose(-1, -2) @ res
cache_dict = {
'Th': Th,
'Rh': Rh,
'poses': poses,
}
# 计算loss
# JTJ = torch.zeros((bn*NUM_FRAME, bn*NUM_FRAME), device=device, dtype=dtype)
JTJ = torch.eye(bn*NUM_FRAME, device=device, dtype=dtype) * 1e-4
JTr = torch.zeros((bn*NUM_FRAME, 1), device=device, dtype=dtype)
# add regularization for each parameter
for nf in range(bn):
for key in keys_optimized:
start, end = nf*NUM_FRAME + keys_range[key][0], nf*NUM_FRAME + keys_range[key][1]
JTJ[start:end, start:end] += weight['reg_{}'.format(key)] * torch.eye(end-start)
JTr[start:end] += -weight['reg_{}'.format(key)] * cache_dict[key][nf].view(-1, 1)
# add init for Rh
if key == 'Rh':
# 增加初始化的loss
res_init = rot[nf] - init_dict['Rot'][nf]
JTJ[start:end, start:end] += weight['init_'+key] * jacobi_R_rvec[nf] @ jacobi_R_rvec[nf].T
JTr[start:end] += -weight.init_Rh * jacobi_R_rvec[nf] @ res_init.reshape(-1, 1)
else:
res_init = cache_dict[key][nf] - init_dict[key][nf]
JTJ[start:end, start:end] += weight['init_'+key] * torch.eye(end-start)
JTr[start:end] += -weight['init_'+key] * res_init.reshape(-1, 1)
# add keypoints loss
for nf in range(bn):
JTJ[nf*NUM_FRAME:(nf+1)*NUM_FRAME, nf*NUM_FRAME:(nf+1)*NUM_FRAME] += w_joints * JTJ_keypoints[nf]
JTr += - w_joints * JTr_keypoints.reshape(-1, 1)
# tic('add loss')
delta = torch.linalg.solve(JTJ, JTr)
# tic('solve')
norm_d, norm_f = self.log('pose', iter_, delta, res, keys_range)
if torch.norm(delta) < option.gtol:
break
if iter_ > 0 and abs(norm_f - lossnorm)/norm_f < option.ftol:
break
delta = delta.view(bn, NUM_FRAME)
lossnorm = norm_f
for key, _range in keys_range.items():
if key not in cache_dict.keys():continue
cache_dict[key] += delta[:, _range[0]:_range[1]]
res = {
'id': params['id'],
'poses': poses,
'shapes': params['shapes'],
'Rh': Rh,
'Th': Th,
}
for key, val in params.items():
if key not in res.keys():
res[key] = val
return res
def try_to_init(self, records):
if self.init:
return copy.deepcopy(self.params_newest)
mywarn('>> Initialize')
keypoints3d = self.get_keypoints3d(records)
params, keypoints_template = self.fitShape(keypoints3d, self.get_init_params(keypoints3d.shape[0]), self.cfg.shape.weight, self.cfg.shape.option)
Rot = batch_rodrigues(params['Rh'])
keypoints_template = torch.matmul(keypoints_template, Rot[0].t())
if self.cfg.initRT.mean_T:
conf = keypoints3d[..., 3:]
T = ((keypoints3d[..., :3] - keypoints_template[None])*conf).sum(dim=-2)/(conf.sum(dim=-2))
params['Th'] = T
params = self.fitRT(keypoints3d, params, self.cfg.initRT.weight, self.cfg.initRT.option,
kpts_index=self.cfg.TORSO_INDEX)
params = self.fitPose(keypoints3d, params, self.cfg.init_pose.weight, self.cfg.init_pose.option,
kpts_index=self.cfg.TORSO_INDEX)
params = self.fitPose(keypoints3d, params, self.cfg.init_pose.weight, self.cfg.init_pose.option,
kpts_index=self.cfg.BODY_INDEX)
mywarn('>> Initialize Rh = {}, Th = {}'.format(params['Rh'][0], params['Th'][0]))
params = Params(params)[-1]
self.init = True
return params
def fitting(self, params_init, records):
keypoints3d = self.get_keypoints3d(records[-self.WINDOW_SIZE:])
params = params_init
params = self.fitRT(keypoints3d[-self.FITTING_SIZE:], params, self.cfg.RT.weight, self.cfg.RT.option)
params = self.fitPose(keypoints3d[-self.FITTING_SIZE:], params, self.cfg.pose.weight, self.cfg.pose.option)
return params
def filter_result(self, result):
poses = result['poses'].reshape(-1, 3)
# TODO: 这里的xyz是scipy中的XYZ
euler = axis_angle_to_euler(poses, order='xyz')
# euler[euler>np.pi] = 0.
poses = euler_to_axis_angle(euler, order='xyz')
result['euler'] = euler
return result
def smooth_results(self):
results_ = {}
for key in self.results[0].keys():
if key == 'id': continue
if key not in self.SMOOTH_SIZE.keys():continue
results_[key] = np.vstack([r[key] for r in self.results[-self.SMOOTH_SIZE[key]:]])
results_[key] = np.mean(results_[key], axis=0, keepdims=True)
results_['id'] = 0
for key, val in self.fix_dict.items():
results_[key] = np.array(val)
# results_['Th'][:] = 0.
return [results_]
def check_keypoints(self, keypoints3d):
flag = (keypoints3d[..., -1]>0.3).sum() > 5
if len(self.records) > 1:
pre = self.records[-1]
k_pre = self.get_keypoints3d([pre])
dist = torch.norm(keypoints3d[..., :3] - k_pre[..., :3], dim=-1)
conf = torch.sqrt(keypoints3d[..., 3] * k_pre[..., 3])
dist_mean = (dist * conf).sum()/conf.sum()
flag = flag and dist_mean < 0.1
return flag
def __call__(self, data):
self.frame_index += 1
k3d = self.triangulator(data)[0]
keypoints3d = self.get_keypoints3d([k3d])
flag = self.check_keypoints(keypoints3d)
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
if self.frame_index - self.frame_latest > 10 and self.init:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
self.results = []
return -1
self.records.append(k3d)
if len(self.records) < self.INIT_SIZE:
return -1
with Timer('fitting', True):
params = self.try_to_init(self.records)
params = self.fitting(params, self.records)
params = self.filter_result(params)
self.results.append(params)
self.params_newest = params
self.frame_latest = self.frame_index
return self.smooth_results()
class FitterCPPCache:
def __init__(self) -> None:
self.init = False
self.records = []
self.frame_index = 0
self.frame_latest = 0
def try_to_init(self, keypoints3d):
if self.init:
return copy.deepcopy(self.params_newest)
mywarn('>> Initialize')
params = self.handmodel.init_params(1)
if not (keypoints3d[..., -1] > 0.).all(): # 只有一半的点可见
return params
params = self.handmodel.fit3DShape(keypoints3d, params)
params = self.handmodel.init3DRT(keypoints3d[-1:], params)
params = self.handmodel.fit3DPose(keypoints3d[-1:], params)
mywarn('>> Initialize Rh = {}, Th = {}'.format(params['Rh'][0], params['Th'][0]))
self.init = True
return params
def check_keypoints(self, keypoints3d):
flag = (keypoints3d[..., -1]>0.3).sum() > 5
if len(self.records) > 1:
k_pre = self.records[-1][None]
dist = np.linalg.norm(keypoints3d[..., :3] - k_pre[..., :3], axis=-1)
conf = np.sqrt(keypoints3d[..., 3] * k_pre[..., 3])
dist_mean = (dist * conf).sum()/(1e-5+conf.sum())
print(dist_mean)
flag = flag and dist_mean < 0.1
return flag
def smooth_results(self, params=None):
if params is None:
params = self.params_newest.copy()
params['poses'] = params['poses'][:, 3:]
params['id'] = 0
return [params]
def __call__(self, keypoints3d):
self.frame_index += 1
flag = self.check_keypoints(keypoints3d)
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
if self.frame_index - self.frame_latest > 10:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
return self.smooth_results(self.handmodel.init_params(1))
self.records.append(keypoints3d)
with Timer('fitting'):
params = self.try_to_init(keypoints3d[None])
params = self.handmodel.fit3DPose(keypoints3d[None], params)
self.params_newest = params
self.frame_latest = self.frame_index
return self.smooth_results()
class ManoFitterCPPCache(FitterCPPCache):
def __init__(self, name='LEFT') -> None:
super().__init__()
self.handmodel = load_object_from_cmd('config/model/mano_full_cpp.yml', [
'args.model_path', 'data/bodymodels/manov1.2/MANO_{}.pkl'.format(name),
'args.regressor_path', 'data/smplx/J_regressor_mano_{}.txt'.format(name),
])
class SMPLFitterCPPCache(FitterCPPCache):
def __init__(self, name='half') -> None:
super().__init__()
self.handmodel = load_object_from_cmd('config/model/mano_full_cpp.yml', [
'args.model_path', 'data/bodymodels/manov1.2/MANO_{}.pkl'.format(name),
'args.regressor_path', 'data/smplx/J_regressor_mano_{}.txt'.format(name),
])
class ManoFitterCPP:
def __init__(self, cfg_triangulator, key) -> None:
self.handmodel = load_object_from_cmd('config/model/mano_full_cpp.yml', [])
self.triangulator = load_object_from_cmd(cfg_triangulator, [])
self.time = 0
self.frame_latest = 0
self.frame_index = 0
self.key = 'handl3d'
self.init = False
self.params_newest = None
self.records, self.results = [], []
self.INIT_SIZE = 10
def get_keypoints3d(self, records, key=None):
if key is None:
return np.stack([r[self.key] for r in records])
else:
return np.stack([r[key] for r in records])
def try_to_init(self, records):
if self.init:
return copy.deepcopy(self.params_newest)
mywarn('>> Initialize')
keypoints3d = self.get_keypoints3d(records)
params = self.handmodel.init_params(1)
params = self.handmodel.fit3DShape(keypoints3d, params)
params = self.handmodel.init3DRT(keypoints3d[-1:], params)
params = self.handmodel.fit3DPose(keypoints3d[-1:], params)
mywarn('>> Initialize Rh = {}, Th = {}'.format(params['Rh'][0], params['Th'][0]))
self.init = True
return params
def smooth_results(self):
params = self.params_newest.copy()
params['poses'] = params['poses'][:, 3:]
params['id'] = 0
return [params]
def __call__(self, data):
self.frame_index += 1
k3d = self.triangulator(data)[0]
keypoints3d = self.get_keypoints3d([k3d])
# flag = self.check_keypoints(keypoints3d)
flag = True
if not flag:
mywarn('Missing keypoints {} [{}->{}]'.format(keypoints3d[..., -1].sum(), self.frame_latest, self.frame_index))
if self.frame_index - self.frame_latest > 10 and self.init:
mywarn('Missing keypoints, resetting...')
self.init = False
self.records = []
self.results = []
return -1
self.records.append(k3d)
if len(self.records) < self.INIT_SIZE:
return -1
with Timer('fitting'):
params = self.try_to_init(self.records)
k3d = self.get_keypoints3d(self.records[-1:])
params = self.handmodel.fit3DPose(k3d, params)
# params['poses'] = torch.Tensor(params['poses'][:, 3:])
# params['shapes'] = torch.Tensor(params['shapes'])
# params['Rh'] = torch.Tensor(params['Rh'])
# params['Th'] = torch.Tensor(params['Th'])
# params = self.filter_result(params)
self.results.append(params)
self.params_newest = params
self.frame_latest = self.frame_index
return self.smooth_results()
class BodyFitter(BaseFitter):
def __init__(self, key, **kwargs):
super().__init__(**kwargs)
self.key = key
def get_keypoints3d(self, records, key=None):
if key is None:
return torch.Tensor(np.stack([r[self.key] for r in records]))
else:
return torch.Tensor(np.stack([r[key] for r in records]))
class ManoFitter(BodyFitter):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
class HalfFitter(BodyFitter):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.INDEX_HALF = [0,1,2,3,4,5,6,7,15,16,17,18]
def get_keypoints3d(self, records):
THRES_WRIST = 0.05
keypoints3d = super().get_keypoints3d(records)
keypoints3d = keypoints3d[:, self.INDEX_HALF]
handl = super().get_keypoints3d(records, key='handl3d')
handr = super().get_keypoints3d(records, key='handr3d')
dist_ll = torch.norm(keypoints3d[:, 7, :3] - handl[:, 0, :3], dim=-1)
dist_rr = torch.norm(keypoints3d[:, 4, :3] - handr[:, 0, :3], dim=-1)
handl[dist_ll>THRES_WRIST] = 0.
handr[dist_rr>THRES_WRIST] = 0.
keypoints3d = np.hstack([keypoints3d, handl, handr])
conf = keypoints3d[..., 3:]
keypoints3d = np.hstack([(keypoints3d[..., :3] * conf).sum(axis=0)/(1e-5 + conf.sum(axis=0)), conf.min(axis=0)])
keypoints3d = keypoints3d[None]
# if (keypoints3d.shape[0] == 10):
return torch.Tensor(keypoints3d)
def filter_result(self, result):
result = super().filter_result(result)
# 限定一下关节旋转
# 手肘
# result['poses'][:, 5*3+1] = np.clip(result['poses'][:, 5*3+1], -2.5, 0.1)
# result['poses'][:, 6*3+1] = np.clip(result['poses'][:, 5*3+1], -0.1, 2.5)
# 手腕
return result
class HalfHandFitter(HalfFitter):
def __init__(self, cfg_handl, cfg_handr, **kwargs) -> None:
super().__init__(**kwargs)
self.handl = load_object_from_cmd(cfg_handl, [])
self.handr = load_object_from_cmd(cfg_handr, [])
def get_init_params(self, nFrames):
params = super().get_init_params(nFrames)
params_ = self.handl.get_init_params(nFrames)
params['shapes_handl'] = params_['shapes']
params['shapes_handr'] = params_['shapes'].clone()
params['Rh_handl'] = torch.zeros((nFrames, 3))
params['Rh_handr'] = torch.zeros((nFrames, 3))
params['Th_handl'] = torch.zeros((nFrames, 3))
params['Th_handr'] = torch.zeros((nFrames, 3))
return params
def fitPose(self, keypoints3d, params, weight, option):
keypoints = {
'handl': keypoints3d[:, -21-21:-21, :],
'handr': keypoints3d[:, -21:, :]
}
for key in ['handl', 'handr']:
kpts = keypoints[key]
params_ = {
'id': 0,
'Rh': params['Rh_'+key],
'Th': params['Th_'+key],
'shapes': params['shapes_'+key],
'poses': params[key],
}
if key == 'handl':
params_ = self.handl.fitPose(kpts, params_, self.handl.cfg.pose.weight, self.handl.cfg.pose.option)
else:
params_ = self.handr.fitPose(kpts, params_, self.handr.cfg.pose.weight, self.handr.cfg.pose.option)
params['Rh_'+key] = params_['Rh']
params['Th_'+key] = params_['Th']
params['shapes_'+key] = params_['shapes']
params[key] = params_['poses']
return super().fitPose(keypoints3d, params, weight, option,
kpts_index=[0,1,2,3,4,5,6,7,8,9,10,11,
12, 17, 21, 25, 29,
24, 29, 33, 37, 41])
def try_to_init(self, records):
if self.init:
return self.params_newest
params = super().try_to_init(records)
self.handl.init = False
self.handr.init = False
key = 'handl'
params_ = self.handl.try_to_init(records)
params['handl'] = params_['poses']
params['Rh_'+key] = params_['Rh']
params['Th_'+key] = params_['Th']
params['shapes_'+key] = params_['shapes']
key = 'handr'
params_ = self.handr.try_to_init(records)
params[key] = params_['poses']
params['Rh_'+key] = params_['Rh']
params['Th_'+key] = params_['Th']
params['shapes_'+key] = params_['shapes']
return params
if __name__ == '__main__':
from glob import glob
from os.path import join
from tqdm import tqdm
from ..mytools.file_utils import read_json
from ..config.baseconfig import load_object_from_cmd
data= '/nas/datasets/EasyMocap/302'
# data = '/home/qing/Dataset/handl'
mode = 'half'
# data = '/home/qing/DGPU/home/shuaiqing/zju-mocap-mp/female-jump'
data = '/home/qing/Dataset/desktop/0402/test3'
k3dnames = sorted(glob(join(data, 'output-keypoints3d', 'keypoints3d', '*.json')))
if mode == 'handl':
fitter = load_object_from_cmd('config/recon/fit_manol.yml', [])
elif mode == 'half':
fitter = load_object_from_cmd('config/recon/fit_half.yml', [])
elif mode == 'smpl':
fitter = load_object_from_cmd('config/recon/fit_smpl.yml', [])
from easymocap.socket.base_client import BaseSocketClient
client = BaseSocketClient('0.0.0.0', 9999)
for k3dname in tqdm(k3dnames):
k3ds = read_json(k3dname)
if mode == 'handl':
k3ds = np.array(k3ds[0]['handl3d'])
data = {fitter.key3d: k3ds}
elif mode == 'half':
data = {
'keypoints3d': np.array(k3ds[0]['keypoints3d']),
'handl3d': np.array(k3ds[0]['handl3d']),
'handr3d': np.array(k3ds[0]['handr3d'])
}
elif mode == 'smpl':
k3ds = np.array(k3ds[0]['keypoints3d'])
data = {fitter.key3d: k3ds}
results = fitter(data)
if results != -1:
client.send_smpl(results)