#!/usr/bin/env python
# -*- coding: utf-8 -*-


'''
    ###########################################################################
    ## POSE2SIM                                                              ##
    ###########################################################################

    This repository offers a way to perform markerless kinematics, and gives an
    example workflow from an Openpose input to an OpenSim result.

    It offers tools for:
    - 2D pose estimation,
    - Cameras calibration,
    - Tracking the person of interest,
    - Robust triangulation,
    - Filtration.

    It has been tested on Windows 10 but should work similarly on Linux.
    Please subscribe to this issue if you wish to be notified of the code release.
    See https://github.com/perfanalytics/pose2sim

    Installation:
    # Open Anaconda prompt. Type:
    # - conda create -n Pose2Sim python=3.7 tensorflow-gpu=1.13.1
    # - conda activate Pose2Sim
    # - conda install Pose2Sim

    Usage:
    # First run Pose estimation and organize your directories (see Readme.md)
    from Pose2Sim import Pose2Sim
    Pose2Sim.calibration()
    Pose2Sim.personAssociation()
    Pose2Sim.triangulation()
    Pose2Sim.filtering()
    # Then run OpenSim (see Readme.md)

'''


## INIT
import toml
import os
import time
from copy import deepcopy
import logging
import logging.handlers


## AUTHORSHIP INFORMATION
__author__ = "David Pagnon"
__copyright__ = "Copyright 2021, Pose2Sim"
__credits__ = ["David Pagnon"]
__license__ = "BSD 3-Clause License"
__version__ = "0.4"
__maintainer__ = "David Pagnon"
__email__ = "contact@david-pagnon.com"
__status__ = "Development"


## FUNCTIONS
def setup_logging(session_dir):
    '''
    Create logging file and stream handlers.

    Messages of level INFO and above are written both to
    <session_dir>/logs.txt (rotated every 7 days) and to the console.

    Note: logging.basicConfig does nothing if the root logger already has
    handlers, so only the first call in a given process takes effect.

    INPUT:
    - session_dir: path of the directory in which logs.txt is written
    '''

    log_path = os.path.join(session_dir, 'logs.txt')

    # Make sure the log file exists before attaching the rotating handler
    with open(log_path, 'a+'):
        pass

    logging.basicConfig(format='%(message)s', level=logging.INFO,
        handlers=[logging.handlers.TimedRotatingFileHandler(log_path, when='D', interval=7),
                  logging.StreamHandler()])


def recursive_update(dict_to_update, dict_with_new_values):
    '''
    Update nested dictionaries without overwriting existing keys in any level of nesting.
    dict_to_update is modified in place and also returned.

    Example:
    dict_to_update = {'key': {'key_1': 'val_1', 'key_2': 'val_2'}}
    dict_with_new_values = {'key': {'key_1': 'val_1_new'}}
    returns {'key': {'key_1': 'val_1_new', 'key_2': 'val_2'}}
    while dict_to_update.update(dict_with_new_values) would return {'key': {'key_1': 'val_1_new'}}

    INPUTS:
    - dict_to_update: dictionary that receives the new values
    - dict_with_new_values: dictionary whose values take precedence

    OUTPUT:
    - dict_to_update, updated
    '''

    for key, value in dict_with_new_values.items():
        if key in dict_to_update and isinstance(value, dict) and isinstance(dict_to_update[key], dict):
            # Recursively update nested dictionaries
            dict_to_update[key] = recursive_update(dict_to_update[key], value)
        else:
            # Update or add new key-value pairs
            dict_to_update[key] = value

    return dict_to_update


def determine_level(config_dir):
    '''
    Determine the level at which the function is called.
    Level = 1: Trial folder
    Level = 2: Participant folder
    Level = 3: Session folder

    The level is the number of nested folder depths (from config_dir
    downwards) that contain a Config.toml file.

    INPUT:
    - config_dir: path of the directory to inspect

    OUTPUT:
    - level: int

    RAISES:
    - ValueError if no Config.toml file is found in config_dir or below
    '''

    len_paths = [len(root.split(os.sep)) for root, dirs, files in os.walk(config_dir) if 'Config.toml' in files]
    if not len_paths:
        raise ValueError(f'No Config.toml file found in {os.path.realpath(config_dir)} or in its subdirectories.')

    return max(len_paths) - min(len_paths) + 1


def _merge_configs(session_config_dict, participant_config_dict, trial_config_dict):
    '''
    Return a new dictionary made of the session parameters, overridden by the
    participant ones, themselves overridden by the trial ones.
    '''

    # deep copy, otherwise session_config_dict would be modified at each call
    merged_dict = deepcopy(session_config_dict)
    merged_dict = recursive_update(merged_dict, participant_config_dict)
    merged_dict = recursive_update(merged_dict, trial_config_dict)

    return merged_dict


