beta: multi-person

This commit is contained in:
davidpagnon 2024-02-23 18:16:56 +01:00
parent b41bc933b7
commit 607865e3d4
3 changed files with 206 additions and 137 deletions

View File

@ -380,7 +380,7 @@ def triangulation(config=None):
triangulate_all(config_dict)
end = time.time()
logging.info(f'Triangulation took {end-start:.2f} s.')
logging.info(f'\nTriangulation took {end-start:.2f} s.')
def filtering(config=None):

View File

@ -91,12 +91,13 @@ def persons_combinations(json_files_framef):
def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_combinations, projection_matrices, tracked_keypoint_id, calib_params):
'''
At the same time, chooses the right person among the multiple ones found by
- if single_person: Choose the right person among the multiple ones found by
OpenPose & excludes cameras with wrong 2d-pose estimation.
- else: Choose all the combination of cameras that give a reprojection error below a threshold
1. triangulate the tracked keypoint for all possible combinations of people,
2. compute difference between reprojection & original openpose detection,
3. take combination with smallest difference.
3. take combination with smallest error OR all those below the error threshold
If error is too big, take off one or several of the cameras until err is
lower than "max_err_px".
@ -108,10 +109,11 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
- tracked_keypoint_id: int
OUTPUTS:
- error_min: float
- persons_and_cameras_combination: array of ints
- errors_below_thresh: list of float
- comb_errors_below_thresh: list of arrays of ints
'''
single_person = config.get('project').get('single_person')
error_threshold_tracking = config.get('personAssociation').get('reproj_error_threshold_association')
likelihood_threshold = config.get('personAssociation').get('likelihood_threshold_association')
min_cameras_for_triangulation = config.get('triangulation').get('min_cameras_for_triangulation')
@ -121,6 +123,9 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
error_min = np.inf
nb_cams_off = 0 # cameras will be taken-off until the reprojection error is under threshold
errors_below_thresh = []
comb_errors_below_thresh = []
Q_kpt = []
while error_min > error_threshold_tracking and n_cams - nb_cams_off >= min_cameras_for_triangulation:
# Try all persons combinations
for combination in personsIDs_combinations:
@ -156,6 +161,7 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
# Try all subsets
error_comb = []
Q_comb = []
for comb in combinations_with_cams_off:
# Filter x, y, likelihood, projection_matrices, with subset
x_files_filt = [x_files[i] for i in range(len(comb)) if not np.isnan(comb[i])]
@ -169,15 +175,15 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
calib_params_dist_filt = [calib_params['dist'][i] for i in range(len(comb)) if not np.isnan(comb[i])]
# Triangulate 2D points
Q_comb = weighted_triangulation(projection_matrices_filt, x_files_filt, y_files_filt, likelihood_files_filt)
Q_comb.append(weighted_triangulation(projection_matrices_filt, x_files_filt, y_files_filt, likelihood_files_filt))
# Reprojection
if undistort_points:
coords_2D_kpt_calc_filt = [cv2.projectPoints(np.array(Q_comb[:-1]), calib_params_R_filt[i], calib_params_T_filt[i], calib_params_K_filt[i], calib_params_dist_filt[i])[0] for i in range(n_cams-nb_cams_off)]
coords_2D_kpt_calc_filt = [cv2.projectPoints(np.array(Q_comb[-1][:-1]), calib_params_R_filt[i], calib_params_T_filt[i], calib_params_K_filt[i], calib_params_dist_filt[i])[0] for i in range(n_cams-nb_cams_off)]
x_calc = [coords_2D_kpt_calc_filt[i][0,0,0] for i in range(n_cams-nb_cams_off)]
y_calc = [coords_2D_kpt_calc_filt[i][0,0,1] for i in range(n_cams-nb_cams_off)]
else:
x_calc, y_calc = reprojection(projection_matrices_filt, Q_comb)
x_calc, y_calc = reprojection(projection_matrices_filt, Q_comb[-1])
# Reprojection error
error_comb_per_cam = []
@ -187,15 +193,34 @@ def best_persons_and_cameras_combination(config, json_files_framef, personsIDs_c
error_comb_per_cam.append( euclidean_distance(q_file, q_calc) )
error_comb.append( np.mean(error_comb_per_cam) )
error_min = np.nanmin(error_comb)
persons_and_cameras_combination = combinations_with_cams_off[np.argmin(error_comb)]
if error_min < error_threshold_tracking:
if single_person:
error_min = np.nanmin(error_comb)
errors_below_thresh = [error_min]
comb_errors_below_thresh = [combinations_with_cams_off[np.argmin(error_comb)]]
Q_kpt = [Q_comb[np.argmin(error_comb)]]
if errors_below_thresh[0] < error_threshold_tracking:
break
else:
errors_below_thresh += [e for e in error_comb if e<error_threshold_tracking]
comb_errors_below_thresh += [combinations_with_cams_off[error_comb.index(e)] for e in error_comb if e<error_threshold_tracking]
Q_kpt += [Q_comb[error_comb.index(e)] for e in error_comb if e<error_threshold_tracking]
print('\n', personsIDs_combinations)
print(errors_below_thresh)
print(comb_errors_below_thresh)
print(Q_kpt)
if not single_person:
# Remove indices already used for a person
personsIDs_combinations = np.array([personsIDs_combinations[i] for i in range(len(personsIDs_combinations))
if not np.array(
[personsIDs_combinations[i,j]==comb[j] for comb in comb_errors_below_thresh for j in range(len(comb))]
).any()])
if len(personsIDs_combinations) < len(errors_below_thresh):
break
nb_cams_off += 1
return error_min, persons_and_cameras_combination
return errors_below_thresh, comb_errors_below_thresh, Q_kpt
def recap_tracking(config, error, nb_cams_excluded):
@ -307,7 +332,7 @@ def track_2d_all(config):
f_range = [[min([len(j) for j in json_files])] if frame_range==[] else frame_range][0]
n_cams = len(json_dirs_names)
error_min_tot, cameras_off_tot = [], []
# Check that camera number is consistent between calibration file and pose folders
if n_cams != len(P):
raise Exception(f'Error: The number of cameras is not consistent:\
@ -315,32 +340,38 @@ def track_2d_all(config):
and {n_cams} cameras based on the number of pose folders.')
for f in tqdm(range(*f_range)):
print(f'\nFrame {f}:')
json_files_f = [json_files[c][f] for c in range(n_cams)]
json_tracked_files_f = [json_tracked_files[c][f] for c in range(n_cams)]
# all possible combinations of persons
personsIDs_comb = persons_combinations(json_files_f)
# choose person of interest and exclude cameras with bad pose estimation
error_min, persons_and_cameras_combination = best_persons_and_cameras_combination(config, json_files_f, personsIDs_comb, P, tracked_keypoint_id, calib_params)
error_min_tot.append(error_min)
cameras_off_count = np.count_nonzero(np.isnan(persons_and_cameras_combination))
# choose persons of interest and exclude cameras with bad pose estimation
errors_below_thresh, comb_errors_below_thresh, Q_kpt = best_persons_and_cameras_combination(config, json_files_f, personsIDs_comb, P, tracked_keypoint_id, calib_params)
# reID persons across frames by checking the distance from one frame to another
##### TO DO
error_min_tot.append(np.mean(errors_below_thresh))
cameras_off_count = np.count_nonzero([np.isnan(comb) for comb in comb_errors_below_thresh]) / len(comb_errors_below_thresh)
print(cameras_off_count)
cameras_off_tot.append(cameras_off_count)
# rewrite json files with only one person of interest
for cam_nb, person_id in enumerate(persons_and_cameras_combination):
with open(json_tracked_files_f[cam_nb], 'w') as json_tracked_f:
with open(json_files_f[cam_nb], 'r') as json_f:
# rewrite json files with a single or multiple persons of interest
for cam in range(n_cams):
with open(json_tracked_files_f[cam], 'w') as json_tracked_f:
with open(json_files_f[cam], 'r') as json_f:
js = json.load(json_f)
if not np.isnan(person_id):
js['people'] = [js['people'][int(person_id)]]
else:
js['people'] = []
json_tracked_f.write(json.dumps(js))
js_new = js.copy()
js_new['people'] = []
for new_comb in comb_errors_below_thresh:
if not np.isnan(new_comb[cam]):
js_new['people'] += [js['people'][int(new_comb[cam])]]
else:
js_new['people'] += [{}]
json_tracked_f.write(json.dumps(js_new))
# recap message
recap_tracking(config, error_min_tot, cameras_off_tot)

View File

@ -44,7 +44,7 @@ import cv2
import toml
from tqdm import tqdm
from scipy import interpolate
from collections import Counter
from collections import Counter, OrderedDict
from anytree import RenderTree
from anytree.importer import DictImporter
import logging
@ -109,7 +109,7 @@ def interpolate_zeros_nans(col, *args):
return col_interp
def make_trc(config, Q, keypoints_names, f_range):
def make_trc(config, Q, keypoints_names, f_range, id_person=-1):
'''
Make Opensim compatible trc file from a dataframe with 3D coordinates
@ -126,7 +126,10 @@ def make_trc(config, Q, keypoints_names, f_range):
# Read config
project_dir = config.get('project').get('project_dir')
frame_rate = config.get('project').get('frame_rate')
seq_name = os.path.basename(os.path.realpath(project_dir))
if id_person == -1:
seq_name = f'{os.path.basename(os.path.realpath(project_dir))}'
else:
seq_name = f'{os.path.basename(os.path.realpath(project_dir))}_Participant{id_person+1}'
pose3d_dir = os.path.join(project_dir, 'pose-3d')
trc_f = f'{seq_name}_{f_range[0]}-{f_range[1]}.trc'
@ -181,7 +184,7 @@ def recap_triangulate(config, error, nb_cams_excluded, keypoints_names, cam_excl
calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0] # lastly created calibration file
calib = toml.load(calib_file)
cam_names = np.array([calib[c].get('name') for c in list(calib.keys())])
cam_names = cam_names[list(cam_excluded_count.keys())]
cam_names = cam_names[list(cam_excluded_count[0].keys())]
error_threshold_triangulation = config.get('triangulation').get('reproj_error_threshold_triangulation')
likelihood_threshold = config.get('triangulation').get('likelihood_threshold_triangulation')
show_interp_indices = config.get('triangulation').get('show_interp_indices')
@ -195,47 +198,53 @@ def recap_triangulate(config, error, nb_cams_excluded, keypoints_names, cam_excl
Dm = euclidean_distance(calib_cam1['translation'], [0,0,0])
logging.info('')
for idx, name in enumerate(keypoints_names):
mean_error_keypoint_px = np.around(error.iloc[:,idx].mean(), decimals=1) # RMS à la place?
mean_error_keypoint_m = np.around(mean_error_keypoint_px * Dm / fm, decimals=3)
mean_cam_excluded_keypoint = np.around(nb_cams_excluded.iloc[:,idx].mean(), decimals=2)
logging.info(f'Mean reprojection error for {name} is {mean_error_keypoint_px} px (~ {mean_error_keypoint_m} m), reached with {mean_cam_excluded_keypoint} excluded cameras. ')
if show_interp_indices:
if interpolation_kind != 'none':
if len(list(interp_frames[idx])) ==0:
logging.info(f' No frames needed to be interpolated.')
else:
interp_str = str(interp_frames[idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
logging.info(f' Frames {interp_str} were interpolated.')
if len(list(non_interp_frames[idx]))>0:
noninterp_str = str(non_interp_frames[idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
logging.info(f' Frames {non_interp_frames[idx]} could not be interpolated: consider adjusting thresholds.')
nb_persons_to_detect = len(error)
for n in range(nb_persons_to_detect):
if nb_persons_to_detect > 1:
print(f'\n\nPARTICIPANT {n+1}\n')
for idx, name in enumerate(keypoints_names):
mean_error_keypoint_px = np.around(error[n].iloc[:,idx].mean(), decimals=1) # RMS à la place?
mean_error_keypoint_m = np.around(mean_error_keypoint_px * Dm / fm, decimals=3)
mean_cam_excluded_keypoint = np.around(nb_cams_excluded[n].iloc[:,idx].mean(), decimals=2)
logging.info(f'Mean reprojection error for {name} is {mean_error_keypoint_px} px (~ {mean_error_keypoint_m} m), reached with {mean_cam_excluded_keypoint} excluded cameras. ')
if show_interp_indices:
if interpolation_kind != 'none':
if len(list(interp_frames[n][idx])) ==0:
logging.info(f' No frames needed to be interpolated.')
else:
interp_str = str(interp_frames[n][idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
logging.info(f' Frames {interp_str} were interpolated.')
if len(list(non_interp_frames[n][idx]))>0:
noninterp_str = str(non_interp_frames[n][idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
logging.info(f' Frames {non_interp_frames[n][idx]} could not be interpolated: consider adjusting thresholds.')
else:
logging.info(f' No frames were interpolated because \'interpolation_kind\' was set to none. ')
mean_error_px = np.around(error[n]['mean'].mean(), decimals=1)
mean_error_mm = np.around(mean_error_px * Dm / fm *1000, decimals=1)
mean_cam_excluded = np.around(nb_cams_excluded[n]['mean'].mean(), decimals=2)
logging.info(f'\n--> Mean reprojection error for all points on all frames is {mean_error_px} px, which roughly corresponds to {mean_error_mm} mm. ')
logging.info(f'Cameras were excluded if likelihood was below {likelihood_threshold} and if the reprojection error was above {error_threshold_triangulation} px.')
logging.info(f'In average, {mean_cam_excluded} cameras had to be excluded to reach these thresholds.')
cam_excluded_count[n] = {i: v for i, v in zip(cam_names, cam_excluded_count[n].values())}
cam_excluded_count[n] = {i: cam_excluded_count[n][i] for i in sorted(cam_excluded_count[n].keys())}
str_cam_excluded_count = ''
for i, (k, v) in enumerate(cam_excluded_count[n].items()):
if i ==0:
str_cam_excluded_count += f'Camera {k} was excluded {int(np.round(v*100))}% of the time, '
elif i == len(cam_excluded_count[n])-1:
str_cam_excluded_count += f'and Camera {k}: {int(np.round(v*100))}%.'
else:
logging.info(f' No frames were interpolated because \'interpolation_kind\' was set to none. ')
str_cam_excluded_count += f'Camera {k}: {int(np.round(v*100))}%, '
logging.info(str_cam_excluded_count)
logging.info(f'\n3D coordinates are stored at {trc_path[n]}.')
mean_error_px = np.around(error['mean'].mean(), decimals=1)
mean_error_mm = np.around(mean_error_px * Dm / fm *1000, decimals=1)
mean_cam_excluded = np.around(nb_cams_excluded['mean'].mean(), decimals=2)
logging.info(f'\n--> Mean reprojection error for all points on all frames is {mean_error_px} px, which roughly corresponds to {mean_error_mm} mm. ')
logging.info(f'Cameras were excluded if likelihood was below {likelihood_threshold} and if the reprojection error was above {error_threshold_triangulation} px.')
logging.info(f'In average, {mean_cam_excluded} cameras had to be excluded to reach these thresholds.')
cam_excluded_count = {i: v for i, v in zip(cam_names, cam_excluded_count.values())}
str_cam_excluded_count = ''
for i, (k, v) in enumerate(cam_excluded_count.items()):
if i ==0:
str_cam_excluded_count += f'Camera {k} was excluded {int(np.round(v*100))}% of the time, '
elif i == len(cam_excluded_count)-1:
str_cam_excluded_count += f'and Camera {k}: {int(np.round(v*100))}%.'
else:
str_cam_excluded_count += f'Camera {k}: {int(np.round(v*100))}%, '
logging.info(str_cam_excluded_count)
logging.info(f'Limb swapping was {"handled" if handle_LR_swap else "not handled"}.')
logging.info(f'\n\nLimb swapping was {"handled" if handle_LR_swap else "not handled"}.')
logging.info(f'Lens distortions were {"taken into account" if undistort_points else "not taken into account"}.')
logging.info(f'\n3D coordinates are stored at {trc_path}.')
def triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, projection_matrices, calib_params):
'''
@ -481,7 +490,7 @@ def triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped
return Q, error_min, nb_cams_excluded, id_excluded_cams
def extract_files_frame_f(json_tracked_files_f, keypoints_ids):
def extract_files_frame_f(json_tracked_files_f, keypoints_ids, nb_persons_to_detect):
'''
Extract data from json files for frame f,
in the order of the body model hierarchy.
@ -489,32 +498,34 @@ def extract_files_frame_f(json_tracked_files_f, keypoints_ids):
INPUTS:
- json_tracked_files_f: list of str. Paths of json_files for frame f.
- keypoints_ids: list of int. Keypoints IDs in the order of the hierarchy.
- nb_persons_to_detect: int
OUTPUTS:
- x_files, y_files, likelihood_files: array:
n_cams lists of n_keypoints lists of coordinates.
- x_files, y_files, likelihood_files: [[[list of coordinates] * n_cams ] * nb_persons_to_detect]
'''
n_cams = len(json_tracked_files_f)
x_files, y_files, likelihood_files = [], [], []
for cam_nb in range(n_cams):
x_files_cam, y_files_cam, likelihood_files_cam = [], [], []
with open(json_tracked_files_f[cam_nb], 'r') as json_f:
js = json.load(json_f)
for keypoint_id in keypoints_ids:
try:
x_files_cam.append( js['people'][0]['pose_keypoints_2d'][keypoint_id*3] )
y_files_cam.append( js['people'][0]['pose_keypoints_2d'][keypoint_id*3+1] )
likelihood_files_cam.append( js['people'][0]['pose_keypoints_2d'][keypoint_id*3+2] )
except:
x_files_cam.append( np.nan )
y_files_cam.append( np.nan )
likelihood_files_cam.append( np.nan )
x_files.append(x_files_cam)
y_files.append(y_files_cam)
likelihood_files.append(likelihood_files_cam)
x_files = [[] for n in range(nb_persons_to_detect)]
y_files = [[] for n in range(nb_persons_to_detect)]
likelihood_files = [[] for n in range(nb_persons_to_detect)]
for n in range(nb_persons_to_detect):
for cam_nb in range(n_cams):
x_files_cam, y_files_cam, likelihood_files_cam = [], [], []
with open(json_tracked_files_f[cam_nb], 'r') as json_f:
js = json.load(json_f)
for keypoint_id in keypoints_ids:
try:
x_files_cam.append( js['people'][n]['pose_keypoints_2d'][keypoint_id*3] )
y_files_cam.append( js['people'][n]['pose_keypoints_2d'][keypoint_id*3+1] )
likelihood_files_cam.append( js['people'][n]['pose_keypoints_2d'][keypoint_id*3+2] )
except:
x_files_cam.append( np.nan )
y_files_cam.append( np.nan )
likelihood_files_cam.append( np.nan )
x_files[n].append(x_files_cam)
y_files[n].append(y_files_cam)
likelihood_files[n].append(likelihood_files_cam)
x_files = np.array(x_files)
y_files = np.array(y_files)
@ -599,10 +610,12 @@ def triangulate_all(config):
json_files_names = [natural_sort(j) for j in json_files_names]
json_tracked_files = [[os.path.join(pose_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]
# Triangulation
# Prep triangulation
f_range = [[0,min([len(j) for j in json_files_names])] if frame_range==[] else frame_range][0]
frames_nb = f_range[1]-f_range[0]
nb_persons_to_detect = max([len(json.load(open(json_fname))['people']) for json_fname in json_tracked_files[0]])
n_cams = len(json_dirs_names)
# Check that camera number is consistent between calibration file and pose folders
@ -610,79 +623,104 @@ def triangulate_all(config):
raise Exception(f'Error: The number of cameras is not consistent:\
Found {len(P)} cameras in the calibration file,\
and {n_cams} cameras based on the number of pose folders.')
# Triangulation
Q_tot, error_tot, nb_cams_excluded_tot,id_excluded_cams_tot = [], [], [], []
for f in tqdm(range(*f_range)):
# Get x,y,likelihood values from files
json_tracked_files_f = [json_tracked_files[c][f] for c in range(n_cams)]
# print(json_tracked_files_f)
x_files, y_files, likelihood_files = extract_files_frame_f(json_tracked_files_f, keypoints_ids)
x_files, y_files, likelihood_files = extract_files_frame_f(json_tracked_files_f, keypoints_ids, nb_persons_to_detect)
# [[[list of coordinates] * n_cams ] * nb_persons_to_detect]
# vs. [[list of coordinates] * n_cams ]
# undistort points
if undistort_points:
points = [np.array(tuple(zip(x_files[i],y_files[i]))).reshape(-1, 1, 2).astype('float32') for i in range(n_cams)]
undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
x_files = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points])
y_files = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points])
# This is good for slight distortion. For fishey camera, the model does not work anymore. See there for an example https://github.com/lambdaloop/aniposelib/blob/d03b485c4e178d7cff076e9fe1ac36837db49158/aniposelib/cameras.py#L301
for n in range(nb_persons_to_detect):
points = [np.array(tuple(zip(x_files[n][i],y_files[n][i]))).reshape(-1, 1, 2).astype('float32') for i in range(n_cams)]
undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
x_files[n] = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points])
y_files[n] = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points])
# This is good for slight distortion. For fisheye camera, the model does not work anymore. See there for an example https://github.com/lambdaloop/aniposelib/blob/d03b485c4e178d7cff076e9fe1ac36837db49158/aniposelib/cameras.py#L301
# Replace likelihood by 0 if under likelihood_threshold
with np.errstate(invalid='ignore'):
x_files[likelihood_files<likelihood_threshold] = np.nan
y_files[likelihood_files<likelihood_threshold] = np.nan
likelihood_files[likelihood_files<likelihood_threshold] = np.nan
for n in range(nb_persons_to_detect):
x_files[n][likelihood_files[n] < likelihood_threshold] = np.nan
y_files[n][likelihood_files[n] < likelihood_threshold] = np.nan
likelihood_files[n][likelihood_files[n] < likelihood_threshold] = np.nan
Q, error, nb_cams_excluded, id_excluded_cams = [], [], [], []
for keypoint_idx in keypoints_idx:
# Triangulate cameras with min reprojection error
# print('\n', keypoints_names[keypoint_idx])
coords_2D_kpt = np.array( (x_files[:, keypoint_idx], y_files[:, keypoint_idx], likelihood_files[:, keypoint_idx]) )
coords_2D_kpt_swapped = np.array(( x_files[:, keypoints_idx_swapped[keypoint_idx]], y_files[:, keypoints_idx_swapped[keypoint_idx]], likelihood_files[:, keypoints_idx_swapped[keypoint_idx]] ))
Q = [[] for n in range(nb_persons_to_detect)]
error = [[] for n in range(nb_persons_to_detect)]
nb_cams_excluded = [[] for n in range(nb_persons_to_detect)]
id_excluded_cams = [[] for n in range(nb_persons_to_detect)]
for n in range(nb_persons_to_detect):
for keypoint_idx in keypoints_idx:
# Triangulate cameras with min reprojection error
# print('\n', keypoints_names[keypoint_idx])
coords_2D_kpt = np.array( (x_files[n][:, keypoint_idx], y_files[n][:, keypoint_idx], likelihood_files[n][:, keypoint_idx]) )
coords_2D_kpt_swapped = np.array(( x_files[n][:, keypoints_idx_swapped[keypoint_idx]], y_files[n][:, keypoints_idx_swapped[keypoint_idx]], likelihood_files[n][:, keypoints_idx_swapped[keypoint_idx]] ))
Q_kpt, error_kpt, nb_cams_excluded_kpt, id_excluded_cams_kpt = triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, P, calib_params) # P has been modified if undistort_points=True
Q_kpt, error_kpt, nb_cams_excluded_kpt, id_excluded_cams_kpt = triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, P, calib_params) # P has been modified if undistort_points=True
Q.append(Q_kpt)
error.append(error_kpt)
nb_cams_excluded.append(nb_cams_excluded_kpt)
id_excluded_cams.append(id_excluded_cams_kpt)
Q[n].append(Q_kpt)
error[n].append(error_kpt)
nb_cams_excluded[n].append(nb_cams_excluded_kpt)
id_excluded_cams[n].append(id_excluded_cams_kpt)
# Add triangulated points, errors and excluded cameras to pandas dataframes
Q_tot.append(np.concatenate(Q))
error_tot.append(error)
nb_cams_excluded_tot.append(nb_cams_excluded)
id_excluded_cams = [item for sublist in id_excluded_cams for item in sublist]
Q_tot.append([np.concatenate(Q[n]) for n in range(nb_persons_to_detect)])
error_tot.append([error[n] for n in range(nb_persons_to_detect)])
nb_cams_excluded_tot.append([nb_cams_excluded[n] for n in range(nb_persons_to_detect)])
id_excluded_cams = [[id_excluded_cams[n][k] for k in range(keypoints_nb)] for n in range(nb_persons_to_detect)]
id_excluded_cams_tot.append(id_excluded_cams)
Q_tot = pd.DataFrame(Q_tot)
error_tot = pd.DataFrame(error_tot)
nb_cams_excluded_tot = pd.DataFrame(nb_cams_excluded_tot)
Q_tot = [pd.DataFrame([Q_tot[f][n] for f in range(*f_range)]) for n in range(nb_persons_to_detect)]
error_tot = [pd.DataFrame([error_tot[f][n] for f in range(*f_range)]) for n in range(nb_persons_to_detect)]
nb_cams_excluded_tot = [pd.DataFrame([nb_cams_excluded_tot[f][n] for f in range(*f_range)]) for n in range(nb_persons_to_detect)]
id_excluded_cams_tot = [pd.DataFrame([id_excluded_cams_tot[f][n] for f in range(*f_range)]) for n in range(nb_persons_to_detect)]
id_excluded_cams_tot = [item for sublist in id_excluded_cams_tot for item in sublist]
cam_excluded_count = dict(Counter(id_excluded_cams_tot))
cam_excluded_count.update((x, y/keypoints_nb/frames_nb) for x, y in cam_excluded_count.items())
for n in range(nb_persons_to_detect):
error_tot[n]['mean'] = error_tot[n].mean(axis = 1)
nb_cams_excluded_tot[n]['mean'] = nb_cams_excluded_tot[n].mean(axis = 1)
error_tot['mean'] = error_tot.mean(axis = 1)
nb_cams_excluded_tot['mean'] = nb_cams_excluded_tot.mean(axis = 1)
# Delete participants with less than 4 valid triangulated frames
# for each person, for each keypoint, frames to interpolate
zero_nan_frames = [np.where( Q_tot[n].iloc[:,::3].T.eq(0) | ~np.isfinite(Q_tot[n].iloc[:,::3].T) ) for n in range(nb_persons_to_detect)]
zero_nan_frames_per_kpt = [[zero_nan_frames[n][1][np.where(zero_nan_frames[n][0]==k)[0]] for k in range(keypoints_nb)] for n in range(nb_persons_to_detect)]
non_nan_nb_first_kpt = [frames_nb - len(zero_nan_frames_per_kpt[n][0]) for n in range(nb_persons_to_detect)]
deleted_person_id = [n for n in range(len(non_nan_nb_first_kpt)) if non_nan_nb_first_kpt[n]<4]
# Optionally, for each keypoint, show indices of frames that should be interpolated
Q_tot = [Q_tot[n] for n in range(len(Q_tot)) if n not in deleted_person_id]
error_tot = [error_tot[n] for n in range(len(error_tot)) if n not in deleted_person_id]
nb_cams_excluded_tot = [nb_cams_excluded_tot[n] for n in range(len(nb_cams_excluded_tot)) if n not in deleted_person_id]
id_excluded_cams_tot = [id_excluded_cams_tot[n] for n in range(len(id_excluded_cams_tot)) if n not in deleted_person_id]
nb_persons_to_detect = len(Q_tot)
# IDs of excluded cameras
# id_excluded_cams_tot = [np.concatenate([id_excluded_cams_tot[f][k] for f in range(frames_nb)]) for k in range(keypoints_nb)]
id_excluded_cams_tot = [np.hstack(np.hstack(np.array(id_excluded_cams_tot[n]))) for n in range(nb_persons_to_detect)]
cam_excluded_count = [dict(Counter(k)) for k in id_excluded_cams_tot]
[cam_excluded_count[n].update((x, y/frames_nb/keypoints_nb) for x, y in cam_excluded_count[n].items()) for n in range(nb_persons_to_detect)]
# Optionally, for each person, for each keypoint, show indices of frames that should be interpolated
if show_interp_indices:
zero_nan_frames = np.where( Q_tot.iloc[:,::3].T.eq(0) | ~np.isfinite(Q_tot.iloc[:,::3].T) )
zero_nan_frames_per_kpt = [zero_nan_frames[1][np.where(zero_nan_frames[0]==k)[0]] for k in range(keypoints_nb)]
gaps = [np.where(np.diff(zero_nan_frames_per_kpt[k]) > 1)[0] + 1 for k in range(keypoints_nb)]
sequences = [np.split(zero_nan_frames_per_kpt[k], gaps[k]) for k in range(keypoints_nb)]
interp_frames = [[f'{seq[0]}:{seq[-1]+1}' for seq in seq_kpt if len(seq)<=interp_gap_smaller_than and len(seq)>0] for seq_kpt in sequences]
non_interp_frames = [[f'{seq[0]}:{seq[-1]+1}' for seq in seq_kpt if len(seq)>interp_gap_smaller_than] for seq_kpt in sequences]
gaps = [[np.where(np.diff(zero_nan_frames_per_kpt[n][k]) > 1)[0] + 1 for k in range(keypoints_nb)] for n in range(nb_persons_to_detect)]
sequences = [[np.split(zero_nan_frames_per_kpt[n][k], gaps[n][k]) for k in range(keypoints_nb)] for n in range(nb_persons_to_detect)]
interp_frames = [[[f'{seq[0]}:{seq[-1]}' for seq in seq_kpt if len(seq)<=interp_gap_smaller_than and len(seq)>0] for seq_kpt in sequences[n]] for n in range(nb_persons_to_detect)]
non_interp_frames = [[[f'{seq[0]}:{seq[-1]}' for seq in seq_kpt if len(seq)>interp_gap_smaller_than] for seq_kpt in sequences[n]] for n in range(nb_persons_to_detect)]
else:
interp_frames = None
non_interp_frames = []
# Interpolate missing values
if interpolation_kind != 'none':
Q_tot = Q_tot.apply(interpolate_zeros_nans, axis=0, args = [interp_gap_smaller_than, interpolation_kind])
for n in range(nb_persons_to_detect):
Q_tot[n].apply(interpolate_zeros_nans, axis=0, args = [interp_gap_smaller_than, interpolation_kind])
# Q_tot.replace(np.nan, 0, inplace=True)
# Create TRC file
trc_path = make_trc(config, Q_tot, keypoints_names, f_range)
trc_paths = [make_trc(config, Q_tot[n], keypoints_names, f_range, id_person=n) for n in range(len(Q_tot))]
# Recap message
recap_triangulate(config, error_tot, nb_cams_excluded_tot, keypoints_names, cam_excluded_count, interp_frames, non_interp_frames, trc_path)
recap_triangulate(config, error_tot, nb_cams_excluded_tot, keypoints_names, cam_excluded_count, interp_frames, non_interp_frames, trc_paths)