#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
###########################################################################
## ROBUST TRIANGULATION OF 2D COORDINATES ##
###########################################################################

This module triangulates 2D json coordinates and builds a .trc file readable
by OpenSim.

The triangulation is weighted by the likelihood of each detected 2D keypoint
(provided it meets the likelihood threshold). If the reprojection error is
above a threshold, right and left sides are swapped; if it is still above,
a camera is removed for this point and this frame, until the threshold is met.
If more cameras are removed than a predefined minimum, triangulation is
skipped for the point and this frame. In the end, missing values are
interpolated.

In case of multiple subjects detection, make sure you first run the
personAssociation module.

INPUTS:
- a calibration file (.toml extension)
- json files for each camera with only one person of interest
- a Config.toml file
- a skeleton model

OUTPUTS:
- a .trc file with 3D coordinates in Y-up system coordinates
'''
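
# Typical use (illustrative sketch, not executed on import): load the configuration into a dict and
# pass it to triangulate_all() defined below. In a full Pose2Sim workflow this call is normally made
# by the package's top-level triangulation step rather than by running this module directly.
#   import toml
#   config = toml.load('Config.toml')  # path assumed; adjust to your project
#   # triangulate_all(config)
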
## INIT
import os
import glob
import fnmatch
import numpy as np
import json
import itertools as it
import pandas as pd
import cv2
import toml
from tqdm import tqdm
from scipy import interpolate
from collections import Counter
from anytree import RenderTree
from anytree.importer import DictImporter
import logging

from Pose2Sim.common import retrieve_calib_params, computeP, weighted_triangulation, \
    reprojection, euclidean_distance, natural_sort
from Pose2Sim.skeletons import *

## AUTHORSHIP INFORMATION
__author__ = "David Pagnon"
__copyright__ = "Copyright 2021, Pose2Sim"
__credits__ = ["David Pagnon"]
__license__ = "BSD 3-Clause License"
__version__ = '0.4'
__maintainer__ = "David Pagnon"
__email__ = "contact@david-pagnon.com"
__status__ = "Development"

## FUNCTIONS
def zup2yup(Q):
    '''
    Turns Z-up system coordinates into Y-up coordinates.

    INPUT:
    - Q: pandas dataframe
    N 3D points as columns, i.e. 3*N columns in Z-up system coordinates
    and frame number as rows

    OUTPUT:
    - Q: pandas dataframe with N 3D points in Y-up system coordinates
    '''

    # X->Y, Y->Z, Z->X
    cols = list(Q.columns)
    cols = np.array([[cols[i*3+1], cols[i*3+2], cols[i*3]] for i in range(int(len(cols)/3))]).flatten()
    Q = Q[cols]

    return Q
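
# Illustrative example (not executed): with columns ['X1', 'Y1', 'Z1', 'X2', 'Y2', 'Z2'],
# zup2yup returns the dataframe with columns reordered to ['Y1', 'Z1', 'X1', 'Y2', 'Z2', 'X2'],
# i.e. each (x, y, z) triple expressed in a Z-up frame is read as (y, z, x) in a Y-up frame.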

def interpolate_zeros_nans(col, *args):
    '''
    Interpolate missing points (of value zero),
    unless more than N contiguous values are missing.

    INPUTS:
    - col: pandas column of coordinates
    - args[0] = N: max number of contiguous bad values, above which they won't be interpolated
    - args[1] = kind: 'linear', 'slinear', 'quadratic', 'cubic'. Default: 'cubic'

    OUTPUT:
    - col_interp: interpolated pandas column
    '''

    if len(args) == 2:
        N, kind = args
    if len(args) == 1:
        N = np.inf
        kind = args[0]
    if not args:
        N = np.inf

    # Interpolate nans
    mask = ~(np.isnan(col) | col.eq(0))  # true where the value is neither nan nor zero
    idx_good = np.where(mask)[0]
    if 'kind' not in locals():  # 'linear', 'slinear', 'quadratic', 'cubic'
        f_interp = interpolate.interp1d(idx_good, col[idx_good], kind='linear', bounds_error=False)
    else:
        f_interp = interpolate.interp1d(idx_good, col[idx_good], kind=kind, fill_value='extrapolate', bounds_error=False)
    col_interp = np.where(mask, col, f_interp(col.index))  # replace bad values with interpolated ones

    # Reintroduce nans if length of sequence > N
    idx_notgood = np.where(~mask)[0]
    gaps = np.where(np.diff(idx_notgood) > 1)[0] + 1  # where the indices of bad values are not contiguous
    sequences = np.split(idx_notgood, gaps)
    if sequences[0].size > 0:
        for seq in sequences:
            if len(seq) > N:  # sequences that are too long are excluded from interpolation
                col_interp[seq] = np.nan

    return col_interp
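
# Illustrative example (not executed), assuming N = 3 and kind = 'linear':
#   col = pd.Series([1., 2., 0., 0., 5., 0., 0., 0., 0., 9.])
#   interpolate_zeros_nans(col, 3, 'linear')
# The two-frame gap is filled linearly (3., 4.), while the four-frame gap, longer than N = 3,
# is set back to nan instead of being interpolated.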

def make_trc(config, Q, keypoints_names, f_range):
    '''
    Make an OpenSim-compatible .trc file from a dataframe with 3D coordinates.

    INPUT:
    - config: dictionary of configuration parameters
    - Q: pandas dataframe with 3D coordinates as columns, frame number as rows
    - keypoints_names: list of strings
    - f_range: list of two numbers. Range of frames

    OUTPUT:
    - trc file
    '''

    # Read config
    project_dir = config.get('project').get('project_dir')
    frame_rate = config.get('project').get('frame_rate')
    seq_name = os.path.basename(os.path.realpath(project_dir))
    pose3d_dir = os.path.join(project_dir, 'pose-3d')

    trc_f = f'{seq_name}_{f_range[0]}-{f_range[1]}.trc'

    # Header
    DataRate = CameraRate = OrigDataRate = frame_rate
    NumFrames = len(Q)
    NumMarkers = len(keypoints_names)
    header_trc = ['PathFileType\t4\t(X/Y/Z)\t' + trc_f,
                  'DataRate\tCameraRate\tNumFrames\tNumMarkers\tUnits\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames',
                  '\t'.join(map(str, [DataRate, CameraRate, NumFrames, NumMarkers, 'm', OrigDataRate, f_range[0], f_range[1]])),
                  'Frame#\tTime\t' + '\t\t\t'.join(keypoints_names) + '\t\t',
                  '\t\t' + '\t'.join([f'X{i+1}\tY{i+1}\tZ{i+1}' for i in range(len(keypoints_names))])]

    # Zup to Yup coordinate system
    Q = zup2yup(Q)

    # Add Frame# and Time columns
    Q.index = np.array(range(0, f_range[1] - f_range[0])) + 1
    Q.insert(0, 't', Q.index / frame_rate)

    # Write file
    if not os.path.exists(pose3d_dir): os.mkdir(pose3d_dir)
    trc_path = os.path.realpath(os.path.join(pose3d_dir, trc_f))
    with open(trc_path, 'w') as trc_o:
        [trc_o.write(line + '\n') for line in header_trc]
        Q.to_csv(trc_o, sep='\t', index=True, header=None, lineterminator='\n')

    return trc_path
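
# Note: the resulting file is named '<sequence_name>_<start_frame>-<end_frame>.trc' and written to the
# 'pose-3d' folder, with coordinates in meters in a Y-up frame and one X/Y/Z column triple per marker,
# as expected by OpenSim.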

def recap_triangulate(config, error, nb_cams_excluded, keypoints_names, cam_excluded_count, interp_frames, non_interp_frames, trc_path):
    '''
    Print a message giving statistics on reprojection errors (in pixels and in m)
    as well as the number of cameras that had to be excluded to reach threshold
    conditions. Also stored in User/logs.txt.

    INPUT:
    - a Config.toml file
    - error: dataframe
    - nb_cams_excluded: dataframe
    - keypoints_names: list of strings

    OUTPUT:
    - Message in console
    '''

    # Read config
    project_dir = config.get('project').get('project_dir')
    session_dir = os.path.realpath(os.path.join(project_dir, '..', '..'))
    calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if ('Calib' in c or 'calib' in c)][0]
    calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0]  # lastly created calibration file
    calib = toml.load(calib_file)

    cam_names = np.array([calib[c].get('name') for c in list(calib.keys())])
    cam_names = cam_names[list(cam_excluded_count.keys())]
    error_threshold_triangulation = config.get('triangulation').get('reproj_error_threshold_triangulation')
    likelihood_threshold = config.get('triangulation').get('likelihood_threshold_triangulation')
    show_interp_indices = config.get('triangulation').get('show_interp_indices')
    interpolation_kind = config.get('triangulation').get('interpolation')
    handle_LR_swap = config.get('triangulation').get('handle_LR_swap')
    undistort_points = config.get('triangulation').get('undistort_points')

    # Recap
    calib_cam1 = calib[list(calib.keys())[0]]
    fm = calib_cam1['matrix'][0][0]
    Dm = euclidean_distance(calib_cam1['translation'], [0, 0, 0])
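
    # Rough pixel-to-meter conversion: errors in px are scaled by Dm/fm, i.e. the distance of the first
    # camera from the world origin divided by its focal length in px. This gives an order of magnitude
    # for the metric error rather than an exact per-point value.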
    logging.info('')
    for idx, name in enumerate(keypoints_names):
        mean_error_keypoint_px = np.around(error.iloc[:, idx].mean(), decimals=1)  # or use RMS instead?
        mean_error_keypoint_m = np.around(mean_error_keypoint_px * Dm / fm, decimals=3)
        mean_cam_excluded_keypoint = np.around(nb_cams_excluded.iloc[:, idx].mean(), decimals=2)
        logging.info(f'Mean reprojection error for {name} is {mean_error_keypoint_px} px (~ {mean_error_keypoint_m} m), reached with {mean_cam_excluded_keypoint} excluded cameras.')
        if show_interp_indices:
            if interpolation_kind != 'none':
                if len(list(interp_frames[idx])) == 0:
                    logging.info(f'No frames needed to be interpolated.')
                else:
                    interp_str = str(interp_frames[idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
                    logging.info(f'Frames {interp_str} were interpolated.')
                if len(list(non_interp_frames[idx])) > 0:
                    noninterp_str = str(non_interp_frames[idx]).replace(":", " to ").replace("'", "").replace("]", "").replace("[", "")
                    logging.info(f'Frames {noninterp_str} could not be interpolated: consider adjusting thresholds.')
            else:
                logging.info(f'No frames were interpolated because \'interpolation_kind\' was set to none.')

    mean_error_px = np.around(error['mean'].mean(), decimals=1)
    mean_error_mm = np.around(mean_error_px * Dm / fm * 1000, decimals=1)
    mean_cam_excluded = np.around(nb_cams_excluded['mean'].mean(), decimals=2)
    logging.info(f'\n--> Mean reprojection error for all points on all frames is {mean_error_px} px, which roughly corresponds to {mean_error_mm} mm.')
    logging.info(f'Cameras were excluded if likelihood was below {likelihood_threshold} and if the reprojection error was above {error_threshold_triangulation} px.')
    logging.info(f'On average, {mean_cam_excluded} cameras had to be excluded to reach these thresholds.')
    cam_excluded_count = {i: v for i, v in zip(cam_names, cam_excluded_count.values())}
    str_cam_excluded_count = ''
    for i, (k, v) in enumerate(cam_excluded_count.items()):
        if i == 0:
            str_cam_excluded_count += f'Camera {k} was excluded {int(np.round(v * 100))}% of the time, '
        elif i == len(cam_excluded_count) - 1:
            str_cam_excluded_count += f'and Camera {k}: {int(np.round(v * 100))}%.'
        else:
            str_cam_excluded_count += f'Camera {k}: {int(np.round(v * 100))}%, '
    logging.info(str_cam_excluded_count)
    logging.info(f'Limb swapping was {"handled" if handle_LR_swap else "not handled"}.')
    logging.info(f'Lens distortions were {"taken into account" if undistort_points else "not taken into account"}.')

    logging.info(f'\n3D coordinates are stored at {trc_path}.')

def triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, projection_matrices, calib_params):
    '''
    Triangulates 2D keypoint coordinates. If the reprojection error is above a threshold,
    tries swapping left and right sides. If it is still above, removes one camera after another
    until the error is below the threshold, unless the number of remaining cameras falls below
    a predefined minimum.

    1. Creates a subset with N cameras excluded
    2. Tries all possible triangulations
    3. Chooses the one with the smallest reprojection error
    If the error is still too large, takes off one more camera.
    If it is then below the threshold, retains the result.
    If it is better but still too large, takes off one more camera.

    INPUTS:
    - a Config.toml file
    - coords_2D_kpt: (x, y, likelihood) * ncams array
    - coords_2D_kpt_swapped: (x, y, likelihood) * ncams array with left/right swapped
    - projection_matrices: list of arrays

    OUTPUTS:
    - Q: array of triangulated point (x, y, z, 1.)
    - error_min: float
    - nb_cams_excluded: int
    '''

    # Read config
    error_threshold_triangulation = config.get('triangulation').get('reproj_error_threshold_triangulation')
    min_cameras_for_triangulation = config.get('triangulation').get('min_cameras_for_triangulation')
    handle_LR_swap = config.get('triangulation').get('handle_LR_swap')

    undistort_points = config.get('triangulation').get('undistort_points')
    if undistort_points:
        calib_params_K = calib_params['K']
        calib_params_dist = calib_params['dist']
        calib_params_R = calib_params['R']
        calib_params_T = calib_params['T']

    # Initialize
    x_files, y_files, likelihood_files = coords_2D_kpt
    x_files_swapped, y_files_swapped, likelihood_files_swapped = coords_2D_kpt_swapped
    n_cams = len(x_files)
    error_min = np.inf
    nb_cams_off = 0  # cameras will be taken off until the reprojection error is under the threshold
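
    # Cameras are excluded by exhaustive combinations: with 4 cameras and nb_cams_off == 1,
    # id_cams_off is [[0], [1], [2], [3]]; with nb_cams_off == 2 it holds all 6 pairs, and so on.
    # Each candidate subset is triangulated below and the one with the lowest reprojection error is kept.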
    while error_min > error_threshold_triangulation and n_cams - nb_cams_off >= min_cameras_for_triangulation:
        # Create subsets with "nb_cams_off" cameras excluded
        id_cams_off = np.array(list(it.combinations(range(n_cams), nb_cams_off)))

        if undistort_points:
            calib_params_K_filt = [calib_params_K] * len(id_cams_off)
            calib_params_dist_filt = [calib_params_dist] * len(id_cams_off)
            calib_params_R_filt = [calib_params_R] * len(id_cams_off)
            calib_params_T_filt = [calib_params_T] * len(id_cams_off)
        projection_matrices_filt = [projection_matrices] * len(id_cams_off)

        x_files_filt = np.vstack([x_files.copy()] * len(id_cams_off))
        y_files_filt = np.vstack([y_files.copy()] * len(id_cams_off))
        x_files_swapped_filt = np.vstack([x_files_swapped.copy()] * len(id_cams_off))
        y_files_swapped_filt = np.vstack([y_files_swapped.copy()] * len(id_cams_off))
        likelihood_files_filt = np.vstack([likelihood_files.copy()] * len(id_cams_off))

        if nb_cams_off > 0:
            for i in range(len(id_cams_off)):
                x_files_filt[i][id_cams_off[i]] = np.nan
                y_files_filt[i][id_cams_off[i]] = np.nan
                x_files_swapped_filt[i][id_cams_off[i]] = np.nan
                y_files_swapped_filt[i][id_cams_off[i]] = np.nan
                likelihood_files_filt[i][id_cams_off[i]] = np.nan
        nb_cams_excluded_filt = [np.count_nonzero(np.nan_to_num(x) == 0) for x in likelihood_files_filt]  # count nans and zeros

        if undistort_points:
            calib_params_K_filt = [[c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) and not x_files_filt[j][i] == 0.] for j, c in enumerate(calib_params_K_filt)]
            calib_params_dist_filt = [[c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) and not x_files_filt[j][i] == 0.] for j, c in enumerate(calib_params_dist_filt)]
            calib_params_R_filt = [[c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) and not x_files_filt[j][i] == 0.] for j, c in enumerate(calib_params_R_filt)]
            calib_params_T_filt = [[c[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) and not x_files_filt[j][i] == 0.] for j, c in enumerate(calib_params_T_filt)]
        projection_matrices_filt = [[p[i] for i in range(n_cams) if not np.isnan(x_files_filt[j][i]) and not x_files_filt[j][i] == 0.] for j, p in enumerate(projection_matrices_filt)]

        print('\nnb_cams_off', repr(nb_cams_off), 'nb_cams_excluded', repr(nb_cams_excluded_filt))
        nb_cams_off_tot = nb_cams_excluded_filt[0]
        id_cams_off_tot = [np.argwhere(np.isnan(x)).ravel() for x in x_files_filt]
        print('x_files', repr(x_files))
        print('x_files_filt', repr(x_files_filt))
        print('id_cams_off_tot', id_cams_off_tot)

        x_files_filt = [np.array([xx for ii, xx in enumerate(x) if not np.isnan(xx) and not xx == 0.]) for x in x_files_filt]
        y_files_filt = [np.array([xx for ii, xx in enumerate(x) if not np.isnan(xx) and not xx == 0.]) for x in y_files_filt]
        x_files_swapped_filt = [np.array([xx for ii, xx in enumerate(x) if not np.isnan(xx) and not xx == 0.]) for x in x_files_swapped_filt]
        y_files_swapped_filt = [np.array([xx for ii, xx in enumerate(x) if not np.isnan(xx) and not xx == 0.]) for x in y_files_swapped_filt]
        likelihood_files_filt = [np.array([xx for ii, xx in enumerate(x) if not np.isnan(xx) and not xx == 0.]) for x in likelihood_files_filt]
        print('x_files_filt', repr(x_files_filt))
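
        # weighted_triangulation (from Pose2Sim.common) is expected to solve a DLT-style system in which
        # each camera's equations are scaled by the keypoint likelihood, so low-confidence detections
        # contribute less to the triangulated point.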
        # Triangulate 2D points
        Q_filt = [weighted_triangulation(projection_matrices_filt[i], x_files_filt[i], y_files_filt[i], likelihood_files_filt[i]) for i in range(len(id_cams_off))]

        # Reprojection
        if undistort_points:
            coords_2D_kpt_calc_filt = [np.array([cv2.projectPoints(np.array(Q_filt[i][:-1]), calib_params_R_filt[i][j], calib_params_T_filt[i][j], calib_params_K_filt[i][j], calib_params_dist_filt[i][j])[0].ravel()
                                                 for j in range(n_cams - nb_cams_excluded_filt[i])])
                                       for i in range(len(id_cams_off))]
            coords_2D_kpt_calc_filt = [[coords_2D_kpt_calc_filt[i][:, 0], coords_2D_kpt_calc_filt[i][:, 1]] for i in range(len(id_cams_off))]
        else:
            coords_2D_kpt_calc_filt = [reprojection(projection_matrices_filt[i], Q_filt[i]) for i in range(len(id_cams_off))]
        coords_2D_kpt_calc_filt = np.array(coords_2D_kpt_calc_filt, dtype=object)
        x_calc_filt = coords_2D_kpt_calc_filt[:, 0]
        print('x_calc_filt', x_calc_filt)
        y_calc_filt = coords_2D_kpt_calc_filt[:, 1]

        # Reprojection error
        error = []
        for config_off_id in range(len(x_calc_filt)):
            q_file = [(x_files_filt[config_off_id][i], y_files_filt[config_off_id][i]) for i in range(len(x_files_filt[config_off_id]))]
            q_calc = [(x_calc_filt[config_off_id][i], y_calc_filt[config_off_id][i]) for i in range(len(x_calc_filt[config_off_id]))]
            error.append(np.mean([euclidean_distance(q_file[i], q_calc[i]) for i in range(len(q_file))]))
        print('error', error)

        # Choosing best triangulation (with min reprojection error)
        error_min = min(error)
        best_cams = np.argmin(error)
        nb_cams_excluded = nb_cams_excluded_filt[best_cams]
        Q = Q_filt[best_cams][:-1]

        # Swap left and right sides if the reprojection error is still too high
        if handle_LR_swap and error_min > error_threshold_triangulation:
            n_cams_swapped = 1
            error_off_swap_min = error_min
            while error_off_swap_min > error_threshold_triangulation and n_cams_swapped < (n_cams - nb_cams_off_tot) / 2:  # if more than half of the cameras are swapped, the same side may be triangulated twice
                print('nb_cams_off', nb_cams_off, 'n_cams_swapped', n_cams_swapped, 'nb_cams_off_tot', nb_cams_off_tot)
                # Create subsets
                id_cams_swapped = np.array(list(it.combinations(range(n_cams - nb_cams_off_tot), n_cams_swapped)))
                print('id_cams_swapped', id_cams_swapped)
                x_files_filt_off_swap = np.array([[x] * len(id_cams_swapped) for x in x_files_filt])
                y_files_filt_off_swap = np.array([[y] * len(id_cams_swapped) for y in y_files_filt])
                for id_off in range(len(id_cams_off)):  # for each configuration with nb_cams_off_tot cameras removed
                    for id_swapped, config_swapped in enumerate(id_cams_swapped):  # for each of these configurations, test all subconfigurations with n_cams_swapped cameras swapped
                        x_files_filt_off_swap[id_off][id_swapped, config_swapped] = x_files_swapped_filt[id_off][config_swapped]
                        y_files_filt_off_swap[id_off][id_swapped, config_swapped] = y_files_swapped_filt[id_off][config_swapped]

                # Triangulate 2D points
                Q_filt_off_swap = np.array([[weighted_triangulation(projection_matrices_filt[id_off], x_files_filt_off_swap[id_off, id_swapped], y_files_filt_off_swap[id_off, id_swapped], likelihood_files_filt[id_off])
                                             for id_swapped in range(len(id_cams_swapped))]
                                            for id_off in range(len(id_cams_off))])

                # Reprojection
                if undistort_points:
                    coords_2D_kpt_calc_off_swap = [np.array([[cv2.projectPoints(np.array(Q_filt_off_swap[id_off][id_swapped][:-1]), calib_params_R_filt[id_off][j], calib_params_T_filt[id_off][j], calib_params_K_filt[id_off][j], calib_params_dist_filt[id_off][j])[0].ravel()
                                                              for j in range(n_cams - nb_cams_off_tot)]
                                                             for id_swapped in range(len(id_cams_swapped))])
                                                   for id_off in range(len(id_cams_off))]
                    coords_2D_kpt_calc_off_swap = np.array([[[coords_2D_kpt_calc_off_swap[id_off][id_swapped, :, 0], coords_2D_kpt_calc_off_swap[id_off][id_swapped, :, 1]]
                                                             for id_swapped in range(len(id_cams_swapped))]
                                                            for id_off in range(len(id_cams_off))])
                else:
                    coords_2D_kpt_calc_off_swap = np.array([[reprojection(projection_matrices_filt[id_off], Q_filt_off_swap[id_off][id_swapped])
                                                             for id_swapped in range(len(id_cams_swapped))]
                                                            for id_off in range(len(id_cams_off))])
                x_calc_off_swap = coords_2D_kpt_calc_off_swap[:, :, 0]
                y_calc_off_swap = coords_2D_kpt_calc_off_swap[:, :, 1]

                # Reprojection error
                print('x_files_filt_off_swap', x_files_filt_off_swap)
                error_off_swap = []
                for id_off in range(len(id_cams_off)):
                    error_percam = []
                    for id_swapped, config_swapped in enumerate(id_cams_swapped):
                        # print(id_off, id_swapped, n_cams, nb_cams_off)
                        # print(repr(x_files_filt_off_swap))
                        q_file_off_swap = [(x_files_filt_off_swap[id_off, id_swapped, i], y_files_filt_off_swap[id_off, id_swapped, i]) for i in range(n_cams - nb_cams_off_tot)]
                        q_calc_off_swap = [(x_calc_off_swap[id_off, id_swapped, i], y_calc_off_swap[id_off, id_swapped, i]) for i in range(n_cams - nb_cams_off_tot)]
                        error_percam.append(np.mean([euclidean_distance(q_file_off_swap[i], q_calc_off_swap[i]) for i in range(len(q_file_off_swap))]))
                    error_off_swap.append(error_percam)
                error_off_swap = np.array(error_off_swap)
                print('error_off_swap', error_off_swap)

                # Choosing best triangulation (with min reprojection error)
                error_off_swap_min = np.min(error_off_swap)
                best_off_swap_config = np.unravel_index(error_off_swap.argmin(), error_off_swap.shape)
                id_off_cams = best_off_swap_config[0]
                id_swapped_cams = id_cams_swapped[best_off_swap_config[1]]
                Q_best = Q_filt_off_swap[best_off_swap_config][:-1]

                n_cams_swapped += 1

            if error_off_swap_min < error_min:
                error_min = error_off_swap_min
                best_cams = id_off_cams
                Q = Q_best

        nb_cams_off += 1

    # Index of excluded cams for this keypoint
    id_excluded_cams = id_cams_off_tot[best_cams]

    # If the triangulation was not successful, error is nan and 3D coordinates are missing values
    if error_min > error_threshold_triangulation:
        error_min = np.nan
        Q = np.array([np.nan, np.nan, np.nan])

    return Q, error_min, nb_cams_excluded, id_excluded_cams
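
# Illustrative return values (assuming 4 cameras, one of which had to be dropped to reach the error
# threshold): Q is the triangulated point [x, y, z] in calibration units (typically meters), error_min
# is the mean reprojection error in px, nb_cams_excluded == 1, and id_excluded_cams holds the index of
# the dropped camera.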

def extract_files_frame_f(json_tracked_files_f, keypoints_ids):
    '''
    Extract data from json files for frame f,
    in the order of the body model hierarchy.

    INPUTS:
    - json_tracked_files_f: list of str. Paths of json files for frame f.
    - keypoints_ids: list of int. Keypoint IDs in the order of the hierarchy.

    OUTPUTS:
    - x_files, y_files, likelihood_files: arrays:
    n_cams lists of n_keypoints lists of coordinates.
    '''

    n_cams = len(json_tracked_files_f)

    x_files, y_files, likelihood_files = [], [], []
    for cam_nb in range(n_cams):
        x_files_cam, y_files_cam, likelihood_files_cam = [], [], []
        with open(json_tracked_files_f[cam_nb], 'r') as json_f:
            js = json.load(json_f)
            for keypoint_id in keypoints_ids:
                try:
                    x_files_cam.append(js['people'][0]['pose_keypoints_2d'][keypoint_id * 3])
                    y_files_cam.append(js['people'][0]['pose_keypoints_2d'][keypoint_id * 3 + 1])
                    likelihood_files_cam.append(js['people'][0]['pose_keypoints_2d'][keypoint_id * 3 + 2])
                except:
                    x_files_cam.append(np.nan)
                    y_files_cam.append(np.nan)
                    likelihood_files_cam.append(np.nan)
        x_files.append(x_files_cam)
        y_files.append(y_files_cam)
        likelihood_files.append(likelihood_files_cam)

    x_files = np.array(x_files)
    y_files = np.array(y_files)
    likelihood_files = np.array(likelihood_files)

    return x_files, y_files, likelihood_files
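
# Note: x_files, y_files and likelihood_files have shape (n_cams, n_keypoints); if a camera's json file
# contains no detected person for this frame, its entries are filled with nan.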

def triangulate_all(config):
    '''
    For each frame
    For each keypoint
    - Triangulate keypoint
    - Reproject it on all cameras
    - Take off cameras until requirements are met
    Interpolate missing values
    Create trc file
    Print recap message

    INPUTS:
    - a calibration file (.toml extension)
    - json files for each camera with only one person of interest
    - a Config.toml file
    - a skeleton model

    OUTPUTS:
    - a .trc file with 3D coordinates in Y-up system coordinates
    '''

    # Read config
    project_dir = config.get('project').get('project_dir')
    session_dir = os.path.realpath(os.path.join(project_dir, '..', '..'))
    pose_model = config.get('pose').get('pose_model')
    frame_range = config.get('project').get('frame_range')
    likelihood_threshold = config.get('triangulation').get('likelihood_threshold_triangulation')
    interpolation_kind = config.get('triangulation').get('interpolation')
    interp_gap_smaller_than = config.get('triangulation').get('interp_if_gap_smaller_than')
    show_interp_indices = config.get('triangulation').get('show_interp_indices')
    undistort_points = config.get('triangulation').get('undistort_points')

    calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if ('Calib' in c or 'calib' in c)][0]
    calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0]  # lastly created calibration file
    pose_dir = os.path.join(project_dir, 'pose')
    poseTracked_dir = os.path.join(project_dir, 'pose-associated')

    # Projection matrix from toml calibration file
    P = computeP(calib_file, undistort=undistort_points)
    calib_params = retrieve_calib_params(calib_file)

    # Retrieve keypoints from model
    try:  # from skeletons.py
        model = eval(pose_model)
    except:
        try:  # from Config.toml
            model = DictImporter().import_(config.get('pose').get(pose_model))
            if model.id == 'None':
                model.id = None
        except:
            raise NameError('Model not found in skeletons.py nor in Config.toml')
    keypoints_ids = [node.id for _, _, node in RenderTree(model) if node.id != None]
    keypoints_names = [node.name for _, _, node in RenderTree(model) if node.id != None]
    keypoints_idx = list(range(len(keypoints_ids)))
    keypoints_nb = len(keypoints_ids)

    # Left/right swapped keypoints
    keypoints_names_swapped = [keypoint_name.replace('R', 'L') if keypoint_name.startswith('R') else keypoint_name.replace('L', 'R') if keypoint_name.startswith('L') else keypoint_name for keypoint_name in keypoints_names]
    keypoints_names_swapped = [keypoint_name_swapped.replace('right', 'left') if keypoint_name_swapped.startswith('right') else keypoint_name_swapped.replace('left', 'right') if keypoint_name_swapped.startswith('left') else keypoint_name_swapped for keypoint_name_swapped in keypoints_names_swapped]
    keypoints_idx_swapped = [keypoints_names.index(keypoint_name_swapped) for keypoint_name_swapped in keypoints_names_swapped]  # find the index of the swapped keypoint name
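
    # Illustrative example: for 'RKnee', keypoints_names_swapped holds 'LKnee', so keypoints_idx_swapped
    # at the index of 'RKnee' gives the index of 'LKnee' (and vice versa); keypoints without an R/L or
    # right/left prefix map to themselves.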

    # 2d-pose files selection
    pose_listdirs_names = next(os.walk(pose_dir))[1]
    pose_listdirs_names = natural_sort(pose_listdirs_names)
    json_dirs_names = [k for k in pose_listdirs_names if 'json' in k]
    try:
        json_files_names = [fnmatch.filter(os.listdir(os.path.join(poseTracked_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
        json_files_names = [natural_sort(j) for j in json_files_names]
        json_tracked_files = [[os.path.join(poseTracked_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]
    except:
        json_files_names = [fnmatch.filter(os.listdir(os.path.join(pose_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
        json_files_names = [natural_sort(j) for j in json_files_names]
        json_tracked_files = [[os.path.join(pose_dir, j_dir, j_file) for j_file in json_files_names[j]] for j, j_dir in enumerate(json_dirs_names)]

    # Triangulation
    f_range = [[0, min([len(j) for j in json_files_names])] if frame_range == [] else frame_range][0]
    frames_nb = f_range[1] - f_range[0]
    n_cams = len(json_dirs_names)
    Q_tot, error_tot, nb_cams_excluded_tot, id_excluded_cams_tot = [], [], [], []

    for f in tqdm(range(*f_range)):
        # Get x, y, likelihood values from files
        json_tracked_files_f = [json_tracked_files[c][f] for c in range(n_cams)]
        print(json_tracked_files_f)
        x_files, y_files, likelihood_files = extract_files_frame_f(json_tracked_files_f, keypoints_ids)

        # Undistort points
        if undistort_points:
            points = [np.array(tuple(zip(x_files[i], y_files[i]))).reshape(-1, 1, 2).astype('float32') for i in range(n_cams)]
            undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
            x_files = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points])
            y_files = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points])
            # This is fine for slight distortion. For fisheye cameras, the model does not work anymore.
            # See https://github.com/lambdaloop/aniposelib/blob/d03b485c4e178d7cff076e9fe1ac36837db49158/aniposelib/cameras.py#L301 for an example.

        # Replace likelihood by 0 if under likelihood_threshold
        with np.errstate(invalid='ignore'):
            x_files[likelihood_files < likelihood_threshold] = 0.
            y_files[likelihood_files < likelihood_threshold] = 0.
            likelihood_files[likelihood_files < likelihood_threshold] = 0.
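
        # Each keypoint is triangulated independently; coordinates zeroed above (low likelihood) or set
        # to nan are treated as missing by triangulation_from_best_cameras, which may then drop cameras.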
        Q, error, nb_cams_excluded, id_excluded_cams = [], [], [], []
        for keypoint_idx in keypoints_idx:
            # Triangulate cameras with min reprojection error
            coords_2D_kpt = np.array((x_files[:, keypoint_idx], y_files[:, keypoint_idx], likelihood_files[:, keypoint_idx]))
            coords_2D_kpt_swapped = np.array((x_files[:, keypoints_idx_swapped[keypoint_idx]], y_files[:, keypoints_idx_swapped[keypoint_idx]], likelihood_files[:, keypoints_idx_swapped[keypoint_idx]]))
            Q_kpt, error_kpt, nb_cams_excluded_kpt, id_excluded_cams_kpt = triangulation_from_best_cameras(config, coords_2D_kpt, coords_2D_kpt_swapped, P, calib_params)  # P has been modified if undistort_points=True
            Q.append(Q_kpt)
            error.append(error_kpt)
            nb_cams_excluded.append(nb_cams_excluded_kpt)
            id_excluded_cams.append(id_excluded_cams_kpt)

        # Add triangulated points, errors and excluded cameras to pandas dataframes
        Q_tot.append(np.concatenate(Q))
        error_tot.append(error)
        nb_cams_excluded_tot.append(nb_cams_excluded)
        id_excluded_cams = [item for sublist in id_excluded_cams for item in sublist]
        id_excluded_cams_tot.append(id_excluded_cams)

    Q_tot = pd.DataFrame(Q_tot)
    error_tot = pd.DataFrame(error_tot)
    nb_cams_excluded_tot = pd.DataFrame(nb_cams_excluded_tot)

    id_excluded_cams_tot = [item for sublist in id_excluded_cams_tot for item in sublist]
    cam_excluded_count = dict(Counter(id_excluded_cams_tot))
    cam_excluded_count.update((x, y / keypoints_nb / frames_nb) for x, y in cam_excluded_count.items())
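
    # cam_excluded_count now maps each excluded camera index to the fraction of keypoint-frames for
    # which it was excluded (raw counts divided by keypoints_nb * frames_nb).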

    error_tot['mean'] = error_tot.mean(axis=1)
    nb_cams_excluded_tot['mean'] = nb_cams_excluded_tot.mean(axis=1)

    # Optionally, for each keypoint, show indices of frames that should be interpolated
    if show_interp_indices:
        zero_nan_frames = np.where(Q_tot.iloc[:, ::3].T.eq(0) | ~np.isfinite(Q_tot.iloc[:, ::3].T))
        zero_nan_frames_per_kpt = [zero_nan_frames[1][np.where(zero_nan_frames[0] == k)[0]] for k in range(keypoints_nb)]
        gaps = [np.where(np.diff(zero_nan_frames_per_kpt[k]) > 1)[0] + 1 for k in range(keypoints_nb)]
        sequences = [np.split(zero_nan_frames_per_kpt[k], gaps[k]) for k in range(keypoints_nb)]
        interp_frames = [[f'{seq[0]}:{seq[-1]+1}' for seq in seq_kpt if len(seq) <= interp_gap_smaller_than and len(seq) > 0] for seq_kpt in sequences]
        non_interp_frames = [[f'{seq[0]}:{seq[-1]+1}' for seq in seq_kpt if len(seq) > interp_gap_smaller_than] for seq_kpt in sequences]
    else:
        interp_frames = None
        non_interp_frames = []

    # Interpolate missing values
    if interpolation_kind != 'none':
        Q_tot = Q_tot.apply(interpolate_zeros_nans, axis=0, args=[interp_gap_smaller_than, interpolation_kind])
    Q_tot.replace(np.nan, 0, inplace=True)

    # Create TRC file
    trc_path = make_trc(config, Q_tot, keypoints_names, f_range)

    # Recap message
    recap_triangulate(config, error_tot, nb_cams_excluded_tot, keypoints_names, cam_excluded_count, interp_frames, non_interp_frames, trc_path)