b2fe4f7ba3
Edits from @hunminkim98's awesome work at integrating pose estimation into Pose2Sim with RTMLib. Most of the changes in syntax are not necessarily better, it is mostly for the code to be more consistent with the rest of the library. Thank you again for your fantastic work! General: - Automatically detects whether a valid CUDA install is available. If so, use the GPU with the ONNXRuntime backend. Otherwise, use the CPU with the OpenVINO backend - The tensorflow version used for marker augmentation was incompatible with the cuda torch installation for pose estimation: edited code and models for it to work with the latest tf version. - Added logging information to pose estimation - Readme.md: provided an installation procedure for CUDA (took me a while to find something simple and robust) - Readme.md: added information about PoseEstimation with RTMLib - added poseEstimation to tests.py - created videos for the multi-person case (used to only have json, no video), and reorganized Demo folders. Had to recreate calibration file as well Json files: - the json files only saved one person, I made it save all the detected ones - tracking was not taken into account by rtmlib, which caused issues in synchronization: fixed, waiting for merge - took the save_to_openpose function out from the main function - minified the json files (they take less space when all spaces are removed) Detection results: - Compared the triangulated locations of RTMpose keypoints to the ones of OpenPose to potentially edit model marker locations on OpenSim. Did not seem to need it. 
Others in Config.toml: - removed the "to_openpose" option, which is not needed - added the flag: save_video = 'to_images' # 'to_video' or 'to_images' or ['to_video', 'to_images'] - changed the way frame_range was handled (made me change synchronization in depth, as well as personAssociation and triangulation) - added the flag: time_range_around_maxspeed in synchronization - automatically detect framerate from video, or set to 60 fps if we work from images (or give a value) - frame_range -> time_range - moved height and weight to project (only read for markerAugmentation, and in the future for automatic scaling) - removed reorder_trc from triangulation and Config -> call it for markerAugmentation instead Others: - Provided an installation procedure for OpenSim (for the future) and made continuous installation check its install (a bit harder since it cannot be installed via pip) - scaling from motion instead of static pose (will have to study whether it's as good or not) - added logging to synchronization - Struggled quite a bit with continuous integration * Starting point of integrating RTMPose into Pose2Sim. (#111) * RTM_to_Open Convert format from RTMPose to OpenPose * rtm_intergrated * rtm_integrated * rtm_integrated * rtm_integrated * rtm * Delete build/lib/Pose2Sim directory * rtm * Delete build/lib/Pose2Sim directory * Delete onnxruntime-gpu * device = cpu * add pose folder * Update tests.py * added annotation * fix typo * Should work be still lots of tests to run. Detailed commit coming soon * intermediary commit * last checks before v0.9.0 * Update continuous-integration.yml * Update tests.py * replaced tabs with spaces * unittest issue * unittest typo * deactivated display for CI test of pose detection * Try to make continuous integration work * a * b * c * d * e * f * g * h * i * j * k * l --------- Co-authored-by: HunMinKim <144449115+hunminkim98@users.noreply.github.com>
425 lines
19 KiB
Python
425 lines
19 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
'''
|
|
###########################################################################
|
|
## POSE ESTIMATION ##
|
|
###########################################################################
|
|
|
|
Estimate pose from a video file or a folder of images and
|
|
write the results to JSON files, videos, and/or images.
|
|
Results can optionally be displayed in real time.
|
|
|
|
Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body)
|
|
Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
|
|
need other detection or pose models)
|
|
|
|
Optionally gives consistent person ID across frames (slower but good for 2D analysis)
|
|
Optionally runs detection every n frames and inbetween tracks points (faster but less accurate).
|
|
|
|
If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise,
|
|
uses the CPU with the OpenVINO backend.
|
|
|
|
INPUTS:
|
|
- videos or image folders from the video directory
|
|
- a Config.toml file
|
|
|
|
OUTPUTS:
|
|
- JSON files with the detected keypoints and confidence scores in the OpenPose format
|
|
- Optionally, videos and/or image files with the detected keypoints
|
|
'''
|
|
|
|
|
|
## INIT
|
|
import os
|
|
import glob
|
|
import json
|
|
import logging
|
|
from tqdm import tqdm
|
|
import numpy as np
|
|
import cv2
|
|
import torch
|
|
import onnxruntime as ort
|
|
|
|
from rtmlib import PoseTracker, Body, Wholebody, BodyWithFeet, draw_skeleton
|
|
from Pose2Sim.common import natural_sort_key
|
|
|
|
|
|
## AUTHORSHIP INFORMATION
|
|
__author__ = "HunMin Kim, David Pagnon"
|
|
__copyright__ = "Copyright 2021, Pose2Sim"
|
|
__credits__ = ["HunMin Kim", "David Pagnon"]
|
|
__license__ = "BSD 3-Clause License"
|
|
__version__ = "0.8.5"
|
|
__maintainer__ = "David Pagnon"
|
|
__email__ = "contact@david-pagnon.com"
|
|
__status__ = "Development"
|
|
|
|
|
|
## FUNCTIONS
|
|
def save_to_openpose(json_file_path, keypoints, scores):
    '''
    Write the detections of one frame to a JSON file laid out like OpenPose output

    INPUTS:
    - json_file_path: Path of the JSON file to write (its parent folder is created if missing)
    - keypoints: Detected keypoints, indexed as keypoints[person][point][0 or 1]
    - scores: Confidence scores, indexed as scores[person][point]

    OUTPUTS:
    - JSON file with one entry per detected person, holding the flattened
      [x, y, confidence, x, y, confidence, ...] values under "pose_keypoints_2d"
    '''

    # OpenPose fields that are written empty, in the order they appear in the file
    unused_fields = (
        "face_keypoints_2d",
        "hand_left_keypoints_2d",
        "hand_right_keypoints_2d",
        "pose_keypoints_3d",
        "face_keypoints_3d",
        "hand_left_keypoints_3d",
        "hand_right_keypoints_3d",
    )

    people = []
    for person_idx, person_keypoints in enumerate(keypoints):
        # Interleave coordinates and confidence as plain Python numbers
        flat_values = [
            value
            for point, confidence in zip(person_keypoints, scores[person_idx])
            for value in (point[0].item(), point[1].item(), confidence.item())
        ]
        person = {"person_id": [-1], "pose_keypoints_2d": flat_values}
        person.update({field: [] for field in unused_fields})
        people.append(person)

    frame_content = {"version": 1.3, "people": people}

    # Make sure the destination folder exists, then write the frame
    parent_dir = os.path.abspath(os.path.join(json_file_path, '..'))
    os.makedirs(parent_dir, exist_ok=True)
    with open(json_file_path, 'w') as out_file:
        json.dump(frame_content, out_file)
