2023-07-19 17:37:20 +08:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
###########################################################################
|
|
|
|
## POSE2SIM ##
|
|
|
|
###########################################################################
|
|
|
|
|
|
|
|
This repository offers a way to perform markerless kinematics, and gives an
|
|
|
|
example workflow from an Openpose input to an OpenSim result.
|
|
|
|
|
|
|
|
It offers tools for:
|
|
|
|
- 2D pose estimation,
|
|
|
|
- Camera calibration,
|
|
|
|
- Tracking the person of interest,
|
|
|
|
- Robust triangulation,
|
|
|
|
- Filtration.
|
|
|
|
|
|
|
|
It has been tested on Windows 10 but should work similarly on Linux.
|
|
|
|
Please subscribe to this issue if you wish to be notified of the code release.
|
|
|
|
See https://github.com/perfanalytics/pose2sim
|
|
|
|
|
|
|
|
Installation:
|
|
|
|
# Open Anaconda prompt. Type:
|
|
|
|
# - conda create -n Pose2Sim python=3.7 tensorflow-gpu=1.13.1
|
|
|
|
# - conda activate Pose2Sim
|
|
|
|
# - conda install Pose2Sim
|
|
|
|
|
|
|
|
Usage:
|
2023-08-19 14:59:34 +08:00
|
|
|
# First run Pose estimation and organize your directories (see Readme.md)
|
2023-07-19 17:37:20 +08:00
|
|
|
from Pose2Sim import Pose2Sim
|
2023-08-19 14:59:34 +08:00
|
|
|
Pose2Sim.calibration()
|
|
|
|
Pose2Sim.personAssociation()
|
|
|
|
Pose2Sim.triangulation()
|
|
|
|
Pose2Sim.filtering()
|
|
|
|
# Then run OpenSim (see Readme.md)
|
2023-07-19 17:37:20 +08:00
|
|
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
|
|
## INIT
|
|
|
|
import toml
|
|
|
|
import os
|
|
|
|
import time
|
2023-12-08 00:08:34 +08:00
|
|
|
from copy import deepcopy
|
2023-07-19 17:37:20 +08:00
|
|
|
import logging, logging.handlers
|
|
|
|
|
|
|
|
|
|
|
|
## AUTHORSHIP INFORMATION
|
|
|
|
__author__ = "David Pagnon"
|
|
|
|
__copyright__ = "Copyright 2021, Pose2Sim"
|
|
|
|
__credits__ = ["David Pagnon"]
|
|
|
|
__license__ = "BSD 3-Clause License"
|
|
|
|
__version__ = "0.4"
|
|
|
|
__maintainer__ = "David Pagnon"
|
|
|
|
__email__ = "contact@david-pagnon.com"
|
|
|
|
__status__ = "Development"
|
|
|
|
|
|
|
|
|
|
|
|
## FUNCTIONS
|
2023-12-09 19:53:43 +08:00
|
|
|
def setup_logging(session_dir):
    '''
    Make sure the session log file exists, then configure the root logger
    so that bare messages (no level, no timestamp) are sent both to this
    file and to the console.

    INPUT:
    - session_dir: folder in which 'logs.txt' is created or appended to

    NOTE: logging.basicConfig does nothing if the root logger already has
    handlers, so only the first call of a Python session picks the log file.
    '''

    log_path = os.path.join(session_dir, 'logs.txt')

    # Opening in append mode creates the file without touching existing content
    with open(log_path, 'a+'):
        pass

    # The log file is rotated every 7 days; the console receives the same messages
    file_handler = logging.handlers.TimedRotatingFileHandler(log_path, when='D', interval=7)
    console_handler = logging.StreamHandler()
    logging.basicConfig(format='%(message)s', level=logging.INFO,
                        handlers=[file_handler, console_handler])
