2023-07-19 17:37:20 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
########################################################
## Run BlazePose and save coordinates ##
########################################################
Runs BlazePose ( Mediapipe ) on a video
Saves coordinates to OpenPose format ( json files ) or DeepLabCut format ( csv or h5 table )
Optionally displays and saves images with keypoints overlayed
N . B . : First install mediapipe : ` pip install mediapipe `
You may also need to install tables : ` pip install tables `
Usage :
2023-09-21 23:39:28 +08:00
python - m Blazepose_runsave - i input_file - - display - - save_images - - save_video - - to_csv - - to_h5 - - to_json - - model_complexity 2 - o output_folder
OR python - m Blazepose_runsave - i input_file - - display - - to_json - - save_images
OR python - m Blazepose_runsave - i input_file - dJs
2023-07-19 17:37:20 +08:00
OR from Pose2Sim . Utilities import Blazepose_runsave ; Blazepose_runsave . blazepose_detec_func ( input_file = r ' input_file ' , save_images = True , to_json = True , model_complexity = 2 )
'''
## INIT
import cv2
import mediapipe as mp
import os
import pandas as pd
import numpy as np
import json
import argparse
mp_drawing = mp . solutions . drawing_utils
mp_drawing_styles = mp . solutions . drawing_styles
mp_pose = mp . solutions . pose
## AUTHORSHIP INFORMATION
__author__ = " David Pagnon "
__copyright__ = " Copyright 2023, Pose2Sim "
__credits__ = [ " David Pagnon " ]
__license__ = " BSD 3-Clause License "
__version__ = ' 0.4 '
__maintainer__ = " David Pagnon "
__email__ = " contact@david-pagnon.com "
__status__ = " Development "
## FUNCTIONS
def save_to_csv_or_h5 ( kpt_list , output_folder , video_name , to_csv , to_h5 ) :
'''
Saves blazepose keypoint coordinates to csv or h5 file ,
in the DeepLabCut format .
INPUTS :
- kpt_list : List of lists of keypoints X and Y coordinates and likelihood , for each frame
- output_folder : Folder where to save the csv or h5 file
- video_name : Name of the video
- to_csv : Boolean , whether to save to csv
- to_h5 : Boolean , whether to save to h5
OUTPUTS :
- Creation of csv or h5 file in output_folder
'''
# Prepare dataframe file
scorer = [ ' DavidPagnon ' ] * len ( mp_pose . PoseLandmark ) * 3
individuals = [ ' person ' ] * len ( mp_pose . PoseLandmark ) * 3
bodyparts = [ [ p . name ] * 3 for p in mp_pose . PoseLandmark ]
bodyparts = [ item for sublist in bodyparts for item in sublist ]
coords = [ ' x ' , ' y ' , ' likelihood ' ] * len ( mp_pose . PoseLandmark )
tuples = list ( zip ( scorer , individuals , bodyparts , coords ) )
index_csv = pd . MultiIndex . from_tuples ( tuples , names = [ ' scorer ' , ' individuals ' , ' bodyparts ' , ' coords ' ] )
df = pd . DataFrame ( np . array ( kpt_list ) . T , index = index_csv ) . T
if to_csv :
csv_file = os . path . join ( output_folder , video_name + ' .csv ' )
df . to_csv ( csv_file , sep = ' , ' , index = True , line_terminator = ' \n ' )
if to_h5 :
h5_file = os . path . join ( output_folder , video_name + ' .h5 ' )
df . to_hdf ( h5_file , index = True , key = ' blazepose_detection ' )
def save_to_json ( kpt_list , output_folder , video_name ) :
'''
Saves blazepose keypoint coordinates to json file ,
in the OpenPose format .
INPUTS :
- kpt_list : List of lists of keypoints X and Y coordinates and likelihood , for each frame
- output_folder : Folder where to save the csv or h5 file
- video_name : Name of the video
OUTPUTS :
- Creation of json files in output_folder / json_folder
'''
json_folder = os . path . join ( output_folder , ' blaze_ ' + video_name + ' _json ' )
if not os . path . exists ( json_folder ) :
os . mkdir ( json_folder )
print ( json_folder )
# json preparation
json_dict = { ' version ' : 1.3 , ' people ' : [ ] }
json_dict [ ' people ' ] = [ { ' person_id ' : [ - 1 ] ,
' pose_keypoints_2d ' : [ ] ,
' face_keypoints_2d ' : [ ] ,
' hand_left_keypoints_2d ' : [ ] ,
' hand_right_keypoints_2d ' : [ ] ,
' pose_keypoints_3d ' : [ ] ,
' face_keypoints_3d ' : [ ] ,
' hand_left_keypoints_3d ' : [ ] ,
' hand_right_keypoints_3d ' : [ ] } ]
# write each h5 line in json file
for frame , kpt in enumerate ( kpt_list ) :
json_dict [ ' people ' ] [ 0 ] [ ' pose_keypoints_2d ' ] = kpt
json_file = os . path . join ( json_folder , ' blaze_ ' + video_name + ' . ' + str ( frame ) . zfill ( 5 ) + ' .json ' )
with open ( json_file , ' w ' ) as js_f :
js_f . write ( json . dumps ( json_dict ) )
def blazepose_detec_func ( * * args ) :
'''
Runs BlazePose ( Mediapipe ) on a video
Saves coordinates to OpenPose format ( json files ) or DeepLabCut format ( csv or h5 table )
Optionally displays and saves images with keypoints overlayed
N . B . : First install mediapipe : ` pip install mediapipe `
You may also need to install tables : ` pip install tables `
Usage :
2023-09-21 23:39:28 +08:00
python - m Blazepose_runsave - i input_file - - display - - save_images - - save_video - - to_csv - - to_h5 - - to_json - - model_complexity 2 - o output_folder
OR python - m Blazepose_runsave - i input_file - - display - - to_json - - save_images
OR python - m Blazepose_runsave - i input_file - dJs
2023-07-19 17:37:20 +08:00
OR from Pose2Sim . Utilities import Blazepose_runsave ; Blazepose_runsave . blazepose_detec_func ( input_file = r ' input_file ' , save_images = True , to_json = True , model_complexity = 2 )
'''
# Retrieve arguments
video_input = os . path . realpath ( args . get ( ' input_file ' ) )
video_dir = os . path . dirname ( video_input )
video_name = os . path . splitext ( os . path . basename ( video_input ) ) [ 0 ]
output_folder = args . get ( ' output_folder ' )
display = args . get ( ' display ' )
save_images = args . get ( ' save_images ' )
save_video = args . get ( ' save_video ' )
to_csv = args . get ( ' to_csv ' )
to_h5 = args . get ( ' to_h5 ' )
to_json = args . get ( ' to_json ' )
model_complexity = int ( args . get ( ' model_complexity ' ) )
if ' model_complexity ' not in vars ( ) : model_complexity = 2
if to_csv or to_h5 or to_json or save_images or save_video :
if output_folder == None :
output_folder = video_dir
if not os . path . exists ( os . path . realpath ( output_folder ) ) :
os . mkdir ( os . path . realpath ( output_folder ) )
# Run Blazepose
cap = cv2 . VideoCapture ( video_input )
W , H = cap . get ( cv2 . CAP_PROP_FRAME_WIDTH ) , cap . get ( cv2 . CAP_PROP_FRAME_HEIGHT )
fps = cap . get ( cv2 . CAP_PROP_FPS )
count = 0
kpt_list = [ ]
with mp_pose . Pose ( min_detection_confidence = 0.5 , min_tracking_confidence = 0.5 , model_complexity = model_complexity ) as pose :
while cap . isOpened ( ) :
ret , frame = cap . read ( )
if ret == True :
# Blazepose detection
results = pose . process ( cv2 . cvtColor ( frame , cv2 . COLOR_BGR2RGB ) )
try :
kpt = [ [ p . x * W , p . y * H , p . visibility ] for p in results . pose_landmarks . landmark ]
kpt = [ item for sublist in kpt for item in sublist ]
mp_drawing . draw_landmarks ( frame , results . pose_landmarks , mp_pose . POSE_CONNECTIONS , landmark_drawing_spec = mp_drawing_styles . get_default_pose_landmarks_style ( ) )
except :
print ( f ' No person detected by BlazePose on frame { count } ' )
kpt = [ np . nan ] * 3 * 33
# Display images
if display :
cv2 . imshow ( ' frame ' , frame )
if cv2 . waitKey ( 30 ) & 0xFF == ord ( ' q ' ) :
break
# Save images
if save_images :
images_folder = os . path . join ( output_folder , ' blaze_ ' + video_name + ' _img ' )
if not os . path . exists ( images_folder ) :
os . mkdir ( images_folder )
cv2 . imwrite ( os . path . join ( images_folder , ' blaze_ ' + video_name + ' . ' + str ( count ) . zfill ( 5 ) + ' .png ' ) , frame )
# Save video
if save_video :
if count == 0 :
fourcc = cv2 . VideoWriter_fourcc ( * ' MP4V ' )
writer = cv2 . VideoWriter ( os . path . join ( output_folder , video_name + ' _blaze.mp4 ' ) , fourcc , fps , ( int ( W ) , int ( H ) ) )
writer . write ( frame )
# Store coordinates
if to_csv or to_h5 or to_json :
kpt_list . append ( kpt )
count + = 1
else :
break
cap . release ( )
if save_video :
writer . release ( )
cv2 . destroyAllWindows ( )
# Save coordinates
if to_csv or to_h5 :
save_to_csv_or_h5 ( kpt_list , output_folder , video_name , to_csv , to_h5 )
if to_json :
save_to_json ( kpt_list , output_folder , video_name )
if __name__ == ' __main__ ' :
parser = argparse . ArgumentParser ( )
parser . add_argument ( ' -i ' , ' --input_file ' , required = True , help = ' input video file ' )
parser . add_argument ( ' -C ' , ' --to_csv ' , required = False , action = ' store_true ' , help = ' save coordinates to csv ' )
parser . add_argument ( ' -H ' , ' --to_h5 ' , required = False , action = ' store_true ' , help = ' save coordinates to h5 ' )
parser . add_argument ( ' -J ' , ' --to_json ' , required = False , action = ' store_true ' , help = ' save coordinates to json ' )
parser . add_argument ( ' -d ' , ' --display ' , required = False , action = ' store_true ' , help = ' display images with overlayed coordinates ' )
parser . add_argument ( ' -s ' , ' --save_images ' , required = False , action = ' store_true ' , help = ' save images with overlayed coordinates ' )
parser . add_argument ( ' -v ' , ' --save_video ' , required = False , action = ' store_true ' , help = ' save video with overlayed coordinates ' )
parser . add_argument ( ' -m ' , ' --model_complexity ' , required = False , default = 2 , help = ' model complexity. 0: fastest but less accurate, 2: most accurate but slowest ' )
parser . add_argument ( ' -o ' , ' --output_folder ' , required = False , help = ' output folder for coordinates and images ' )
args = vars ( parser . parse_args ( ) )
blazepose_detec_func ( * * args )