def read_config_files(config):
    '''
    Read Session, Participant, and Trial configuration files,
    and output a dictionary with all the parameters.

    INPUT:
    - config: a configuration dictionary (which must define
              project.project_dir), the path of a trial, participant or
              session directory, or None for the current directory

    OUTPUTS:
    - level: 1 (trial), 2 (participant), or 3 (session).
             A dictionary is always treated as level 3.
    - config_dicts: list of configuration dictionaries, one per trial to process

    RAISES:
    - ValueError if config is a dictionary without a project directory,
      if no Config.toml is found, or if the folder nesting is deeper than
      session/participant/trial
    '''

    if isinstance(config, dict):
        if (config.get('project') or {}).get('project_dir') is None:
            raise ValueError('Please specify the project directory in config_dict:\n \
                             config_dict.get("project").update({"project_dir":""})')
        return 3, [config]

    # if launched without an argument, config is None, else it is the path to the config directory
    config_dir = '.' if config is None else config
    level = determine_level(config_dir)

    # Trial level
    if level == 1:
        session_config_dict = toml.load(os.path.join(config_dir, '..', '..', 'Config.toml'))
        participant_config_dict = toml.load(os.path.join(config_dir, '..', 'Config.toml'))
        trial_config_dict = toml.load(os.path.join(config_dir, 'Config.toml'))

        session_config_dict = recursive_update(session_config_dict, participant_config_dict)
        session_config_dict = recursive_update(session_config_dict, trial_config_dict)
        session_config_dict.get("project").update({"project_dir": config_dir})
        config_dicts = [session_config_dict]

    # Participant level
    elif level == 2:
        session_config_dict = toml.load(os.path.join(config_dir, '..', 'Config.toml'))
        participant_config_dict = toml.load(os.path.join(config_dir, 'Config.toml'))
        config_dicts = []
        # Create config dictionaries for all trials of the participant
        for root, dirs, files in os.walk(config_dir):
            if 'Config.toml' in files and root != config_dir:
                # Load Config.toml by name: files[0] may be any other file of the folder
                trial_config_dict = toml.load(os.path.join(root, 'Config.toml'))
                temp_dict = _merge_configs(session_config_dict, participant_config_dict, trial_config_dict)
                # root already starts with config_dir, whatever the current working directory is
                temp_dict.get("project").update({"project_dir": root})
                excluded_trials = temp_dict.get("project").get('exclude_from_batch') or []
                if os.path.basename(root) not in excluded_trials:
                    config_dicts.append(temp_dict)

    # Session level
    elif level == 3:
        session_config_dict = toml.load(os.path.join(config_dir, 'Config.toml'))
        config_dicts = []
        # Create config dictionaries for all trials of all participants of the session.
        # os.walk is top-down: a participant folder is visited before its trial folders.
        for root, dirs, files in os.walk(config_dir):
            if 'Config.toml' in files and root != config_dir:
                root_level = determine_level(root)
                # participant
                if root_level == 2:
                    participant_config_dict = toml.load(os.path.join(root, 'Config.toml'))
                # trial
                elif root_level == 1:
                    trial_config_dict = toml.load(os.path.join(root, 'Config.toml'))
                    temp_dict = _merge_configs(session_config_dict, participant_config_dict, trial_config_dict)
                    temp_dict.get("project").update({"project_dir": root})
                    # NOTE(review): excluded paths are compared relative to the current
                    # working directory, as before -- confirm whether they should rather
                    # be relative to config_dir
                    excluded_trials = temp_dict.get("project").get('exclude_from_batch') or []
                    if os.path.relpath(root) not in [os.path.relpath(p) for p in excluded_trials]:
                        config_dicts.append(temp_dict)

    else:
        raise ValueError(f'Config.toml files are nested over {level} folder levels in {os.path.realpath(config_dir)}: '
                         'only session/participant/trial (3 levels) is supported.')

    return level, config_dicts


def _session_dir_from_configs(config_dicts):
    '''
    Return the real path of the session directory, taken as two levels above
    the project directory of the first configuration dictionary.
    '''

    first_project_dir = config_dicts[0].get('project').get('project_dir')

    return os.path.realpath(os.path.join(first_project_dir, '..', '..'))


def _log_trial_header(task_description, config_dict):
    '''
    Log the banner announcing the processing of one trial:
    task, trial name, frame range, and project directory.
    '''

    project_dir = os.path.realpath(config_dict.get('project').get('project_dir'))
    seq_name = os.path.basename(project_dir)
    frame_range = config_dict.get('project').get('frame_range')
    # An empty (or missing) frame range means that all frames are processed
    frames = "all frames" if not frame_range else f"frames {frame_range[0]} to {frame_range[1]}"

    logging.info("\n\n---------------------------------------------------------------------")
    logging.info(f"{task_description} for {seq_name}, for {frames}.")
    logging.info("---------------------------------------------------------------------")
    logging.info(f"\nProject directory: {project_dir}")