|
|
|
|
|
|
|
|
|
2023-12-08 00:08:34 +08:00
|
|
|
def recursive_update(dict_to_update, dict_with_new_values):
    '''
    Merge dict_with_new_values into dict_to_update, level by level.

    When a key holds a dictionary on both sides, the two dictionaries are
    merged recursively, so keys that are absent from the new values are kept.
    In every other case the new value replaces (or creates) the entry.

    dict_to_update is modified in place and is also returned.

    Example:
    dict_to_update = {'key': {'key_1': 'val_1', 'key_2': 'val_2'}}
    dict_with_new_values = {'key': {'key_1': 'val_1_new'}}
    returns {'key': {'key_1': 'val_1_new', 'key_2': 'val_2'}}
    whereas dict_to_update.update(dict_with_new_values) would leave
    {'key': {'key_1': 'val_1_new'}}
    '''

    for name, incoming in dict_with_new_values.items():
        current = dict_to_update.get(name)
        both_are_dicts = isinstance(incoming, dict) and isinstance(current, dict)
        if not both_are_dicts:
            # Plain value, new key, or type mismatch: the new value wins
            dict_to_update[name] = incoming
            continue
        # Dictionaries on both sides: descend one level
        dict_to_update[name] = recursive_update(current, incoming)

    return dict_to_update
|
|
|
|
|
|
|
|
|
2023-12-10 05:06:57 +08:00
|
|
|
def determine_level(config_dir):
    '''
    Determine the level at which the function is called.
    Level = 1: Trial folder
    Level = 2: Participant folder
    Level = 3: Session folder

    The level is the number of distinct folder depths spanned by the
    Config.toml files found in config_dir and its subfolders:
    deepest depth - shallowest depth + 1.

    INPUT:
    - config_dir: folder to inspect

    OUTPUT:
    - level: int (can be larger than 3 if Config.toml files are nested deeper)

    RAISES:
    - ValueError if no Config.toml file is found in config_dir or below
    '''

    # Depth of each folder that holds a Config.toml, counted in path components
    depths = [len(root.split(os.sep))
              for root, _, files in os.walk(config_dir)
              if 'Config.toml' in files]

    # Without this check, max() fails with an unrelated "empty sequence" message
    if not depths:
        raise ValueError(f"No Config.toml file found in '{config_dir}' or its subfolders. "
                         "Please run Pose2Sim from a Session, Participant, or Trial folder.")

    return max(depths) - min(depths) + 1
|
|
|
|
|
|
|
|
|
|
|
|
def _merge_config_levels(session_config_dict, participant_config_dict, trial_config_dict, trial_dir):
    '''
    Build the configuration of one trial: session values, overridden by
    participant values, overridden by trial values, with project_dir set to trial_dir.
    '''

    # Deep copies, otherwise nested tables of the session or participant dictionaries
    # would be shared between trials and modified by each subsequent merge
    merged = deepcopy(session_config_dict)
    merged = recursive_update(merged, deepcopy(participant_config_dict))
    merged = recursive_update(merged, trial_config_dict)
    merged.get("project").update({"project_dir":trial_dir})
    return merged


