Update Augmenter

This commit is contained in:
HunMinKim 2024-01-09 20:15:12 +09:00 committed by GitHub
parent 6d2013efee
commit 90cb9c9142
No known key found for this signature in database
3 changed files with 685 additions and 0 deletions

Pose2Sim/augmenter.py Normal file
View File

@ -0,0 +1,213 @@
import os
import numpy as np
from . import utilsDataman
import copy
import tensorflow as tf
from .utils import TRC2numpy
import json
import os
import glob
# subject_height must be in meters
def get_midhip_data(trc_file):
# OpenPose의 MidHip 데이터를 시도하여 찾습니다.
midhip_data = trc_file.marker("CHip")
if midhip_data is None or len(midhip_data) == 0:
raise ValueError("MidHip data is empty")
except (KeyError, ValueError):
# MidHip 데이터가 없는 경우, RHip과 LHip의 평균을 사용합니다.
rhip_data = trc_file.marker("RHip")
lhip_data = trc_file.marker("LHip")
midhip_data = (rhip_data + lhip_data) / 2
return midhip_data
def augmentTRC(config_dict):
# get parameters from Config.toml
project_dir = config_dict.get('project').get('project_dir')
session_dir = os.path.realpath(os.path.join(project_dir, '..', '..'))
pathInputTRCFile = os.path.realpath(os.path.join(project_dir, 'pose-3d'))
pathOutputTRCFile = os.path.realpath(os.path.join(project_dir, 'pose-3d'))
subject_height = config_dict.get('project').get('participant_height')
if subject_height is None or subject_height == 0:
raise ValueError("Subject height is not set or invalid in the config file.")
subject_mass = config_dict.get('project').get('participant_mass')
augmenterDir = os.path.join(session_dir, 'MarkerAugmenter')
augmenterModelName = config_dict.get('BODY_25_AUGMENTED').get('ModelName')
augmenter_model = config_dict.get('BODY_25_AUGMENTED').get('model')
offset = config_dict.get('BODY_25_AUGMENTED').get('offset')
# Apply all trc files
trc_files = [f for f in glob.glob(os.path.join(pathInputTRCFile, '*.trc')) if '_LSTM' not in f]
for pathInputTRCFile in trc_files:
pathOutputTRCFile = os.path.splitext(pathInputTRCFile)[0] + "_LSTM.trc"
# This is by default - might need to be adjusted in the future.
featureHeight = True
featureWeight = True
# Augmenter types
if augmenter_model == 'v0.0':
from .utils import getOpenPoseMarkers_fullBody
feature_markers_full, response_markers_full = getOpenPoseMarkers_fullBody()
augmenterModelType_all = [augmenter_model]
feature_markers_all = [feature_markers_full]
response_markers_all = [response_markers_full]
elif augmenter_model == 'v0.1' or augmenter_model == 'v0.2':
# Lower body
augmenterModelType_lower = '{}_lower'.format(augmenter_model)
from .utils import getOpenPoseMarkers_lowerExtremity
feature_markers_lower, response_markers_lower = getOpenPoseMarkers_lowerExtremity()
# Upper body
augmenterModelType_upper = '{}_upper'.format(augmenter_model)
from .utils import getMarkers_upperExtremity_noPelvis
feature_markers_upper, response_markers_upper = getMarkers_upperExtremity_noPelvis()
augmenterModelType_all = [augmenterModelType_lower, augmenterModelType_upper]
feature_markers_all = [feature_markers_lower, feature_markers_upper]
response_markers_all = [response_markers_lower, response_markers_upper]
# Lower body
augmenterModelType_lower = '{}_lower'.format(augmenter_model)
from .utils import getOpenPoseMarkers_lowerExtremity2
feature_markers_lower, response_markers_lower = getOpenPoseMarkers_lowerExtremity2()
# Upper body
augmenterModelType_upper = '{}_upper'.format(augmenter_model)
from .utils import getMarkers_upperExtremity_noPelvis2
feature_markers_upper, response_markers_upper = getMarkers_upperExtremity_noPelvis2()
augmenterModelType_all = [augmenterModelType_lower, augmenterModelType_upper]
feature_markers_all = [feature_markers_lower, feature_markers_upper]
response_markers_all = [response_markers_lower, response_markers_upper]
print('Using augmenter model: {}'.format(augmenter_model))
# %% Process data.
# Import TRC file
trc_file = utilsDataman.TRCFile(pathInputTRCFile)
# Loop over augmenter types to handle separate augmenters for lower and
# upper bodies.
outputs_all = {}
n_response_markers_all = 0
for idx_augm, augmenterModelType in enumerate(augmenterModelType_all):
outputs_all[idx_augm] = {}
feature_markers = feature_markers_all[idx_augm]
response_markers = response_markers_all[idx_augm]
augmenterModelDir = os.path.join(augmenterDir, augmenterModelName,
# %% Pre-process inputs.
# Step 1: import .trc file with OpenPose marker trajectories.
trc_data = TRC2numpy(pathInputTRCFile, feature_markers)
# # Add these lines to get RHip and LHip data and calculate midHip
# rhip_data = trc_file.marker("RHip") # Replace "RHip" with the actual name in your trc file
# lhip_data = trc_file.marker("LHip") # Replace "LHip" with the actual name in your trc file
# Calculate the midHip marker as the average of RHip and LHip
midhip_data = get_midhip_data(trc_file)
trc_data_data = trc_data[:,1:]
# Step 2: Normalize with reference marker position.
with open(os.path.join(augmenterModelDir, "metadata.json"), 'r') as f:
metadata = json.load(f)
# Use midhip_data as the reference marker data
referenceMarker_data = midhip_data # instead of trc_file.marker(referenceMarker)
norm_trc_data_data = np.zeros((trc_data_data.shape[0],
for i in range(0,trc_data_data.shape[1],3):
norm_trc_data_data[:,i:i+3] = (trc_data_data[:,i:i+3] -
# Step 3: Normalize with subject's height.
norm2_trc_data_data = copy.deepcopy(norm_trc_data_data)
norm2_trc_data_data = norm2_trc_data_data / subject_height
# Step 4: Add remaining features.
inputs = copy.deepcopy(norm2_trc_data_data)
if featureHeight:
inputs = np.concatenate(
(inputs, subject_height*np.ones((inputs.shape[0],1))), axis=1)
if featureWeight:
inputs = np.concatenate(
(inputs, subject_mass*np.ones((inputs.shape[0],1))), axis=1)
# Step 5: Pre-process data
pathMean = os.path.join(augmenterModelDir, "mean.npy")
pathSTD = os.path.join(augmenterModelDir, "std.npy")
if os.path.isfile(pathMean):
trainFeatures_mean = np.load(pathMean, allow_pickle=True)
inputs -= trainFeatures_mean
if os.path.isfile(pathSTD):
trainFeatures_std = np.load(pathSTD, allow_pickle=True)
inputs /= trainFeatures_std
# Step 6: Reshape inputs if necessary (eg, LSTM)
if augmenterModelName == "LSTM":
inputs = np.reshape(inputs, (1, inputs.shape[0], inputs.shape[1]))
# %% Load model and weights, and predict outputs.
json_file = open(os.path.join(augmenterModelDir, "model.json"), 'r')
pretrainedModel_json = json_file.read()
model = tf.keras.models.model_from_json(pretrainedModel_json)
model.load_weights(os.path.join(augmenterModelDir, "weights.h5"))
outputs = model.predict(inputs)
# %% Post-process outputs.
# Step 1: Reshape if necessary (eg, LSTM)
if augmenterModelName == "LSTM":
outputs = np.reshape(outputs, (outputs.shape[1], outputs.shape[2]))
# Step 2: Un-normalize with subject's height.
unnorm_outputs = outputs * subject_height
# Step 2: Un-normalize with reference marker position.
unnorm2_outputs = np.zeros((unnorm_outputs.shape[0],
for i in range(0,unnorm_outputs.shape[1],3):
unnorm2_outputs[:,i:i+3] = (unnorm_outputs[:,i:i+3] +
# %% Add markers to .trc file.
for c, marker in enumerate(response_markers):
x = unnorm2_outputs[:,c*3]
y = unnorm2_outputs[:,c*3+1]
z = unnorm2_outputs[:,c*3+2]
trc_file.add_marker(marker, x, y, z)
# %% Gather data for computing minimum y-position.
outputs_all[idx_augm]['response_markers'] = response_markers
outputs_all[idx_augm]['response_data'] = unnorm2_outputs
n_response_markers_all += len(response_markers)
# %% Extract minimum y-position across response markers. This is used
# to align feet and floor when visualizing.
responses_all_conc = np.zeros((unnorm2_outputs.shape[0],
idx_acc_res = 0
for idx_augm in outputs_all:
idx_acc_res_end = (idx_acc_res +
responses_all_conc[:,idx_acc_res:idx_acc_res_end] = (
idx_acc_res = idx_acc_res_end
# Minimum y-position across response markers.
min_y_pos = np.min(responses_all_conc[:,1::3])
# %% If offset
if offset:
trc_file.offset('y', -(min_y_pos-0.01))
# %% Return augmented .trc file
return min_y_pos

Pose2Sim/utils.py Normal file
View File

@ -0,0 +1,190 @@
from . import utilsDataman
import numpy as np
def TRC2numpy(pathFile, markers,rotation=None):
# rotation is a dict, eg. {'y':90} with axis, angle for rotation
trc_file = utilsDataman.TRCFile(pathFile)
time = trc_file.time
num_frames = time.shape[0]
data = np.zeros((num_frames, len(markers)*3))
if rotation != None:
for axis,angle in rotation.items():
for count, marker in enumerate(markers):
data[:,3*count:3*count+3] = trc_file.marker(marker)
this_dat = np.empty((num_frames, 1))
this_dat[:, 0] = time
data_out = np.concatenate((this_dat, data), axis=1)
return data_out
def numpy2TRC(f, data, headers, fc=50.0, t_start=0.0, units="m"):
header_mapping = {}
for count, header in enumerate(headers):
header_mapping[count+1] = header
# Line 1.
f.write('PathFileType 4\t(X/Y/Z) %s\n' % os.getcwd())
# Line 2.
# Line 3.
f.write('%.1f\t%.1f\t%i\t%i\t%s\t%.1f\t%i\t%i\n' % (
fc, fc, num_frames,
num_markers, units, fc,
1, num_frames))
# Line 4.
for key in sorted(header_mapping.keys()):
f.write("%s\t\t\t" % format(header_mapping[key]))
# Line 5.
for imark in np.arange(num_markers) + 1:
f.write('X%i\tY%s\tZ%s\t' % (imark, imark, imark))
# Line 6.
for frame in range(data.shape[0]):
f.write("{}\t{:.8f}\t".format(frame+1,(frame)/fc+t_start)) # opensim frame labeling is 1 indexed
for key in sorted(header_mapping.keys()):
f.write("{:.5f}\t{:.5f}\t{:.5f}\t".format(data[frame,0+(key-1)*3], data[frame,1+(key-1)*3], data[frame,2+(key-1)*3]))
def getOpenPoseMarkerNames():
markerNames = ["Nose", "Neck", "RShoulder", "RElbow", "RWrist",
"LShoulder", "LElbow", "LWrist", "midHip", "RHip",
"RKnee", "RAnkle", "LHip", "LKnee", "LAnkle", "REye",
"LEye", "REar", "LEar", "LBigToe", "LSmallToe",
"LHeel", "RBigToe", "RSmallToe", "RHeel"]
return markerNames
def getOpenPoseFaceMarkers():
faceMarkerNames = ['Nose', 'REye', 'LEye', 'REar', 'LEar']
markerNames = getOpenPoseMarkerNames()
idxFaceMarkers = [markerNames.index(i) for i in faceMarkerNames]
return faceMarkerNames, idxFaceMarkers
def getOpenPoseMarkers_fullBody():
feature_markers = [
"Neck", "RShoulder", "LShoulder", "RHip", "LHip", "RKnee", "LKnee",
"RAnkle", "LAnkle", "RHeel", "LHeel", "RSmallToe", "LSmallToe",
"RBigToe", "LBigToe", "RElbow", "LElbow", "RWrist", "LWrist"]
response_markers = ["C7_study", "r_shoulder_study", "L_shoulder_study",
"r.ASIS_study", "L.ASIS_study", "r.PSIS_study",
"L.PSIS_study", "r_knee_study", "L_knee_study",
"r_mknee_study", "L_mknee_study", "r_ankle_study",
"L_ankle_study", "r_mankle_study", "L_mankle_study",
"r_calc_study", "L_calc_study", "r_toe_study",
"L_toe_study", "r_5meta_study", "L_5meta_study",
"r_lelbow_study", "L_lelbow_study", "r_melbow_study",
"L_melbow_study", "r_lwrist_study", "L_lwrist_study",
"r_mwrist_study", "L_mwrist_study",
"r_thigh1_study", "r_thigh2_study", "r_thigh3_study",
"L_thigh1_study", "L_thigh2_study", "L_thigh3_study",
"r_sh1_study", "r_sh2_study", "r_sh3_study",
"L_sh1_study", "L_sh2_study", "L_sh3_study",
"RHJC_study", "LHJC_study"]
return feature_markers, response_markers
def getOpenPoseMarkers_lowerExtremity():
feature_markers = [
"Neck", "RShoulder", "LShoulder", "RHip", "LHip", "RKnee", "LKnee",
"RAnkle", "LAnkle", "RHeel", "LHeel", "RSmallToe", "LSmallToe",
"RBigToe", "LBigToe"]
response_markers = ["C7_study", "r_shoulder_study", "L_shoulder_study",
"r.ASIS_study", "L.ASIS_study", "r.PSIS_study",
"L.PSIS_study", "r_knee_study", "L_knee_study",
"r_mknee_study", "L_mknee_study", "r_ankle_study",
"L_ankle_study", "r_mankle_study", "L_mankle_study",
"r_calc_study", "L_calc_study", "r_toe_study",
"L_toe_study", "r_5meta_study", "L_5meta_study",
"r_thigh1_study", "r_thigh2_study", "r_thigh3_study",
"L_thigh1_study", "L_thigh2_study", "L_thigh3_study",
"r_sh1_study", "r_sh2_study", "r_sh3_study",
"L_sh1_study", "L_sh2_study", "L_sh3_study",
"RHJC_study", "LHJC_study"]
return feature_markers, response_markers
# Different order of markers compared to getOpenPoseMarkers_lowerExtremity
def getOpenPoseMarkers_lowerExtremity2():
feature_markers = [
"Neck", "RShoulder", "LShoulder", "RHip", "LHip", "RKnee", "LKnee",
"RAnkle", "LAnkle", "RHeel", "LHeel", "RSmallToe", "LSmallToe",
"RBigToe", "LBigToe"]
response_markers = [
'r.ASIS_study', 'L.ASIS_study', 'r.PSIS_study',
'L.PSIS_study', 'r_knee_study', 'r_mknee_study',
'r_ankle_study', 'r_mankle_study', 'r_toe_study',
'r_5meta_study', 'r_calc_study', 'L_knee_study',
'L_mknee_study', 'L_ankle_study', 'L_mankle_study',
'L_toe_study', 'L_calc_study', 'L_5meta_study',
'r_shoulder_study', 'L_shoulder_study', 'C7_study',
'r_thigh1_study', 'r_thigh2_study', 'r_thigh3_study',
'L_thigh1_study', 'L_thigh2_study', 'L_thigh3_study',
'r_sh1_study', 'r_sh2_study', 'r_sh3_study', 'L_sh1_study',
'L_sh2_study', 'L_sh3_study', 'RHJC_study', 'LHJC_study']
return feature_markers, response_markers
def getMarkers_upperExtremity_pelvis():
feature_markers = [
"Neck", "RShoulder", "LShoulder", "RHip", "LHip", "RElbow", "LElbow",
"RWrist", "LWrist"]
response_markers = ["r_lelbow_study", "L_lelbow_study", "r_melbow_study",
"L_melbow_study", "r_lwrist_study", "L_lwrist_study",
"r_mwrist_study", "L_mwrist_study"]
return feature_markers, response_markers
def getMarkers_upperExtremity_noPelvis():
feature_markers = [
"Neck", "RShoulder", "LShoulder", "RElbow", "LElbow", "RWrist",
response_markers = ["r_lelbow_study", "L_lelbow_study", "r_melbow_study",
"L_melbow_study", "r_lwrist_study", "L_lwrist_study",
"r_mwrist_study", "L_mwrist_study"]
return feature_markers, response_markers
# Different order of markers compared to getMarkers_upperExtremity_noPelvis.
def getMarkers_upperExtremity_noPelvis2():
feature_markers = [
"Neck", "RShoulder", "LShoulder", "RElbow", "LElbow", "RWrist",
response_markers = ["r_lelbow_study", "r_melbow_study", "r_lwrist_study",
"r_mwrist_study", "L_lelbow_study", "L_melbow_study",
"L_lwrist_study", "L_mwrist_study"]
return feature_markers, response_markers

Pose2Sim/utilsDataman.py Normal file
View File

@ -0,0 +1,282 @@
"""Manages the movement and use of data files."""
import os
import warnings
from scipy.spatial.transform import Rotation as R
import numpy as np
from numpy.lib.recfunctions import append_fields
class TRCFile(object):
"""A plain-text file format for storing motion capture marker trajectories.
TRC stands for Track Row Column.
The metadata for the file is stored in attributes of this object.
for more information.
def __init__(self, fpath=None, **kwargs):
fpath : str
Valid file path to a TRC (.trc) file.
self.marker_names = []
if fpath != None:
for k, v in kwargs.items():
setattr(self, k, v)
def read_from_file(self, fpath):
# Read the header lines / metadata.
# ---------------------------------
# Split by any whitespace.
# TODO may cause issues with paths that have spaces in them.
f = open(fpath)
# These are lists of each entry on the first few lines.
first_line = f.readline().split()
# Skip the 2nd line.
third_line = f.readline().split()
fourth_line = f.readline().split()
# First line.
if len(first_line) > 3:
self.path = first_line[3]
self.path = ''
# Third line.
self.data_rate = float(third_line[0])
self.camera_rate = float(third_line[1])
self.num_frames = int(third_line[2])
self.num_markers = int(third_line[3])
self.units = third_line[4]
self.orig_data_rate = float(third_line[5])
self.orig_data_start_frame = int(third_line[6])
self.orig_num_frames = int(third_line[7])
# Marker names.
# The first and second column names are 'Frame#' and 'Time'.
self.marker_names = fourth_line[2:]
len_marker_names = len(self.marker_names)
if len_marker_names != self.num_markers:
warnings.warn('Header entry NumMarkers, %i, does not '
'match actual number of markers, %i. Changing '
'NumMarkers to match actual number.' % (
self.num_markers, len_marker_names))
self.num_markers = len_marker_names
# Load the actual data.
# ---------------------
col_names = ['frame_num', 'time']
# This naming convention comes from OpenSim's Inverse Kinematics tool,
# when it writes model marker locations.
for mark in self.marker_names:
col_names += [mark + '_tx', mark + '_ty', mark + '_tz']
dtype = {'names': col_names,
'formats': ['int'] + ['float64'] * (3 * self.num_markers + 1)}
usecols = [i for i in range(3 * self.num_markers + 1 + 1)]
self.data = np.loadtxt(fpath, delimiter='\t', skiprows=5, dtype=dtype,
self.time = self.data['time']
# Check the number of rows.
n_rows = self.time.shape[0]
if n_rows != self.num_frames:
warnings.warn('%s: Header entry NumFrames, %i, does not '
'match actual number of frames, %i, Changing '
'NumFrames to match actual number.' % (fpath,
self.num_frames, n_rows))
self.num_frames = n_rows
def __getitem__(self, key):
"""See `marker()`.
return self.marker(key)
def units(self):
return self.units
def time(self):
this_dat = np.empty((self.num_frames, 1))
this_dat[:, 0] = self.time
return this_dat
def marker(self, name):
"""The trajectory of marker `name`, given as a `self.num_frames` x 3
array. The order of the columns is x, y, z.
this_dat = np.empty((self.num_frames, 3))
this_dat[:, 0] = self.data[name + '_tx']
this_dat[:, 1] = self.data[name + '_ty']
this_dat[:, 2] = self.data[name + '_tz']
return this_dat
def add_marker(self, name, x, y, z):
"""Add a marker, with name `name` to the TRCFile.
name : str
Name of the marker; e.g., 'R.Hip'.
x, y, z: array_like
Coordinates of the marker trajectory. All 3 must have the same
if (len(x) != self.num_frames or len(y) != self.num_frames or len(z) !=
raise Exception('Length of data (%i, %i, %i) is not '
'NumFrames (%i).', len(x), len(y), len(z), self.num_frames)
self.marker_names += [name]
self.num_markers += 1
if not hasattr(self, 'data'):
self.data = np.array(x, dtype=[('%s_tx' % name, 'float64')])
self.data = append_fields(self.data,
['%s_t%s' % (name, s) for s in 'yz'],
[y, z], usemask=False)
self.data = append_fields(self.data,
['%s_t%s' % (name, s) for s in 'xyz'],
[x, y, z], usemask=False)
def marker_at(self, name, time):
x = np.interp(time, self.time, self.data[name + '_tx'])
y = np.interp(time, self.time, self.data[name + '_ty'])
z = np.interp(time, self.time, self.data[name + '_tz'])
return [x, y, z]
def marker_exists(self, name):
exists : bool
Is the marker in the TRCFile?
return name in self.marker_names
def write(self, fpath):
"""Write this TRCFile object to a TRC file.
fpath : str
Valid file path to which this TRCFile is saved.
f = open(fpath, 'w')
# Line 1.
f.write('PathFileType 4\t(X/Y/Z) %s\n' % os.path.split(fpath)[0])
# Line 2.
# Line 3.
f.write('%.1f\t%.1f\t%i\t%i\t%s\t%.1f\t%i\t%i\n' % (
self.data_rate, self.camera_rate, self.num_frames,
self.num_markers, self.units, self.orig_data_rate,
self.orig_data_start_frame, self.orig_num_frames))
# Line 4.
for imark in range(self.num_markers):
f.write('%s\t\t\t' % self.marker_names[imark])
# Line 5.
for imark in np.arange(self.num_markers) + 1:
f.write('X%i\tY%s\tZ%s\t' % (imark, imark, imark))
# Line 6.
# Data.
for iframe in range(self.num_frames):
f.write('%i' % (iframe + 1))
f.write('\t%.7f' % self.time[iframe])
for mark in self.marker_names:
idxs = [mark + '_tx', mark + '_ty', mark + '_tz']
f.write('\t%.7f\t%.7f\t%.7f' % tuple(
self.data[coln][iframe] for coln in idxs))
def add_noise(self, noise_width):
""" add random noise to each component of the marker trajectory
The noise mean will be zero, with the noise_width being the
standard deviation.
noise_width : int
for imarker in range(self.num_markers):
components = ['_tx', '_ty', '_tz']
for iComponent in range(3):
# generate noise
noise = np.random.normal(0, noise_width, self.num_frames)
# add noise to each component of marker data.
self.data[self.marker_names[imarker] + components[iComponent]] += noise
def rotate(self, axis, value):
""" rotate the data.
axis : rotation axis
value : angle in degree
for imarker in range(self.num_markers):
temp = np.zeros((self.num_frames, 3))
temp[:,0] = self.data[self.marker_names[imarker] + '_tx']
temp[:,1] = self.data[self.marker_names[imarker] + '_ty']
temp[:,2] = self.data[self.marker_names[imarker] + '_tz']
r = R.from_euler(axis, value, degrees=True)
temp_rot = r.apply(temp)
self.data[self.marker_names[imarker] + '_tx'] = temp_rot[:,0]
self.data[self.marker_names[imarker] + '_ty'] = temp_rot[:,1]
self.data[self.marker_names[imarker] + '_tz'] = temp_rot[:,2]
def offset(self, axis, value):
""" offset the data.
axis : rotation axis
value : offset in m
for imarker in range(self.num_markers):
if axis.lower() == 'x':
self.data[self.marker_names[imarker] + '_tx'] += value
elif axis.lower() == 'y':
self.data[self.marker_names[imarker] + '_ty'] += value
elif axis.lower() == 'z':
self.data[self.marker_names[imarker] + '_tz'] += value
raise ValueError("Axis not recognized")