def calibration(config=None):
    '''
    Cameras calibration from checkerboards or from qualisys files.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.

    RAISES:
    - FileNotFoundError if the session directory contains no entry with
      'calib' (case-insensitive) in its name
    '''

    from Pose2Sim.calibration import calibrate_cams_all

    level, config_dicts = read_config_files(config)
    config_dict = config_dicts[0]

    # NOTE(review): the session directory is derived from the current working
    # directory and the detected level, not from the config argument -- confirm
    # this is intended when config is a path or a dictionary
    current_dir = os.getcwd()
    if level == 3:
        session_dir = current_dir
    elif level == 2:
        session_dir = os.path.join(current_dir, '..')
    else:
        session_dir = os.path.join(current_dir, '..', '..')
    session_dir = os.path.realpath(session_dir)
    config_dict.get("project").update({"project_dir": session_dir})

    # Set up logging
    setup_logging(session_dir)

    # Run calibration
    # Case-insensitive match: ('Calib' or 'calib') in c would only ever test 'Calib'
    calib_dirs = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if 'calib' in c.lower()]
    if not calib_dirs:
        raise FileNotFoundError(f'No calibration directory found in {session_dir}.')
    calib_dir = calib_dirs[0]

    logging.info("\n\n---------------------------------------------------------------------")
    logging.info("Camera calibration")
    logging.info("---------------------------------------------------------------------")
    logging.info(f"\nCalibration directory: {calib_dir}")
    start = time.time()

    calibrate_cams_all(config_dict)

    end = time.time()
    logging.info(f'Calibration took {end-start:.2f} s.')


def poseEstimation(config=None):
    '''
    Estimate pose using BlazePose, OpenPose, AlphaPose, or DeepLabCut.
    Not integrated yet: always raises NotImplementedError.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    # TODO: call Pose2Sim.poseEstimation.pose_estimation_all once it is integrated
    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')


def synchronization(config=None):
    '''
    Synchronize cameras if needed.
    Not integrated yet: always raises NotImplementedError.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    # TODO: call Pose2Sim.synchronization.synchronize_cams_all once it is integrated
    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')


def personAssociation(config=None):
    '''
    Tracking one or several persons of interest.
    Needs a calibration file.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    from Pose2Sim.personAssociation import track_2d_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1).
    # read_config_files also checks that a dictionary config defines its project directory.
    level, config_dicts = read_config_files(config)

    # Set up logging
    session_dir = _session_dir_from_configs(config_dicts)
    setup_logging(session_dir)

    # Batch process all trials
    start = time.time()
    for config_dict in config_dicts:
        _log_trial_header("Associating persons", config_dict)
        track_2d_all(config_dict)

    end = time.time()
    logging.info(f'Associating persons took {end-start:.2f} s.')


def triangulation(config=None):
    '''
    Robust triangulation of 2D points coordinates.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    from Pose2Sim.triangulation import triangulate_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1).
    # read_config_files also checks that a dictionary config defines its project directory.
    level, config_dicts = read_config_files(config)

    # Set up logging
    session_dir = _session_dir_from_configs(config_dicts)
    setup_logging(session_dir)

    # Batch process all trials
    start = time.time()
    for config_dict in config_dicts:
        _log_trial_header("Triangulation of 2D points", config_dict)
        triangulate_all(config_dict)

    end = time.time()
    logging.info(f'Triangulation took {end-start:.2f} s.')


def filtering(config=None):
    '''
    Filter trc 3D coordinates.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    from Pose2Sim.filtering import filter_all

    # Determine the level at which the function is called (session:3, participant:2, trial:1).
    # read_config_files also checks that a dictionary config defines its project directory.
    level, config_dicts = read_config_files(config)

    # Set up logging (once)
    session_dir = _session_dir_from_configs(config_dicts)
    setup_logging(session_dir)

    # Batch process all trials
    for config_dict in config_dicts:
        _log_trial_header("Filtering 3D coordinates", config_dict)
        filter_all(config_dict)


def scalingModel(config=None):
    '''
    Uses OpenSim to scale a model based on a static 3D pose.
    Not integrated yet: always raises NotImplementedError.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    # TODO: call Pose2Sim.scalingModel.scale_model_all once it is integrated
    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')


def inverseKinematics(config=None):
    '''
    Uses OpenSim to perform inverse kinematics.
    Not integrated yet: always raises NotImplementedError.

    config can be a dictionary,
    or the directory path of a trial, participant, or session,
    or the function can be called without an argument, in which case the config directory is the current one.
    '''

    # TODO: call Pose2Sim.inverseKinematics.inverse_kinematics_all once it is integrated
    raise NotImplementedError('This has not been integrated yet. \nPlease read README.md for further explanation')