def read_config_files(config):
    '''
    Read Session, Participant, and Trial configuration files,
    and output a dictionary with all the parameters.

    INPUT:
    - config: either a configuration dictionary (it must contain a
      project_dir entry in its 'project' table),
      or the path of a trial, participant, or session folder,
      or None to use the current directory

    OUTPUT:
    - level: 3 for a dictionary, otherwise the value returned by determine_level
    - config_dicts: list of one merged configuration dictionary per trial.
      Trials listed in the 'exclude_from_batch' entry of the 'project' table are left out

    RAISES:
    - ValueError if a dictionary has no project_dir,
      or if the folder structure is deeper than session/participant/trial
    '''

    # A dictionary is used as is
    if isinstance(config, dict):
        level = 3 # log_dir = os.getcwd()
        if config.get('project').get('project_dir') is None:
            raise ValueError('Please specify the project directory in config_dict:\n '
                             'config_dict.get("project").update({"project_dir":"<YOUR_PROJECT_DIRECTORY>"})')
        return level, [config]

    # if launched without an argument, config is None, else it is the path to the config directory
    config_dir = '.' if config is None else os.fspath(config)
    level = determine_level(config_dir)

    # Trial level
    if level == 1:
        session_config_dict = toml.load(os.path.join(config_dir, '..','..','Config.toml'))
        participant_config_dict = toml.load(os.path.join(config_dir, '..','Config.toml'))
        trial_config_dict = toml.load(os.path.join(config_dir, 'Config.toml'))
        config_dicts = [_merge_config_levels(session_config_dict, participant_config_dict,
                                             trial_config_dict, config_dir)]

    # Participant level
    elif level == 2:
        session_config_dict = toml.load(os.path.join(config_dir, '..','Config.toml'))
        participant_config_dict = toml.load(os.path.join(config_dir, 'Config.toml'))
        config_dicts = []
        # Create config dictionaries for all trials of the participant
        for root, _, files in os.walk(config_dir):
            if 'Config.toml' not in files or root == config_dir:
                continue
            # Load Config.toml explicitly: files[0] can be any other file of the folder
            trial_config_dict = toml.load(os.path.join(root, 'Config.toml'))
            # root is expressed relative to config_dir (not to the current directory)
            # so that config_dir is not duplicated in the resulting path
            trial_dir = os.path.join(config_dir, os.path.relpath(root, config_dir))
            temp_dict = _merge_config_levels(session_config_dict, participant_config_dict,
                                             trial_config_dict, trial_dir)
            excluded = temp_dict.get("project").get('exclude_from_batch') or []
            if os.path.basename(root) not in excluded:
                config_dicts.append(temp_dict)

    # Session level
    elif level == 3:
        session_config_dict = toml.load(os.path.join(config_dir, 'Config.toml'))
        # Used for trials met before any participant Config.toml
        participant_config_dict = {}
        config_dicts = []
        # Create config dictionaries for all trials of all participants of the session
        for root, _, files in os.walk(config_dir):
            if 'Config.toml' not in files or root == config_dir:
                continue
            # Walk the subtree only once per folder
            root_level = determine_level(root)
            # participant
            if root_level == 2:
                participant_config_dict = toml.load(os.path.join(root, 'Config.toml'))
            # trial
            elif root_level == 1:
                trial_config_dict = toml.load(os.path.join(root, 'Config.toml'))
                trial_dir = os.path.join(config_dir, os.path.relpath(root, config_dir))
                temp_dict = _merge_config_levels(session_config_dict, participant_config_dict,
                                                 trial_config_dict, trial_dir)
                excluded = temp_dict.get("project").get('exclude_from_batch') or []
                if os.path.relpath(root) not in [os.path.relpath(p) for p in excluded]:
                    config_dicts.append(temp_dict)

    else:
        raise ValueError(f"Unsupported folder structure in '{config_dir}': Config.toml files span {level} levels, "
                         "but at most 3 (session, participant, trial) are expected.")

    return level, config_dicts
