undistortion applied to calibration and personAssociation too

This commit is contained in:
davidpagnon 2024-01-04 14:19:08 +01:00
parent ec650276b4
commit 9f8f40d413
4 changed files with 235 additions and 509 deletions

View File

@ -687,11 +687,13 @@ def calibrate_extrinsics(calib_dir, extrinsics_config_dict, C, S, K, D):
r, t = r.flatten(), t.flatten()
# Projection of object points to image plane
Kh_cam = np.block([mtx, np.zeros(3).reshape(3,1)])
r_mat, _ = cv2.Rodrigues(r)
H_cam = np.block([[r_mat,t.reshape(3,1)], [np.zeros(3), 1 ]])
P_cam = Kh_cam.dot(H_cam)
proj_obj = [ ( P_cam[0].dot(np.append(o, 1)) / P_cam[2].dot(np.append(o, 1)), P_cam[1].dot(np.append(o, 1)) / P_cam[2].dot(np.append(o, 1)) ) for o in objp]
# # Former way, distortions used to be ignored
# Kh_cam = np.block([mtx, np.zeros(3).reshape(3,1)])
# r_mat, _ = cv2.Rodrigues(r)
# H_cam = np.block([[r_mat,t.reshape(3,1)], [np.zeros(3), 1 ]])
# P_cam = Kh_cam.dot(H_cam)
# proj_obj = [ ( P_cam[0].dot(np.append(o, 1)) / P_cam[2].dot(np.append(o, 1)), P_cam[1].dot(np.append(o, 1)) / P_cam[2].dot(np.append(o, 1)) ) for o in objp]
proj_obj = np.squeeze(cv2.projectPoints(objp,r,t,mtx,dist)[0])
# Check calibration results
if show_reprojection_error:

View File

@ -87,9 +87,9 @@ def computeP(calib_file, undistort=False):
P = []
for cam in list(calib.keys()):
if cam != 'metadata':
S = np.array(calib[cam]['size'])
K = np.array(calib[cam]['matrix'])
if undistort:
S = np.array(calib[cam]['size'])
dist = np.array(calib[cam]['distortions'])
optim_K = cv2.getOptimalNewCameraMatrix(K, dist, [int(s) for s in S], 1, [int(s) for s in S])[0]
Kh = np.block([optim_K, np.zeros(3).reshape(3,1)])

View File