|
|
|
|
|
|
def process_video(video_path, pose_tracker, tracking, output_format, save_video, save_images, display_detection, frame_range):
    '''
    Estimate pose from a video file

    INPUTS:
    - video_path: str. Path to the input video file
    - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib
    - tracking: bool. Whether to give consistent person ID across frames
    - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut')
    - save_video: bool. Whether to save the output video
    - save_images: bool. Whether to save the output images
    - display_detection: bool. Whether to show real-time visualization
    - frame_range: list. Range of frames to process, passed to range().
                   An empty list (or None) processes every frame

    OUTPUTS:
    - JSON files with the detected keypoints and confidence scores in the OpenPose format
    - if save_video: Video file with the detected keypoints and confidence scores drawn on the frames
    - if save_images: Image files with the detected keypoints and confidence scores drawn on the frames

    RAISES:
    - NameError if two consecutive frames cannot be read from video_path
    '''

    # Probe the file: it only counts as a video if two consecutive reads succeed
    # NOTE(review): presumably because a single image can be opened as a one-frame capture -- confirm
    probe = None
    try:
        probe = cv2.VideoCapture(video_path)
        probe.read()
        is_video = bool(probe.read()[0])
    except Exception:
        is_video = False
    if not is_video:
        if probe is not None:
            probe.release()
        raise NameError(f"{video_path} is not a video. Images must be put in one subdirectory per camera.")

    pose_dir = os.path.abspath(os.path.join(video_path, '..', '..', 'pose'))
    os.makedirs(pose_dir, exist_ok=True)
    video_name_wo_ext = os.path.splitext(os.path.basename(video_path))[0]
    json_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_json')
    output_video_path = os.path.join(pose_dir, f'{video_name_wo_ext}_pose.mp4')
    img_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_img')

    out = None
    if save_video: # Set up video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video
        fps = probe.get(cv2.CAP_PROP_FPS) # Get the frame rate from the raw video
        W, H = int(probe.get(cv2.CAP_PROP_FRAME_WIDTH)), int(probe.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Get the width and height from the raw video
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file

    # The probe has consumed two frames: release it and reopen the file to start from the first frame
    probe.release()

    window_name = f"Pose Estimation {os.path.basename(video_path)}"
    if display_detection:
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL + cv2.WINDOW_KEEPRATIO)

    frame_idx = 0
    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Built once rather than at every frame; [] and None both mean "all frames"
        frames_to_process = range(*frame_range) if frame_range else range(total_frames)
        with tqdm(total=total_frames, desc=f'Processing {os.path.basename(video_path)}') as pbar:
            while cap.isOpened():
                success, frame = cap.read()
                if not success:
                    break

                if frame_idx in frames_to_process:
                    # Perform pose estimation on the frame
                    keypoints, scores = pose_tracker(frame)

                    # Reorder keypoints, scores so that the row index is the track ID
                    if tracking:
                        track_ids = pose_tracker.track_ids_last_frame
                        # Guard against an empty ID list, on which max() would raise
                        if len(track_ids) > 0:
                            max_id = max(track_ids)
                            _, num_points, num_coordinates = keypoints.shape
                            keypoints_filled = np.zeros((max_id+1, num_points, num_coordinates))
                            scores_filled = np.zeros((max_id+1, num_points))
                            keypoints_filled[track_ids] = keypoints
                            scores_filled[track_ids] = scores
                            keypoints = keypoints_filled
                            scores = scores_filled

                    # Save to json
                    if 'openpose' in output_format:
                        json_file_path = os.path.join(json_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.json')
                        save_to_openpose(json_file_path, keypoints, scores)

                    # Draw skeleton on the frame
                    if display_detection or save_video or save_images:
                        img_show = frame.copy()
                        img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low

                    if display_detection:
                        cv2.imshow(window_name, img_show)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

                    if save_video:
                        out.write(img_show)

                    if save_images:
                        os.makedirs(img_output_dir, exist_ok=True)
                        cv2.imwrite(os.path.join(img_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.png'), img_show)

                frame_idx += 1
                pbar.update(1)
    finally:
        # Always free the capture, the writer and the window, even if a frame fails
        cap.release()
        if out is not None:
            out.release()
        if display_detection:
            cv2.destroyAllWindows()

    if save_video:
        logging.info(f"--> Output video saved to {output_video_path}.")
    if save_images:
        logging.info(f"--> Output images saved to {img_output_dir}.")
|
|
|
|
|
|
def process_images(image_folder_path, vid_img_extension, pose_tracker, tracking, output_format, fps, save_video, save_images, display_detection, frame_range):
    '''
    Estimate pose from a folder of images

    INPUTS:
    - image_folder_path: str. Path to the input image folder
    - vid_img_extension: str. Extension of the image files
    - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib
    - tracking: bool. Whether to give consistent person ID across frames
    - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut')
    - fps: Frame rate used for the output video
    - save_video: bool. Whether to save the output video
    - save_images: bool. Whether to save the output images
    - display_detection: bool. Whether to show real-time visualization
    - frame_range: list. Range of frames to process, passed to range().
                   An empty list (or None) processes every image

    OUTPUTS:
    - JSON files with the detected keypoints and confidence scores in the OpenPose format
    - if save_video: Video file with the detected keypoints and confidence scores drawn on the frames
    - if save_images: Image files with the detected keypoints and confidence scores drawn on the frames

    RAISES:
    - NameError if a file cannot be read as an image
    '''

    not_an_image = "{} is not an image. Videos must be put in the video directory, not in subdirectories."

    pose_dir = os.path.abspath(os.path.join(image_folder_path, '..', '..', 'pose'))
    os.makedirs(pose_dir, exist_ok=True)
    folder_name = os.path.basename(image_folder_path)
    json_output_dir = os.path.join(pose_dir, f'{folder_name}_json')
    output_video_path = os.path.join(pose_dir, f'{folder_name}_pose.mp4')
    img_output_dir = os.path.join(pose_dir, f'{folder_name}_img')

    # sorted() returns a new list: keep it, otherwise frames are processed in arbitrary glob order
    image_files = sorted(glob.glob(os.path.join(image_folder_path, '*'+vid_img_extension)), key=natural_sort_key)
    if not image_files:
        logging.warning(f'No images with extension {vid_img_extension} found in {image_folder_path}.')
        return

    out = None
    if save_video: # Set up video writer
        logging.warning(f'Using a framerate of {fps} fps for the output video.')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video
        first_image = cv2.imread(image_files[0])
        if first_image is None: # cv2.imread returns None instead of raising on unreadable files
            raise NameError(not_an_image.format(image_files[0]))
        W, H = first_image.shape[:2][::-1] # Get the width and height from the first image (assuming all images have the same size)
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file

    window_name = f"Pose Estimation {folder_name}"
    if display_detection:
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)

    # Built once rather than at every image; [] and None both mean "all images"
    frames_to_process = range(*frame_range) if frame_range else range(len(image_files))
    try:
        # NOTE(review): the progress bar is labelled with the output image folder name -- confirm this is intended
        for frame_idx, image_file in enumerate(tqdm(image_files, desc=f'\nProcessing {os.path.basename(img_output_dir)}')):
            if frame_idx not in frames_to_process:
                continue

            try:
                frame = cv2.imread(image_file)
            except Exception:
                frame = None
            if frame is None:
                raise NameError(not_an_image.format(image_file))

            # Perform pose estimation on the image
            keypoints, scores = pose_tracker(frame)

            # Reorder keypoints, scores so that the row index is the track ID
            if tracking:
                track_ids = pose_tracker.track_ids_last_frame
                # Guard against an empty ID list, on which max() would raise
                if len(track_ids) > 0:
                    max_id = max(track_ids)
                    _, num_points, num_coordinates = keypoints.shape
                    keypoints_filled = np.zeros((max_id+1, num_points, num_coordinates))
                    scores_filled = np.zeros((max_id+1, num_points))
                    keypoints_filled[track_ids] = keypoints
                    scores_filled[track_ids] = scores
                    keypoints = keypoints_filled
                    scores = scores_filled

            image_name_wo_ext = os.path.splitext(os.path.basename(image_file))[0]

            # Save to json
            if 'openpose' in output_format:
                json_file_path = os.path.join(json_output_dir, f"{image_name_wo_ext}_{frame_idx:06d}.json")
                save_to_openpose(json_file_path, keypoints, scores)

            # Draw skeleton on the image
            if display_detection or save_video or save_images:
                img_show = frame.copy()
                img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low

            if display_detection:
                cv2.imshow(window_name, img_show)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            if save_video:
                out.write(img_show)

            if save_images:
                os.makedirs(img_output_dir, exist_ok=True)
                cv2.imwrite(os.path.join(img_output_dir, f'{image_name_wo_ext}_{frame_idx:06d}.png'), img_show)
    finally:
        # Release the writer so that the output video gets finalized, and close the window
        if out is not None:
            out.release()
        if display_detection:
            cv2.destroyAllWindows()

    if save_video:
        logging.info(f"--> Output video saved to {output_video_path}.")
    if save_images:
        logging.info(f"--> Output images saved to {img_output_dir}.")
|
|
|
|
|
|
def rtm_estimator(config_dict):
    '''
    Estimate pose from a video file or a folder of images and
    write the results to JSON files, videos, and/or images.
    Results can optionally be displayed in real time.

    Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body)
    Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
    need other detection or pose models)

    Optionally gives consistent person ID across frames (slower but good for 2D analysis)
    Optionally runs detection every n frames and inbetween tracks points (faster but less accurate).

    If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise,
    uses the CPU with the OpenVINO backend.

    INPUTS:
    - videos or image folders from the video directory
    - a Config.toml file, passed as the config_dict dictionary

    OUTPUTS:
    - JSON files with the detected keypoints and confidence scores in the OpenPose format
    - Optionally, videos and/or image files with the detected keypoints

    RAISES:
    - ValueError if det_frequency is lower than 1 or if pose_model is not supported
    '''

    # Read config
    project_dir = config_dict['project']['project_dir']
    frame_range = config_dict.get('project').get('frame_range')
    video_dir = os.path.join(project_dir, 'videos')

    pose_model = config_dict['pose']['pose_model']
    mode = config_dict['pose']['mode'] # lightweight, balanced, performance
    vid_img_extension = config_dict['pose']['vid_img_extension']

    output_format = config_dict['pose']['output_format']
    save_video = 'to_video' in config_dict['pose']['save_video']
    save_images = 'to_images' in config_dict['pose']['save_video']
    display_detection = config_dict['pose']['display_detection']

    det_frequency = config_dict['pose']['det_frequency']
    tracking = config_dict['pose']['tracking']

    # Determine frame rate: read it from the first video, or fall back to 60 fps
    # (no video found, unreadable video, or no frame rate reported)
    video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension))
    frame_rate = config_dict.get('project').get('frame_rate')
    if frame_rate == 'auto':
        frame_rate = 60
        if video_files:
            cap = None
            try:
                cap = cv2.VideoCapture(video_files[0])
                cap.read()
                # Same validity check as for processing: a second frame must be readable
                if cap.read()[0]:
                    detected_rate = cap.get(cv2.CAP_PROP_FPS)
                    if detected_rate > 0:
                        frame_rate = detected_rate
            except Exception:
                frame_rate = 60
            finally:
                if cap is not None:
                    cap.release()

    # If CUDA is available, use it with ONNXRuntime backend; else use CPU with openvino
    available_providers = ort.get_available_providers()
    if 'CUDAExecutionProvider' in available_providers and torch.cuda.is_available():
        device = 'cuda'
        backend = 'onnxruntime'
        logging.info(f"\nValid CUDA installation found: using ONNXRuntime backend with GPU.")
    elif 'MPSExecutionProvider' in available_providers or 'CoreMLExecutionProvider' in available_providers:
        device = 'mps'
        backend = 'onnxruntime'
        logging.info(f"\nValid MPS installation found: using ONNXRuntime backend with GPU.")
    else:
        device = 'cpu'
        backend = 'openvino'
        logging.info(f"\nNo valid CUDA installation found: using OpenVINO backend with CPU.")

    if det_frequency>1:
        logging.info(f'Inference run only every {det_frequency} frames. Inbetween, pose estimation tracks previously detected points.')
    elif det_frequency==1:
        logging.info(f'Inference run on every single frame.')
    else:
        raise ValueError(f"Invalid det_frequency: {det_frequency}. Must be an integer greater or equal to 1.")

    if tracking:
        logging.info(f'Pose estimation will attempt to give consistent person IDs across frames.\n')

    # Select the appropriate RTMLib solution based on the pose model
    if pose_model.upper() == 'HALPE_26':
        ModelClass = BodyWithFeet
        logging.info(f"Using HALPE_26 model (body and feet) for pose estimation.")
    elif pose_model.upper() == 'COCO_133':
        ModelClass = Wholebody
        logging.info(f"Using COCO_133 model (body, feet, hands, and face) for pose estimation.")
    elif pose_model.upper() == 'COCO_17':
        ModelClass = Body
        logging.info(f"Using COCO_17 model (body) for pose estimation.")
    else:
        raise ValueError(f"Invalid model_type: {pose_model}. Must be 'HALPE_26', 'COCO_133', or 'COCO_17'. Use another network (MMPose, DeepLabCut, OpenPose, AlphaPose, BlazePose...) and convert the output files if you need another model. See documentation.")
    logging.info(f'Mode: {mode}.\n')

    # Initialize the pose tracker
    pose_tracker = PoseTracker(
        ModelClass,
        det_frequency=det_frequency,
        mode=mode,
        backend=backend,
        device=device,
        tracking=tracking,
        to_openpose=False)

    logging.info('\nEstimating pose...')
    if video_files:
        # Process video files
        logging.info(f'Found video files with extension {vid_img_extension}.')
        for video_path in video_files:
            pose_tracker.reset()
            process_video(video_path, pose_tracker, tracking, output_format, save_video, save_images, display_detection, frame_range)

    else:
        # Process image folders: every subdirectory of the video directory is treated as one camera
        logging.info(f'Found image folders with extension {vid_img_extension}.')
        image_folders = [f for f in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, f))]
        for image_folder in image_folders:
            pose_tracker.reset()
            image_folder_path = os.path.join(video_dir, image_folder)
            process_images(image_folder_path, vid_img_extension, pose_tracker, tracking, output_format, frame_rate, save_video, save_images, display_detection, frame_range)
|