|
2023-07-19 17:37:20 +08:00
|
|
|
|
|
|
|
|
2023-12-08 16:39:34 +08:00
|
|
|
def calibration(config=None):
    '''
    Camera calibration from checkerboards or from qualisys files.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.

    Only the first configuration returned by read_config_files is used,
    and its project_dir is replaced by the session directory before it is
    handed to calibrate_cams_all.

    RAISES:
    - IndexError if the session directory holds no entry whose name contains 'calib'
    '''

    from Pose2Sim.calibration import calibrate_cams_all

    level, config_dicts = read_config_files(config)
    config_dict = config_dicts[0]

    # The session folder is 0, 1, or 2 levels above the folder the function is called on.
    # When a path is given, climb from that path rather than from the current directory;
    # without a path (no argument, or a dictionary), the current directory is the starting point.
    start_dir = os.getcwd() if config is None or isinstance(config, dict) else config
    if level == 3:
        parents = []
    elif level == 2:
        parents = ['..']
    else:
        parents = ['..', '..']
    session_dir = os.path.realpath(os.path.join(start_dir, *parents))
    config_dict.get("project").update({"project_dir":session_dir})

    # Set up logging
    setup_logging(session_dir)

    # Run calibration
    # Case-insensitive search: ('Calib' or 'calib') in c would only ever test 'Calib'
    calib_dirs = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if 'calib' in c.lower()]
    if not calib_dirs:
        raise IndexError(f"No calibration folder (name containing 'calib') found in {session_dir}")
    calib_dir = calib_dirs[0]
    logging.info("\n\n---------------------------------------------------------------------")
    logging.info("Camera calibration")
    logging.info("---------------------------------------------------------------------")
    logging.info(f"\nCalibration directory: {calib_dir}")
    start = time.time()

    calibrate_cams_all(config_dict)

    end = time.time()
    logging.info(f'Calibration took {end-start:.2f} s.')
|
2023-09-18 02:48:13 +08:00
|
|
|
|
|
|
|
|
2023-12-08 16:39:34 +08:00
|
|
|
def poseEstimation(config=None):
    '''
    Estimate pose using BlazePose, OpenPose, AlphaPose, or DeepLabCut.

    Not integrated yet: calling this function always raises NotImplementedError.
    The code below the raise statement is an unreachable scaffold kept for the future integration.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')

    # TODO
    # NOTE(review): pose_estimation_all is neither defined nor imported in this file;
    # it has to be imported here before the raise statement above is removed.

    # Determine the level at which the function is called (session:3, participant:2, trial:1)
    # A dictionary without project_dir is already rejected by read_config_files
    level, config_dicts = read_config_files(config)

    # Set up logging
    session_dir = os.path.realpath(os.path.join(config_dicts[0].get('project').get('project_dir'), '..', '..'))
    setup_logging(session_dir)

    # Batch process all trials
    for config_dict in config_dicts:
        start = time.time()
        project_dir = os.path.realpath(config_dict.get('project').get('project_dir'))
        seq_name = os.path.basename(project_dir)
        frame_range = config_dict.get('project').get('frame_range')
        # An empty or missing frame_range means that all frames are processed
        frames = "all frames" if not frame_range else f"frames {frame_range[0]} to {frame_range[1]}"

        logging.info("\n\n---------------------------------------------------------------------")
        logging.info(f"Pose estimation for {seq_name}, for {frames}.")
        logging.info("---------------------------------------------------------------------")
        logging.info(f"\nProject directory: {project_dir}")

        pose_estimation_all(config_dict)

        end = time.time()
        logging.info(f'Pose estimation took {end-start:.2f} s.')
|
2023-07-19 17:37:20 +08:00
|
|
|
|
|
|
|
|
2023-12-08 16:39:34 +08:00
|
|
|
def synchronization(config=None):
    '''
    Synchronize cameras if needed.

    Not integrated yet: calling this function always raises NotImplementedError.
    Everything below the raise statement is an unreachable draft of the future implementation.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')

    #TODO
    # Determine the level at which the function is called (session:3, participant:2, trial:1)
    level, config_dicts = read_config_files(config)

    # A dictionary must state where the trial lives
    if type(config) == dict:
        config_dict = config_dicts[0]
        trial_dir_is_missing = config_dict.get('project').get('project_dir') == None
        if trial_dir_is_missing:
            raise ValueError('Please specify the project directory in config_dict:\n '
                             'config_dict.get("project").update({"project_dir":"<YOUR_TRIAL_DIRECTORY>"})')

    # Set up logging: the session folder is two levels above the first trial
    first_trial_dir = config_dicts[0].get('project').get('project_dir')
    session_dir = os.path.realpath(os.path.join(first_trial_dir, '..', '..'))
    setup_logging(session_dir)

    # Batch process all trials
    separator = "---------------------------------------------------------------------"
    for config_dict in config_dicts:
        start = time.time()
        project_settings = config_dict.get('project')
        project_dir = os.path.realpath(project_settings.get('project_dir'))
        seq_name = os.path.basename(project_dir)
        frame_range = project_settings.get('frame_range')
        if frame_range == []:
            frames = "all frames"
        else:
            frames = f"frames {frame_range[0]} to {frame_range[1]}"

        logging.info("\n\n" + separator)
        logging.info("Camera synchronization")
        logging.info(separator)
        logging.info(f"\nProject directory: {project_dir}")

        synchronize_cams_all(config_dict)

        end = time.time()
        logging.info(f'Synchronization took {end-start:.2f} s.')