@ -37,6 +37,7 @@ import json
import itertools as it
import toml
from tqdm import tqdm
import cv2
from anytree import RenderTree
from anytree.importer import DictImporter
import logging
@ -88,7 +89,7 @@ def persons_combinations(json_files_framef):
return personsIDs_comb
def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_combinations, projection_matrices, tracked_keypoint_id):
def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_combinations, projection_matrices, tracked_keypoint_id, calib_params):
'''
At the same time, chooses the right person among the multiple ones found by
OpenPose & excludes cameras with wrong 2d-pose estimation.
@ -114,6 +115,7 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
error_threshold_tracking = config.get('personAssociation').get('reproj_error_threshold_association')
min_cameras_for_triangulation = config.get('triangulation').get('min_cameras_for_triangulation')
likelihood_threshold = config.get('triangulation').get('likelihood_threshold')
undistort_points = config.get('triangulation').get('undistort_points')
n_cams = len(json_files_framef)
error_min = np.inf
@ -136,6 +138,13 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
y_files.append(np.nan)
likelihood_files.append(np.nan)
# undistort points
if undistort_points:
points = np.array(tuple(zip(x_files,y_files))).reshape(-1, 1, 2).astype('float32')
undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
x_files = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points]).squeeze()
y_files = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points]).squeeze()
# Replace likelihood by 0. if under likelihood_threshold
likelihood_files = [0. if lik < likelihood_threshold else lik for lik in likelihood_files]
@ -153,13 +162,23 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
y_files_filt = [y_files[i] for i in range(len(comb)) if not np.isnan(comb[i])]
likelihood_files_filt = [likelihood_files[i] for i in range(len(comb)) if not np.isnan(comb[i])]
projection_matrices_filt = [projection_matrices[i] for i in range(len(comb)) if not np.isnan(comb[i])]
if undistort_points:
calib_params_R_filt = [calib_params['R'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
calib_params_T_filt = [calib_params['T'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
calib_params_K_filt = [calib_params['K'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
calib_params_dist_filt = [calib_params['dist'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
# Triangulate 2D points
Q_comb = weighted_triangulation(projection_matrices_filt, x_files_filt, y_files_filt, likelihood_files_filt)
# Reprojection
x_calc, y_calc = reprojection(projection_matrices_filt, Q_comb)
if undistort_points:
coords_2D_kpt_calc_filt = [cv2.projectPoints(np.array(Q_comb[:-1]), calib_params_R_filt[i], calib_params_T_filt[i], calib_params_K_filt[i], calib_params_dist_filt[i])[0] for i in range(n_cams-nb_cams_off)]
x_calc = [coords_2D_kpt_calc_filt[i][0,0,0] for i in range(n_cams-nb_cams_off)]
y_calc = [coords_2D_kpt_calc_filt[i][0,0,1] for i in range(n_cams-nb_cams_off)]
else:
x_calc, y_calc = reprojection(projection_matrices_filt, Q_comb)
# Reprojection error
error_comb_per_cam = []
for cam in range(len(x_calc)):
@ -198,7 +217,7 @@ def recap_tracking(config, error, nb_cams_excluded):
project_dir = config.get('project').get('project_dir')
session_dir = os.path.realpath(os.path.join(project_dir, '..', '..'))
tracked_keypoint = config.get('personAssociation').get('tracked_keypoint')
error_threshold_tracking = config.get('personAssociation').get('error_threshold_tracking')
error_threshold_tracking = config.get('personAssociation').get('reproj_error_threshold_association')
poseTracked_dir = os.path.join(project_dir, 'pose-associated')
calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if ('Calib' or 'calib') in c][0]
calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0] # lastly created calibration file
@ -256,6 +275,7 @@ def track_2d_all(config):
# projection matrix from toml calibration file
P = computeP(calib_file, undistort=undistort_points)
calib_params = retrieve_calib_params(calib_file)
# selection of tracked keypoint id
try: # from skeletons.py
@ -296,7 +316,7 @@ def track_2d_all(config):
personsIDs_comb = persons_combinations(json_files_f)
# choose person of interest and exclude cameras with bad pose estimation
error_min, persons_and_cameras_combination = best_persons_and_cameras_combination(config, json_files_f, personsIDs_comb, P, tracked_keypoint_id)
error_min, persons_and_cameras_combination = best_persons_and_cameras_combination(config, json_files_f, personsIDs_comb, P, tracked_keypoint_id, calib_params)
error_min_tot.append(error_min)
cameras_off_count = np.count_nonzero(np.isnan(persons_and_cameras_combination))
cameras_off_tot.append(cameras_off_count)

View File

@ -4,30 +4,26 @@
'''
###########################################################################
## ROBUST TRIANGULATION OF 2D COORDINATES ##
## TRACKING OF PERSON OF INTEREST ##
###########################################################################
This module triangulates 2D json coordinates and builds a .trc file readable
by OpenSim.
Openpose detects all people in the field of view.
Which is the one of interest?
This module tries all possible triangulations of a chosen anatomical
point. If "single_person" mode is used, it chooses the person for whom the
reprojection error is smallest. If multi-person is used, it selects all
persons with a reprojection error smaller than a threshold, and then
associates them across time frames by minimizing the displacement speed.
The triangulation is weighted by the likelihood of each detected 2D keypoint
(if they meet the likelihood threshold). If the reprojection error is above a
threshold, right and left sides are swapped; if it is still above, a camera
is removed for this point and this frame, until the threshold is met. If more
cameras are removed than a predefined minimum, triangulation is skipped for
the point and this frame. In the end, missing values are interpolated.
In case of multiple subjects detection, make sure you first run the
personAssociation module.
INPUTS:
- a calibration file (.toml extension)
- json files for each camera with only one person of interest
- json files from each camera folders with several detected persons
- a Config.toml file
- a skeleton model
OUTPUTS:
- a .trc file with 3D coordinates in Y-up system coordinates
- json files for each camera with only one person of interest
'''
@ -39,12 +35,9 @@ import fnmatch
import numpy as np
import json
import itertools as it
import pandas as pd
import cv2
import toml
from tqdm import tqdm
from scipy import interpolate
from collections import Counter
import cv2
from anytree import RenderTree
from anytree.importer import DictImporter
import logging
@ -66,466 +59,225 @@ __status__ = "Development"
## FUNCTIONS
def zup2yup(Q):
def persons_combinations(json_files_framef):
'''
Turns Z-up system coordinates into Y-up coordinates
Find all possible combinations of detected persons' ids.
Person's id when no person detected is set to -1.
INPUT:
- Q: pandas dataframe
N 3D points as columns, ie 3*N columns in Z-up system coordinates
and frame number as rows
- json_files_framef: list of strings
OUTPUT:
- Q: pandas dataframe with N 3D points in Y-up system coordinates
- personsIDs_comb: array, list of lists of int
'''
# X->Y, Y->Z, Z->X
cols = list(Q.columns)
cols = np.array([[cols[i*3+1],cols[i*3+2],cols[i*3]] for i in range(int(len(cols)/3))]).flatten()
Q = Q[cols]
return Q
n_cams = len(json_files_framef)
# amount of persons detected for each cam
nb_persons_per_cam = []
for c in range(n_cams):
with open(json_files_framef[c], 'r') as js:
nb_persons_per_cam += [len(json.load(js)['people'])]
# persons_combinations
id_no_detect = [i for i, x in enumerate(nb_persons_per_cam) if x == 0] # ids of cameras that have not detected any person
nb_persons_per_cam = [x if x != 0 else 1 for x in nb_persons_per_cam] # temporarily replace persons count by 1 when no detection
range_persons_per_cam = [range(nb_persons_per_cam[c]) for c in range(n_cams)]
personsIDs_comb = np.array(list(it.product(*range_persons_per_cam)), float) # all possible combinations of persons' ids
personsIDs_comb[:,id_no_detect] = np.nan # -1 = persons' ids when no person detected
return personsIDs_comb
def interpolate_zeros_nans(col, *args):
def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_combinations, projection_matrices, tracked_keypoint_id, calib_params):
'''
Interpolate missing points (of value zero),
unless more than N contiguous values are missing.
At the same time, chooses the right person among the multiple ones found by
OpenPose & excludes cameras with wrong 2d-pose estimation.
1. triangulate the tracked keypoint for all possible combinations of people,
2. compute difference between reprojection & original openpose detection,
3. take combination with smallest difference.
If error is too big, take off one or several of the cameras until err is
lower than "max_err_px".
INPUTS:
- col: pandas column of coordinates
- args[0] = N: max number of contiguous bad values, above which they won't be interpolated
- args[1] = kind: 'linear', 'slinear', 'quadratic', 'cubic'. Default: 'cubic'
- a Config.toml file
- json_files_framef: list of strings
- personsIDs_combinations: array, list of lists of int
- projection_matrices: list of arrays
- tracked_keypoint_id: int
OUTPUT:
- col_interp: interpolated pandas column
OUTPUTS:
- error_min: float
- persons_and_cameras_combination: array of ints
'''
if len(args)==2:
N, kind = args
if len(args)==1:
N = np.inf
kind = args[0]
if not args:
N = np.inf
# Interpolate nans
mask = ~(np.isnan(col) | col.eq(0)) # true where nans or zeros
idx_good = np.where(mask)[0]
if 'kind' not in locals(): # 'linear', 'slinear', 'quadratic', 'cubic'
f_interp = interpolate.interp1d(idx_good, col[idx_good], kind="linear", bounds_error=False)
else:
f_interp = interpolate.interp1d(idx_good, col[idx_good], kind=kind, fill_value='extrapolate', bounds_error=False)
col_interp = np.where(mask, col, f_interp(col.index)) #replace at false index with interpolated values
error_threshold_tracking = config.get('personAssociation').get('reproj_error_threshold_association')
min_cameras_for_triangulation = config.get('triangulation').get('min_cameras_for_triangulation')
likelihood_threshold = config.get('triangulation').get('likelihood_threshold')
undistort_points = config.get('triangulation').get('undistort_points')
n_cams = len(json_files_framef)
error_min = np.inf
nb_cams_off = 0 # cameras will be taken-off until the reprojection error is under threshold
# Reintroduce nans if lenght of sequence > N
idx_notgood = np.where(~mask)[0]
gaps = np.where(np.diff(idx_notgood) > 1)[0] + 1 # where the indices of true are not contiguous
sequences = np.split(idx_notgood, gaps)
if sequences[0].size>0:
for seq in sequences:
if len(seq) > N: # values to exclude from interpolation are set to false when they are too long
col_interp[seq] = np.nan
while error_min > error_threshold_tracking and n_cams - nb_cams_off >= min_cameras_for_triangulation:
# Try all persons combinations
for combination in personsIDs_combinations:
# Get x,y,likelihood values from files
x_files, y_files,likelihood_files = [], [], []
for index_cam, person_nb in enumerate(combination):
with open(json_files_framef[index_cam], 'r') as json_f:
js = json.load(json_f)
try:
x_files.append( js['people'][int(person_nb)]['pose_keypoints_2d'][tracked_keypoint_id*3] )
y_files.append( js['people'][int(person_nb)]['pose_keypoints_2d'][tracked_keypoint_id*3+1] )
likelihood_files.append( js['people'][int(person_nb)]['pose_keypoints_2d'][tracked_keypoint_id*3+2] )
except:
x_files.append(np.nan)
y_files.append(np.nan)
likelihood_files.append(np.nan)
# undistort points
if undistort_points:
points = np.array(tuple(zip(x_files,y_files))).reshape(-1, 1, 2).astype('float32')
undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
x_files = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points]).squeeze()
y_files = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points]).squeeze()
# Replace likelihood by 0. if under likelihood_threshold
likelihood_files = [0. if lik < likelihood_threshold else lik for lik in likelihood_files]
# For each persons combination, create subsets with "nb_cams_off" cameras excluded
id_cams_off = list(it.combinations(range(len(combination)), nb_cams_off))
combinations_with_cams_off = np.array([combination.copy()]*len(id_cams_off))
for i, id in enumerate(id_cams_off):
combinations_with_cams_off[i,id] = np.nan
# Try all subsets
error_comb = []
for comb in combinations_with_cams_off:
# Filter x, y, likelihood, projection_matrices, with subset
x_files_filt = [x_files[i] for i in range(len(comb)) if not np.isnan(comb[i])]
y_files_filt = [y_files[i] for i in range(len(comb)) if not np.isnan(comb[i])]
likelihood_files_filt = [likelihood_files[i] for i in range(len(comb)) if not np.isnan(comb[i])]
projection_matrices_filt = [projection_matrices[i] for i in range(len(comb)) if not np.isnan(comb[i])]
if undistort_points:
calib_params_R_filt = [calib_params['R'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
calib_params_T_filt = [calib_params['T'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
calib_params_K_filt = [calib_params['K'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
calib_params_dist_filt = [calib_params['dist'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
# Triangulate 2D points
Q_comb = weighted_triangulation(projection_matrices_filt, x_files_filt, y_files_filt, likelihood_files_filt)
# Reprojection
if undistort_points:
coords_2D_kpt_calc_filt = [cv2.projectPoints(np.array(Q_comb[:-1]), calib_params_R_filt[i], calib_params_T_filt[i], calib_params_K_filt[i], calib_params_dist_filt[i])[0] for i in range(n_cams-nb_cams_off)]
x_calc = [coords_2D_kpt_calc_filt[i][0,0,0] for i in range(n_cams-nb_cams_off)]
y_calc = [coords_2D_kpt_calc_filt[i][0,0,1] for i in range(n_cams-nb_cams_off)]
else:
x_calc, y_calc = reprojection(projection_matrices_filt, Q_comb)
# Reprojection error
error_comb_per_cam = []
for cam in range(len(x_calc)):
q_file = (x_files_filt[cam], y_files_filt[cam])
q_calc = (x_calc[cam], y_calc[cam])
error_comb_per_cam.append( euclidean_distance(q_file, q_calc) )
error_comb.append( np.mean(error_comb_per_cam) )
error_min = min(error_comb)
persons_and_cameras_combination = combinations_with_cams_off[np.argmin(error_comb)]
if error_min < error_threshold_tracking:
break
nb_cams_off += 1
return col_interp
return error_min, persons_and_cameras_combination
def make_trc(config, Q, keypoints_names, f_range):
'''
Make Opensim compatible trc file from a dataframe with 3D coordinates
INPUT:
- config: dictionary of configuration parameters
- Q: pandas dataframe with 3D coordinates as columns, frame number as rows
- keypoints_names: list of strings
- f_range: list of two numbers. Range of frames
OUTPUT:
- trc file
'''
# Read config
project_dir = config.get('project').get('project_dir')
frame_rate = config.get('project').get('frame_rate')
seq_name = os.path.basename(os.path.realpath(project_dir))
pose3d_dir = os.path.join(project_dir, 'pose-3d')
trc_f = f'{seq_name}_{f_range[0]}-{f_range[1]}.trc'
#Header
DataRate = CameraRate = OrigDataRate = frame_rate
NumFrames = len(Q)
NumMarkers = len(keypoints_names)
header_trc = ['PathFileType\t4\t(X/Y/Z)\t' + trc_f,
'DataRate\tCameraRate\tNumFrames\tNumMarkers\tUnits\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames',
'\t'.join(map(str,[DataRate, CameraRate, NumFrames, NumMarkers, 'm', OrigDataRate, f_range[0], f_range[1]])),
'Frame#\tTime\t' + '\t\t\t'.join(keypoints_names) + '\t\t',
'\t\t'+'\t'.join([f'X{i+1}\tY{i+1}\tZ{i+1}' for i in range(len(keypoints_names))])]
# Zup to Yup coordinate system
Q = zup2yup(Q)
#Add Frame# and Time columns
Q.index = np.array(range(0, f_range[1]-f_range[0])) + 1
Q.insert(0, 't', Q.index / frame_rate)
#Write file
if not os.path.exists(pose3d_dir): os.mkdir(pose3d_dir)
trc_path = os.path.realpath(os.path.join(pose3d_dir, trc_f))
with open(trc_path, 'w') as trc_o:
[trc_o.write(line+'\n') for line in header_trc]
Q.to_csv(trc_o, sep='\t', index=True, header=None, lineterminator='\n')
return trc_path
def recap_triangulate(config, error, nb_cams_excluded, keypoints_names, cam_excluded_count, interp_frames, non_interp_frames, trc_path):
def recap_tracking(config, error, nb_cams_excluded):
'''
Print a message giving statistics on reprojection errors (in pixel and in m)
as well as the number of cameras that had to be excluded to reach threshold
as well as the number of cameras that had to be excluded to reach threshold
conditions. Also stored in User/logs.txt.
INPUT:
- a Config.toml file
- error: dataframe
- nb_cams_excluded: dataframe
- keypoints_names: list of strings
OUTPUT:
- Message in console
'''
# Read config
project_dir = config.get('project').get('project_dir')
session_dir = os.path.realpath(os.path.join(project_dir, '..', '..'))
tracked_keypoint = config.get('personAssociation').get('tracked_keypoint')
error_threshold_tracking = config.get('personAssociation').get('reproj_error_threshold_association')
poseTracked_dir = os.path.join(project_dir, 'pose-associated')
calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if ('Calib' or 'calib') in c][0]
calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0] # lastly created calibration file
calib = toml.load(calib_file)
cam_names = np.array([calib[c].get('name') for c in list(calib.keys())])
cam_names = cam_names[list(cam_excluded_count.keys())]
error_threshold_triangulation = config.get('triangulation').get('reproj_error_threshold_triangulation')
likelihood_threshold = config.get('triangulation').get('likelihood_threshold')
show_interp_indices = config.get('triangulation').get('show_interp_indices')
interpolation_kind = config.get('triangulation').get('interpolation')
handle_LR_swap = config.get('triangulation').get('handle_LR_swap')
# Recap
# Error
mean_error_px = np.around(np.mean(error), decimals=1)
calib = toml.load(calib_file)
calib_cam1 = calib[list(calib.keys())[0]]
fm = calib_cam1['matrix'][0][0]
Dm = euclidean_distance(calib_cam1['translation'], [0,0,0])
logging.info('')
for idx, name in enumerate(keypoints_names):
mean_error_keypoint_px = np.around(error.iloc[:,idx].mean(), decimals=1) # RMS à la place?
mean_error_keypoint_m = np.around(mean_error_keypoint_px * Dm / fm, decimals=3)
mean_cam_excluded_keypoint = np.around(nb_cams_excluded.iloc[:,idx].mean(), decimals=2)
logging.info(f'Mean reprojection error for {name} is {mean_error_keypoint_px} px (~ {mean_error_keypoint_m} m), reached with {mean_cam_excluded_keypoint} excluded cameras. ')
if show_interp_indices:
if interpolation_kind != 'none':
if len(list(interp_frames[idx])) ==0:
logging.info(f' No frames needed to be interpolated.')
else:
interp_str = str(interp_frames[idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
logging.info(f' Frames {interp_str} were interpolated.')
if len(list(non_interp_frames[idx]))>0:
noninterp_str = str(non_interp_frames[idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
logging.info(f' Frames {non_interp_frames[idx]} could not be interpolated: consider adjusting thresholds.')
else:
logging.info(f' No frames were interpolated because \'interpolation_kind\' was set to none. ')
mean_error_mm = np.around(mean_error_px * Dm / fm * 1000, decimals=1)
mean_error_px = np.around(error['mean'].mean(), decimals=1)
mean_error_mm = np.around(mean_error_px * Dm / fm *1000, decimals=1)
mean_cam_excluded = np.around(nb_cams_excluded['mean'].mean(), decimals=2)
# Excluded cameras
mean_cam_off_count = np.around(np.mean(nb_cams_excluded), decimals=2)
logging.info(f'\n--> Mean reprojection error for all points on all frames is {mean_error_px} px, which roughly corresponds to {mean_error_mm} mm. ')
logging.info(f'Cameras were excluded if likelihood was below {likelihood_threshold} and if the reprojection error was above {error_threshold_triangulation} px. Limb swapping was {"handled" if handle_LR_swap else "not handled"}.')
logging.info(f'In average, {mean_cam_excluded} cameras had to be excluded to reach these thresholds.')
cam_excluded_count = {i: v for i, v in zip(cam_names, cam_excluded_count.values())}
str_cam_excluded_count = ''
for i, (k, v) in enumerate(cam_excluded_count.items()):
if i ==0:
str_cam_excluded_count += f'Camera {k} was excluded {int(np.round(v*100))}% of the time, '
elif i == len(cam_excluded_count)-1:
str_cam_excluded_count += f'and Camera {k}: {int(np.round(v*100))}%.'
else:
str_cam_excluded_count += f'Camera {k}: {int(np.round(v*100))}%, '
logging.info(str_cam_excluded_count)
# Recap
logging.info(f'\n--> Mean reprojection error for {tracked_keypoint} point on all frames is {mean_error_px} px, which roughly corresponds to {mean_error_mm} mm. ')
logging.info(f'--> In average, {mean_cam_off_count} cameras had to be excluded to reach the demanded {error_threshold_tracking} px error threshold.')
logging.info(f'\nTracked json files are stored in {os.path.realpath(poseTracked_dir)}.')
logging.info(f'\n3D coordinates are stored at {trc_path}.')
def triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, projection_matrices, calib_params):
def track_2d_all(config):
'''
Triangulates 2D keypoint coordinates. If reprojection error is above threshold,
tries swapping left and right sides. If still above, removes a camera until error
is below threshold unless the number of remaining cameras is below a predefined number.
1. Creates subset with N cameras excluded
2. Tries all possible triangulations
3. Chooses the one with smallest reprojection error
If error too big, take off one more camera.
If then below threshold, retain result.
If better but still too big, take off one more camera.
INPUTS:
- a Config.toml file
- coords_2D_kpt: (x,y,likelihood) * ncams array
- coords_2D_kpt_swapped: (x,y,likelihood) * ncams array with left/right swap
- projection_matrices: list of arrays
OUTPUTS:
- Q: array of triangulated point (x,y,z,1.)
- error_min: float
- nb_cams_excluded: int
'''
# Read config
error_threshold_triangulation = config.get('triangulation').get('reproj_error_threshold_triangulation')
min_cameras_for_triangulation = config.get('triangulation').get('min_cameras_for_triangulation')
handle_LR_swap = config.get('triangulation').get('handle_LR_swap')
undistort_points = config.get('triangulation').get('undistort_points')
if undistort_points:
calib_params_K = calib_params['K']
calib_params_dist = calib_params['dist']
calib_params_R = calib_params['R']
calib_params_T = calib_params['T']
# Initialize
x_files, y_files, likelihood_files = coords_2D_kpt
x_files_swapped, y_files_swapped, likelihood_files_swapped = coords_2D_kpt_swapped
n_cams = len(x_files)
error_min = np.inf
nb_cams_off = 0 # cameras will be taken-off until reprojection error is under threshold
while error_min > error_threshold_triangulation and n_cams - nb_cams_off >= min_cameras_for_triangulation:
# Create subsets with "nb_cams_off" cameras excluded
id_cams_off = np.array(list(it.combinations(range(n_cams), nb_cams_off)))
if undistort_points:
calib_params_K_filt = [calib_params_K]*len(id_cams_off)
calib_params_dist_filt = [calib_params_dist]*len(id_cams_off)
calib_params_R_filt = [calib_params_R]*len(id_cams_off)
calib_params_T_filt = [calib_params_T]*len(id_cams_off)
projection_matrices_filt = [projection_matrices]*len(id_cams_off)
x_files_filt = np.vstack([x_files.copy()]*len(id_cams_off))
y_files_filt = np.vstack([y_files.copy()]*len(id_cams_off))
x_files_swapped_filt = np.vstack([x_files_swapped.copy()]*len(id_cams_off))
y_files_swapped_filt = np.vstack([y_files_swapped.copy()]*len(id_cams_off))
likelihood_files_filt = np.vstack([likelihood_files_swapped.copy()]*len(id_cams_off))
if nb_cams_off > 0:
for i in range(len(id_cams_off)):
x_files_filt[i][id_cams_off[i]] = np.nan
y_files_filt[i][id_cams_off[i]] = np.nan
x_files_swapped_filt[i][id_cams_off[i]] = np.nan
y_files_swapped_filt[i][id_cams_off[i]] = np.nan
likelihood_files_filt[i][id_cams_off[i]] = np.nan
nb_cams_excluded_filt = [np.count_nonzero(np.nan_to_num(x)==0) for x in likelihood_files_filt] # count nans and zeros
if undistort_points:
calib_params_K_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) ] for j, c in enumerate(calib_params_K_filt) ]
calib_params_dist_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) ] for j, c in enumerate(calib_params_dist_filt) ]
calib_params_R_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) ] for j, c in enumerate(calib_params_R_filt) ]
calib_params_T_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) ] for j, c in enumerate(calib_params_T_filt) ]
projection_matrices_filt = [ [ p[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) ] for j, p in enumerate(projection_matrices_filt) ]
x_files_filt = np.array([ [ xx for ii, xx in enumerate(x) if not np.isnan(xx) ] for x in x_files_filt ])
y_files_filt = np.array([ [ xx for ii, xx in enumerate(x) if not np.isnan(xx) ] for x in y_files_filt ])
x_files_swapped_filt = np.array([ [ xx for ii, xx in enumerate(x) if not np.isnan(xx) ] for x in x_files_swapped_filt ])
y_files_swapped_filt = np.array([ [ xx for ii, xx in enumerate(x) if not np.isnan(xx) ] for x in y_files_swapped_filt ])
likelihood_files_filt = np.array([ [ xx for ii, xx in enumerate(x) if not np.isnan(xx) ] for x in likelihood_files_filt ])
# Triangulate 2D points
Q_filt = [weighted_triangulation(projection_matrices_filt[i], x_files_filt[i], y_files_filt[i], likelihood_files_filt[i]) for i in range(len(id_cams_off))]
# Reprojection
if undistort_points:
coords_2D_kpt_calc_filt = np.array([[cv2.projectPoints(np.array(Q_filt[i][:-1]), calib_params_R_filt[i][j], calib_params_T_filt[i][j], calib_params_K_filt[i][j], calib_params_dist_filt[i][j])[0].ravel()
for j in range(n_cams-nb_cams_off)]
for i in range(len(id_cams_off))])
coords_2D_kpt_calc_filt = [[coords_2D_kpt_calc_filt[i,:,0], coords_2D_kpt_calc_filt[i,:,1]] for i in range(len(id_cams_off))]
else:
coords_2D_kpt_calc_filt = [reprojection(projection_matrices_filt[i], Q_filt[i]) for i in range(len(id_cams_off))]
coords_2D_kpt_calc_filt = np.array(coords_2D_kpt_calc_filt, dtype=object)
x_calc_filt = coords_2D_kpt_calc_filt[:,0]
y_calc_filt = coords_2D_kpt_calc_filt[:,1]
# Reprojection error
error = []
for config_off_id in range(len(x_calc_filt)):
q_file = [(x_files_filt[config_off_id][i], y_files_filt[config_off_id][i]) for i in range(len(x_files_filt[config_off_id]))]
q_calc = [(x_calc_filt[config_off_id][i], y_calc_filt[config_off_id][i]) for i in range(len(x_calc_filt[config_off_id]))]
error.append( np.mean( [euclidean_distance(q_file[i], q_calc[i]) for i in range(len(q_file))] ) )
# Choosing best triangulation (with min reprojection error)
error_min = min(error)
best_cams = np.argmin(error)
nb_cams_excluded = nb_cams_excluded_filt[best_cams]
Q = Q_filt[best_cams][:-1]
# Swap left and right sides if reprojection error still too high
if handle_LR_swap and error_min > error_threshold_triangulation:
n_cams_swapped = 1
error_off_swap_min = error_min
while error_off_swap_min > error_threshold_triangulation and n_cams_swapped < (n_cams - nb_cams_off) / 2: # more than half of the cameras switched: may triangulate twice the same side
# Create subsets
id_cams_swapped = np.array(list(it.combinations(range(n_cams-nb_cams_off), n_cams_swapped)))
x_files_filt_off_swap = np.array([[x] * len(id_cams_swapped) for x in x_files_filt])
y_files_filt_off_swap = np.array([[y] * len(id_cams_swapped) for y in y_files_filt])
for id_off in range(len(id_cams_off)): # for each configuration with nb_cams_off removed
for id_swapped, config_swapped in enumerate(id_cams_swapped): # for each of these configurations, test all subconfigurations with with n_cams_swapped swapped
x_files_filt_off_swap[id_off, id_swapped, config_swapped] = x_files_swapped_filt[id_off, config_swapped]
y_files_filt_off_swap[id_off, id_swapped, config_swapped] = y_files_swapped_filt[id_off, config_swapped]
# Triangulate 2D points
Q_filt_off_swap = np.array([[weighted_triangulation(projection_matrices_filt[id_off], x_files_filt_off_swap[id_off, id_swapped], y_files_filt_off_swap[id_off, id_swapped], likelihood_files_filt[id_off])
for id_swapped in range(len(id_cams_swapped))]
for id_off in range(len(id_cams_off))] )
# Reprojection
if undistort_points:
coords_2D_kpt_calc_off_swap = np.array([[[cv2.projectPoints(np.array(Q_filt_off_swap[id_off,id_swapped][:-1]), calib_params_R_filt[id_off][j], calib_params_T_filt[id_off][j], calib_params_K_filt[id_off][j], calib_params_dist_filt[id_off][j])[0].ravel()
for j in range(n_cams-nb_cams_off)]
for id_swapped in range(len(id_cams_swapped))]
for id_off in range(len(id_cams_off))])
coords_2D_kpt_calc_off_swap = np.array([[[coords_2D_kpt_calc_off_swap[id_off,id_swapped,:,0], coords_2D_kpt_calc_off_swap[id_off,id_swapped,:,1]]
for id_swapped in range(len(id_cams_swapped))]
for id_off in range(len(id_cams_off))])
else:
coords_2D_kpt_calc_off_swap = np.array([[reprojection(projection_matrices_filt[id_off], Q_filt_off_swap[id_off, id_swapped])
for id_swapped in range(len(id_cams_swapped))]
for id_off in range(len(id_cams_off))])
x_calc_off_swap = coords_2D_kpt_calc_off_swap[:,:,0]
y_calc_off_swap = coords_2D_kpt_calc_off_swap[:,:,1]
# Reprojection error
error_off_swap = []
for id_off in range(len(id_cams_off)):
error_percam = []
for id_swapped, config_swapped in enumerate(id_cams_swapped):
q_file_off_swap = [(x_files_filt_off_swap[id_off,id_swapped,i], y_files_filt_off_swap[id_off,id_swapped,i]) for i in range(n_cams - nb_cams_off)]
q_calc_off_swap = [(x_calc_off_swap[id_off,id_swapped,i], y_calc_off_swap[id_off,id_swapped,i]) for i in range(n_cams - nb_cams_off)]
error_percam.append( np.mean( [euclidean_distance(q_file_off_swap[i], q_calc_off_swap[i]) for i in range(len(q_file_off_swap))] ) )
error_off_swap.append(error_percam)
error_off_swap = np.array(error_off_swap)
# Choosing best triangulation (with min reprojection error)
error_off_swap_min = np.min(error_off_swap)
best_off_swap_config = np.unravel_index(error_off_swap.argmin(), error_off_swap.shape)
id_off_cams = best_off_swap_config[0]
id_swapped_cams = id_cams_swapped[best_off_swap_config[1]]
Q_best = Q_filt_off_swap[best_off_swap_config][:-1]
n_cams_swapped += 1
if error_off_swap_min < error_min:
error_min = error_off_swap_min
best_cams = id_off_cams
Q = Q_best
nb_cams_off += 1
# Index of excluded cams for this keypoint
id_excluded_cams = id_cams_off[best_cams]
# If triangulation not successful, error = 0, and 3D coordinates as missing values
if error_min > error_threshold_triangulation:
error_min = np.nan
# Q = np.array([0.,0.,0.])
Q = np.array([np.nan, np.nan, np.nan])
return Q, error_min, nb_cams_excluded, id_excluded_cams
def extract_files_frame_f(json_tracked_files_f, keypoints_ids):
'''
Extract data from json files for frame f,
in the order of the body model hierarchy.
INPUTS:
- json_tracked_files_f: list of str. Paths of json_files for frame f.
- keypoints_ids: list of int. Keypoints IDs in the order of the hierarchy.
OUTPUTS:
- x_files, y_files, likelihood_files: array:
n_cams lists of n_keypoints lists of coordinates.
'''
n_cams = len(json_tracked_files_f)
x_files, y_files, likelihood_files = [], [], []
for cam_nb in range(n_cams):
x_files_cam, y_files_cam, likelihood_files_cam = [], [], []
with open(json_tracked_files_f[cam_nb], 'r') as json_f:
js = json.load(json_f)
for keypoint_id in keypoints_ids:
try:
x_files_cam.append( js['people'][0]['pose_keypoints_2d'][keypoint_id*3] )
y_files_cam.append( js['people'][0]['pose_keypoints_2d'][keypoint_id*3+1] )
likelihood_files_cam.append( js['people'][0]['pose_keypoints_2d'][keypoint_id*3+2] )
except:
x_files_cam.append( np.nan )
y_files_cam.append( np.nan )
likelihood_files_cam.append( np.nan )
x_files.append(x_files_cam)
y_files.append(y_files_cam)
likelihood_files.append(likelihood_files_cam)
x_files = np.array(x_files)
y_files = np.array(y_files)
likelihood_files = np.array(likelihood_files)
return x_files, y_files, likelihood_files
def triangulate_all(config):
'''
For each frame
For each keypoint
- Triangulate keypoint
- Reproject it on all cameras
- Take off cameras until requirements are met
Interpolate missing values
Create trc file
For each frame,
- Find all possible combinations of detected persons
- Triangulate 'tracked_keypoint' for all combinations
- Reproject the point on all cameras
- Take combination with smallest reprojection error
- Write json file with only one detected person
Print recap message
INPUTS:
INPUTS:
- a calibration file (.toml extension)
- json files for each camera with only one person of interest
- json files from each camera folders with several detected persons
- a Config.toml file
- a skeleton model
OUTPUTS:
- a .trc file with 3D coordinates in Y-up system coordinates
- json files for each camera with only one person of interest
'''
# Read config
project_dir = config.get('project').get('project_dir')
session_dir = os.path.realpath(os.path.join(project_dir, '..', '..'))
pose_model = config.get('pose').get('pose_model')
tracked_keypoint = config.get('personAssociation').get('tracked_keypoint')
frame_range = config.get('project').get('frame_range')
likelihood_threshold = config.get('triangulation').get('likelihood_threshold')
interpolation_kind = config.get('triangulation').get('interpolation')
interp_gap_smaller_than = config.get('triangulation').get('interp_if_gap_smaller_than')
show_interp_indices = config.get('triangulation').get('show_interp_indices')
undistort_points = config.get('triangulation').get('undistort_points')
calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if ('Calib' or 'calib') in c][0]
calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0] # lastly created calibration file
pose_dir = os.path.join(project_dir, 'pose')
poseTracked_dir = os.path.join(project_dir, 'pose-associated')
# Projection matrix from toml calibration file
# projection matrix from toml calibration file
P = computeP(calib_file, undistort=undistort_points)
calib_params = retrieve_calib_params(calib_file)
# Retrieve keypoints from model
# selection of tracked keypoint id
try: # from skeletons.py
model = eval(pose_model)
except:
@ -535,102 +287,54 @@ def triangulate_all(config):
model.id = None
except:
raise NameError('Model not found in skeletons.py nor in Config.toml')
keypoints_ids = [node.id for _, _, node in RenderTree(model) if node.id!=None]
keypoints_names = [node.name for _, _, node in RenderTree(model) if node.id!=None]
keypoints_idx = list(range(len(keypoints_ids)))
keypoints_nb = len(keypoints_ids)
# left/right swapped keypoints
keypoints_names_swapped = [keypoint_name.replace('R', 'L') if keypoint_name.startswith('R') else keypoint_name.replace('L', 'R') if keypoint_name.startswith('L') else keypoint_name for keypoint_name in keypoints_names]
keypoints_names_swapped = [keypoint_name_swapped.replace('right', 'left') if keypoint_name_swapped.startswith('right') else keypoint_name_swapped.replace('left', 'right') if keypoint_name_swapped.startswith('left') else keypoint_name_swapped for keypoint_name_swapped in keypoints_names_swapped]
keypoints_idx_swapped = [keypoints_names.index(keypoint_name_swapped) for keypoint_name_swapped in keypoints_names_swapped] # find index of new keypoint_name
tracked_keypoint_id = [node.id for _, _, node in RenderTree(model) if node.name==tracked_keypoint][0]
# 2d-pose files selection
pose_listdirs_names = next(os.walk(pose_dir))[1]
pose_listdirs_names = natural_sort(pose_listdirs_names)
json_dirs_names = [k for k in pose_listdirs_names if 'json' in k]
try:
json_files_names = [fnmatch.filter(os.listdir(os.path.join(poseTracked_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
json_files_names = [natural_sort(j) for j in json_files_names]
json_tracked_files = [[os.path.join(poseTracked_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]
except:
json_files_names = [fnmatch.filter(os.listdir(os.path.join(pose_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
json_files_names = [natural_sort(j) for j in json_files_names]
json_tracked_files = [[os.path.join(pose_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]
json_files_names = [fnmatch.filter(os.listdir(os.path.join(pose_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
json_files_names = [natural_sort(j) for j in json_files_names]
json_files = [[os.path.join(pose_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]
# Triangulation
f_range = [[0,min([len(j) for j in json_files_names])] if frame_range==[] else frame_range][0]
frames_nb = f_range[1]-f_range[0]
# 2d-pose-associated files creation
if not os.path.exists(poseTracked_dir): os.mkdir(poseTracked_dir)
try: [os.mkdir(os.path.join(poseTracked_dir,k)) for k in json_dirs_names]
except: pass
json_tracked_files = [[os.path.join(poseTracked_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]
# person's tracking
f_range = [[min([len(j) for j in json_files])] if frame_range==[] else frame_range][0]
n_cams = len(json_dirs_names)
Q_tot, error_tot, nb_cams_excluded_tot,id_excluded_cams_tot = [], [], [], []
error_min_tot, cameras_off_tot = [], []
for f in tqdm(range(*f_range)):
# Get x,y,likelihood values from files
json_files_f = [json_files[c][f] for c in range(n_cams)]
json_tracked_files_f = [json_tracked_files[c][f] for c in range(n_cams)]
x_files, y_files, likelihood_files = extract_files_frame_f(json_tracked_files_f, keypoints_ids)
# undistort points
if undistort_points:
points = [np.array(tuple(zip(x_files[i],y_files[i]))).reshape(-1, 1, 2).astype('float32') for i in range(n_cams)]
undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
x_files = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points])
y_files = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points])
# This is good for slight distortion. For fishey camera, the model does not work anymore. See there for an example https://github.com/lambdaloop/aniposelib/blob/d03b485c4e178d7cff076e9fe1ac36837db49158/aniposelib/cameras.py#L301
# Replace likelihood by 0 if under likelihood_threshold
with np.errstate(invalid='ignore'):
likelihood_files[likelihood_files<likelihood_threshold] = 0.
Q, error, nb_cams_excluded, id_excluded_cams = [], [], [], []
for keypoint_idx in keypoints_idx:
# Triangulate cameras with min reprojection error
coords_2D_kpt = np.array( (x_files[:, keypoint_idx], y_files[:, keypoint_idx], likelihood_files[:, keypoint_idx]) )
coords_2D_kpt_swapped = np.array(( x_files[:, keypoints_idx_swapped[keypoint_idx]], y_files[:, keypoints_idx_swapped[keypoint_idx]], likelihood_files[:, keypoints_idx_swapped[keypoint_idx]] ))
# all possible combinations of persons
personsIDs_comb = persons_combinations(json_files_f)
# choose person of interest and exclude cameras with bad pose estimation
error_min, persons_and_cameras_combination = best_persons_and_cameras_combination(config, json_files_f, personsIDs_comb, P, tracked_keypoint_id, calib_params)
error_min_tot.append(error_min)
cameras_off_count = np.count_nonzero(np.isnan(persons_and_cameras_combination))
cameras_off_tot.append(cameras_off_count)
# rewrite json files with only one person of interest
for cam_nb, person_id in enumerate(persons_and_cameras_combination):
with open(json_tracked_files_f[cam_nb], 'w') as json_tracked_f:
with open(json_files_f[cam_nb], 'r') as json_f:
js = json.load(json_f)
if not np.isnan(person_id):
js['people'] = [js['people'][int(person_id)]]
else:
js['people'] = []
json_tracked_f.write(json.dumps(js))
Q_kpt, error_kpt, nb_cams_excluded_kpt, id_excluded_cams_kpt = triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, P, calib_params) # P has been modified if undistort_points=True
Q.append(Q_kpt)
error.append(error_kpt)
nb_cams_excluded.append(nb_cams_excluded_kpt)
id_excluded_cams.append(id_excluded_cams_kpt)
# Add triangulated points, errors and excluded cameras to pandas dataframes
Q_tot.append(np.concatenate(Q))
error_tot.append(error)
nb_cams_excluded_tot.append(nb_cams_excluded)
id_excluded_cams = [item for sublist in id_excluded_cams for item in sublist]
id_excluded_cams_tot.append(id_excluded_cams)
Q_tot = pd.DataFrame(Q_tot)
error_tot = pd.DataFrame(error_tot)
nb_cams_excluded_tot = pd.DataFrame(nb_cams_excluded_tot)
# recap message
recap_tracking(config, error_min_tot, cameras_off_tot)
id_excluded_cams_tot = [item for sublist in id_excluded_cams_tot for item in sublist]
cam_excluded_count = dict(Counter(id_excluded_cams_tot))
cam_excluded_count.update((x, y/keypoints_nb/frames_nb) for x, y in cam_excluded_count.items())
error_tot['mean'] = error_tot.mean(axis = 1)
nb_cams_excluded_tot['mean'] = nb_cams_excluded_tot.mean(axis = 1)
# Optionally, for each keypoint, show indices of frames that should be interpolated
if show_interp_indices:
zero_nan_frames = np.where( Q_tot.iloc[:,::3].T.eq(0) | ~np.isfinite(Q_tot.iloc[:,::3].T) )
zero_nan_frames_per_kpt = [zero_nan_frames[1][np.where(zero_nan_frames[0]==k)[0]] for k in range(keypoints_nb)]
gaps = [np.where(np.diff(zero_nan_frames_per_kpt[k]) > 1)[0] + 1 for k in range(keypoints_nb)]
sequences = [np.split(zero_nan_frames_per_kpt[k], gaps[k]) for k in range(keypoints_nb)]
interp_frames = [[f'{seq[0]}:{seq[-1]+1}' for seq in seq_kpt if len(seq)<=interp_gap_smaller_than and len(seq)>0] for seq_kpt in sequences]
non_interp_frames = [[f'{seq[0]}:{seq[-1]+1}' for seq in seq_kpt if len(seq)>interp_gap_smaller_than] for seq_kpt in sequences]
else:
interp_frames = None
non_interp_frames = []
# Interpolate missing values
if interpolation_kind != 'none':
Q_tot = Q_tot.apply(interpolate_zeros_nans, axis=0, args = [interp_gap_smaller_than, interpolation_kind])
Q_tot.replace(np.nan, 0, inplace=True)
# Create TRC file
trc_path = make_trc(config, Q_tot, keypoints_names, f_range)
# Recap message
recap_triangulate(config, error_tot, nb_cams_excluded_tot, keypoints_names, cam_excluded_count, interp_frames, non_interp_frames, trc_path)