|
2023-09-18 02:48:13 +08:00
|
|
|
|
2023-07-19 17:37:20 +08:00
|
|
|
|
2023-12-08 16:39:34 +08:00
|
|
|
def personAssociation(config=None):
    '''
    Tracking one or several persons of interest.
    Needs a calibration file.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.

    Every trial returned by read_config_files is processed in turn with track_2d_all,
    and the time taken by each trial is logged.
    '''

    from Pose2Sim.personAssociation import track_2d_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1)
    # A dictionary without project_dir is already rejected by read_config_files
    level, config_dicts = read_config_files(config)

    # Set up logging: the session folder is two levels above the first trial
    session_dir = os.path.realpath(os.path.join(config_dicts[0].get('project').get('project_dir'), '..', '..'))
    setup_logging(session_dir)

    # Batch process all trials
    for config_dict in config_dicts:
        start = time.time()
        project_dir = os.path.realpath(config_dict.get('project').get('project_dir'))
        seq_name = os.path.basename(project_dir)
        frame_range = config_dict.get('project').get('frame_range')
        # An empty or missing frame_range means that all frames are processed
        frames = "all frames" if not frame_range else f"frames {frame_range[0]} to {frame_range[1]}"

        logging.info("\n\n---------------------------------------------------------------------")
        logging.info(f"Associating persons for {seq_name}, for {frames}.")
        logging.info("---------------------------------------------------------------------")
        logging.info(f"\nProject directory: {project_dir}")

        track_2d_all(config_dict)

        end = time.time()
        logging.info(f'Associating persons took {end-start:.2f} s.')
|
2023-07-19 17:37:20 +08:00
|
|
|
|
|
|
|
|
2023-12-08 16:39:34 +08:00
|
|
|
def triangulation(config=None):
    '''
    Robust triangulation of 2D points coordinates.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.

    Every trial returned by read_config_files is processed in turn with triangulate_all,
    and the time taken by each trial is logged.
    '''

    from Pose2Sim.triangulation import triangulate_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1)
    # A dictionary without project_dir is already rejected by read_config_files
    level, config_dicts = read_config_files(config)

    # Set up logging: the session folder is two levels above the first trial
    session_dir = os.path.realpath(os.path.join(config_dicts[0].get('project').get('project_dir'), '..', '..'))
    setup_logging(session_dir)

    # Batch process all trials
    for config_dict in config_dicts:
        start = time.time()
        project_dir = os.path.realpath(config_dict.get('project').get('project_dir'))
        seq_name = os.path.basename(project_dir)
        frame_range = config_dict.get('project').get('frame_range')
        # An empty or missing frame_range means that all frames are processed
        frames = "all frames" if not frame_range else f"frames {frame_range[0]} to {frame_range[1]}"

        logging.info("\n\n---------------------------------------------------------------------")
        logging.info(f"Triangulation of 2D points for {seq_name}, for {frames}.")
        logging.info("---------------------------------------------------------------------")
        logging.info(f"\nProject directory: {project_dir}")

        triangulate_all(config_dict)

        end = time.time()
        logging.info(f'Triangulation took {end-start:.2f} s.')
|
2023-07-19 17:37:20 +08:00
|
|
|
|
|
|
|
|
2023-12-08 16:39:34 +08:00
|
|
|
def filtering(config=None):
    '''
    Filter trc 3D coordinates.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.

    Every trial returned by read_config_files is processed in turn with filter_all,
    and the time taken by each trial is logged, as for the other processing steps.
    '''

    from Pose2Sim.filtering import filter_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1)
    # A dictionary without project_dir is already rejected by read_config_files
    level, config_dicts = read_config_files(config)

    # Set up logging (once): the session folder is two levels above the first trial
    session_dir = os.path.realpath(os.path.join(config_dicts[0].get('project').get('project_dir'), '..', '..'))
    setup_logging(session_dir)

    # Batch process all trials
    for config_dict in config_dicts:
        start = time.time()
        project_dir = os.path.realpath(config_dict.get('project').get('project_dir'))
        seq_name = os.path.basename(project_dir)
        frame_range = config_dict.get('project').get('frame_range')
        # An empty or missing frame_range means that all frames are processed
        frames = "all frames" if not frame_range else f"frames {frame_range[0]} to {frame_range[1]}"

        logging.info("\n\n---------------------------------------------------------------------")
        logging.info(f"Filtering 3D coordinates for {seq_name}, for {frames}.")
        logging.info("---------------------------------------------------------------------")
        logging.info(f"\nProject directory: {project_dir}")

        filter_all(config_dict)

        end = time.time()
        logging.info(f'Filtering took {end-start:.2f} s.')
|
2023-09-20 20:39:40 +08:00
|
|
|
|
|
|
|
|
2023-12-18 00:24:09 +08:00
|
|
|
def opensimProcessing(config=None):
    '''
    Uses OpenSim to run scaling based on a static trc pose
    and inverse kinematics based on a trc motion file.

    Not integrated yet: calling this function always raises NotImplementedError.
    Everything below the raise statement is an unreachable draft of the future implementation.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')

    # TODO
    from Pose2Sim.opensimProcessing import opensim_processing_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1)
    level, config_dicts = read_config_files(config)

    # A dictionary must state where the trial lives
    if type(config) == dict:
        config_dict = config_dicts[0]
        trial_dir_is_missing = config_dict.get('project').get('project_dir') == None
        if trial_dir_is_missing:
            raise ValueError('Please specify the project directory in config_dict:\n '
                             'config_dict.get("project").update({"project_dir":"<YOUR_TRIAL_DIRECTORY>"})')

    # Set up logging: the session folder is two levels above the first trial
    first_trial_dir = config_dicts[0].get('project').get('project_dir')
    session_dir = os.path.realpath(os.path.join(first_trial_dir, '..', '..'))
    setup_logging(session_dir)

    # Batch process all trials
    separator = "---------------------------------------------------------------------"
    for config_dict in config_dicts:
        start = time.time()
        project_settings = config_dict.get('project')
        project_dir = os.path.realpath(project_settings.get('project_dir'))
        seq_name = os.path.basename(project_dir)
        frame_range = project_settings.get('frame_range')
        if frame_range == []:
            frames = "all frames"
        else:
            frames = f"frames {frame_range[0]} to {frame_range[1]}"

        logging.info("\n\n" + separator)
        # if static_file in project_dir:
        #     logging.info(f"Scaling model with <STATIC TRC FILE>.")
        # else:
        #     logging.info(f"Running inverse kinematics <MOTION TRC FILE>.")
        logging.info(separator)
        logging.info(f"\nOpenSim output directory: {project_dir}")

        opensim_processing_all(config_dict)

        end = time.time()
        logging.info(f'Model scaling took {end-start:.2f} s.')
|
|
|
|
|