pose2sim/Pose2Sim/poseEstimation.py

445 lines
20 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
###########################################################################
## POSE ESTIMATION ##
###########################################################################
Estimate pose from a video file or a folder of images and
write the results to JSON files, videos, and/or images.
Results can optionally be displayed in real time.
Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body)
Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
need nother detection or pose models)
Optionally gives consistent person ID across frames (slower but good for 2D analysis)
Optionally runs detection every n frames and inbetween tracks points (faster but less accurate).
If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise,
uses the CPU with the OpenVINO backend.
INPUTS:
- videos or image folders from the video directory
- a Config.toml file
OUTPUTS:
- JSON files with the detected keypoints and confidence scores in the OpenPose format
- Optionally, videos and/or image files with the detected keypoints
'''
## INIT
import os
import glob
import json
import logging
from tqdm import tqdm
import numpy as np
import cv2
from rtmlib import PoseTracker, Body, Wholebody, BodyWithFeet, draw_skeleton
from Pose2Sim.common import natural_sort_key
## AUTHORSHIP INFORMATION
__author__ = "HunMin Kim, David Pagnon"
__copyright__ = "Copyright 2021, Pose2Sim"
__credits__ = ["HunMin Kim", "David Pagnon"]
__license__ = "BSD 3-Clause License"
__version__ = "0.9.4"
__maintainer__ = "David Pagnon"
__email__ = "contact@david-pagnon.com"
__status__ = "Development"
## FUNCTIONS
def save_to_openpose(json_file_path, keypoints, scores):
'''
Save the keypoints and scores to a JSON file in the OpenPose format
INPUTS:
- json_file_path: Path to save the JSON file
- keypoints: Detected keypoints
- scores: Confidence scores for each keypoint
OUTPUTS:
- JSON file with the detected keypoints and confidence scores in the OpenPose format
'''
# Prepare keypoints with confidence scores for JSON output
nb_detections = len(keypoints)
# print('results: ', keypoints, scores)
detections = []
for i in range(nb_detections): # nb of detected people
keypoints_with_confidence_i = []
for kp, score in zip(keypoints[i], scores[i]):
keypoints_with_confidence_i.extend([kp[0].item(), kp[1].item(), score.item()])
detections.append({
"person_id": [-1],
"pose_keypoints_2d": keypoints_with_confidence_i,
"face_keypoints_2d": [],
"hand_left_keypoints_2d": [],
"hand_right_keypoints_2d": [],
"pose_keypoints_3d": [],
"face_keypoints_3d": [],
"hand_left_keypoints_3d": [],
"hand_right_keypoints_3d": []
})
# Create JSON output structure
json_output = {"version": 1.3, "people": detections}
# Save JSON output for each frame
json_output_dir = os.path.abspath(os.path.join(json_file_path, '..'))
if not os.path.isdir(json_output_dir): os.makedirs(json_output_dir)
with open(json_file_path, 'w') as json_file:
json.dump(json_output, json_file)
def sort_people_rtmlib(pose_tracker, keypoints, scores):
'''
Associate persons across frames (RTMLib method)
INPUTS:
- pose_tracker: PoseTracker. The initialized RTMLib pose tracker object
- keypoints: array of shape K, L, M with K the number of detected persons,
L the number of detected keypoints, M their 2D coordinates
- scores: array of shape K, L with K the number of detected persons,
L the confidence of detected keypoints
OUTPUT:
- sorted_keypoints: array with reordered persons
- sorted_scores: array with reordered scores
'''
try:
desired_size = max(pose_tracker.track_ids_last_frame)+1
sorted_keypoints = np.full((desired_size, keypoints.shape[1], 2), np.nan)
sorted_keypoints[pose_tracker.track_ids_last_frame] = keypoints[:len(pose_tracker.track_ids_last_frame), :, :]
sorted_scores = np.full((desired_size, scores.shape[1]), np.nan)
sorted_scores[pose_tracker.track_ids_last_frame] = scores[:len(pose_tracker.track_ids_last_frame), :]
except:
sorted_keypoints, sorted_scores = keypoints, scores
return sorted_keypoints, sorted_scores
def process_video(video_path, pose_tracker, output_format, save_video, save_images, display_detection, frame_range):
'''
Estimate pose from a video file
INPUTS:
- video_path: str. Path to the input video file
- pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib
- output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut')
- save_video: bool. Whether to save the output video
- save_images: bool. Whether to save the output images
- display_detection: bool. Whether to show real-time visualization
- frame_range: list. Range of frames to process
OUTPUTS:
- JSON files with the detected keypoints and confidence scores in the OpenPose format
- if save_video: Video file with the detected keypoints and confidence scores drawn on the frames
- if save_images: Image files with the detected keypoints and confidence scores drawn on the frames
'''
try:
cap = cv2.VideoCapture(video_path)
cap.read()
if cap.read()[0] == False:
raise
except:
raise NameError(f"{video_path} is not a video. Images must be put in one subdirectory per camera.")
pose_dir = os.path.abspath(os.path.join(video_path, '..', '..', 'pose'))
if not os.path.isdir(pose_dir): os.makedirs(pose_dir)
video_name_wo_ext = os.path.splitext(os.path.basename(video_path))[0]
json_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_json')
output_video_path = os.path.join(pose_dir, f'{video_name_wo_ext}_pose.mp4')
img_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_img')
if save_video: # Set up video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video
fps = cap.get(cv2.CAP_PROP_FPS) # Get the frame rate from the raw video
W, H = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Get the width and height from the raw video
out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file
if display_detection:
cv2.namedWindow(f"Pose Estimation {os.path.basename(video_path)}", cv2.WINDOW_NORMAL + cv2.WINDOW_KEEPRATIO)
frame_idx = 0
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
f_range = [[total_frames] if frame_range==[] else frame_range][0]
with tqdm(total=total_frames, desc=f'Processing {os.path.basename(video_path)}') as pbar:
while cap.isOpened():
# print('\nFrame ', frame_idx)
success, frame = cap.read()
if not success:
break
if frame_idx in range(*f_range):
# Perform pose estimation on the frame
keypoints, scores = pose_tracker(frame)
# Save to json
if 'openpose' in output_format:
json_file_path = os.path.join(json_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.json')
save_to_openpose(json_file_path, keypoints, scores)
# Draw skeleton on the frame
if display_detection or save_video or save_images:
img_show = frame.copy()
img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low
if display_detection:
cv2.imshow(f"Pose Estimation {os.path.basename(video_path)}", img_show)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
if save_video:
out.write(img_show)
if save_images:
if not os.path.isdir(img_output_dir): os.makedirs(img_output_dir)
cv2.imwrite(os.path.join(img_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.png'), img_show)
frame_idx += 1
pbar.update(1)
cap.release()
if save_video:
out.release()
logging.info(f"--> Output video saved to {output_video_path}.")
if save_images:
logging.info(f"--> Output images saved to {img_output_dir}.")
if display_detection:
cv2.destroyAllWindows()
def process_images(image_folder_path, vid_img_extension, pose_tracker, output_format, fps, save_video, save_images, display_detection, frame_range):
'''
Estimate pose estimation from a folder of images
INPUTS:
- image_folder_path: str. Path to the input image folder
- vid_img_extension: str. Extension of the image files
- pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib
- output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut')
- save_video: bool. Whether to save the output video
- save_images: bool. Whether to save the output images
- display_detection: bool. Whether to show real-time visualization
- frame_range: list. Range of frames to process
OUTPUTS:
- JSON files with the detected keypoints and confidence scores in the OpenPose format
- if save_video: Video file with the detected keypoints and confidence scores drawn on the frames
- if save_images: Image files with the detected keypoints and confidence scores drawn on the frames
'''
pose_dir = os.path.abspath(os.path.join(image_folder_path, '..', '..', 'pose'))
if not os.path.isdir(pose_dir): os.makedirs(pose_dir)
json_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_json')
output_video_path = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_pose.mp4')
img_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_img')
image_files = glob.glob(os.path.join(image_folder_path, '*'+vid_img_extension))
sorted(image_files, key=natural_sort_key)
if save_video: # Set up video writer
logging.warning('Using default framerate of 60 fps.')
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video
W, H = cv2.imread(image_files[0]).shape[:2][::-1] # Get the width and height from the first image (assuming all images have the same size)
out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file
if display_detection:
cv2.namedWindow(f"Pose Estimation {os.path.basename(image_folder_path)}", cv2.WINDOW_NORMAL)
f_range = [[len(image_files)] if frame_range==[] else frame_range][0]
for frame_idx, image_file in enumerate(tqdm(image_files, desc=f'\nProcessing {os.path.basename(img_output_dir)}')):
if frame_idx in range(*f_range):
try:
frame = cv2.imread(image_file)
except:
raise NameError(f"{image_file} is not an image. Videos must be put in the video directory, not in subdirectories.")
# Perform pose estimation on the image
keypoints, scores = pose_tracker(frame)
# Extract frame number from the filename
if 'openpose' in output_format:
json_file_path = os.path.join(json_output_dir, f"{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.json")
save_to_openpose(json_file_path, keypoints, scores)
# Draw skeleton on the image
if display_detection or save_video or save_images:
img_show = frame.copy()
img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low
if display_detection:
cv2.imshow(f"Pose Estimation {os.path.basename(image_folder_path)}", img_show)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
if save_video:
out.write(img_show)
if save_images:
if not os.path.isdir(img_output_dir): os.makedirs(img_output_dir)
cv2.imwrite(os.path.join(img_output_dir, f'{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.png'), img_show)
if save_video:
logging.info(f"--> Output video saved to {output_video_path}.")
if save_images:
logging.info(f"--> Output images saved to {img_output_dir}.")
if display_detection:
cv2.destroyAllWindows()
def rtm_estimator(config_dict):
'''
Estimate pose from a video file or a folder of images and
write the results to JSON files, videos, and/or images.
Results can optionally be displayed in real time.
Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body)
Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
need nother detection or pose models)
Optionally gives consistent person ID across frames (slower but good for 2D analysis)
Optionally runs detection every n frames and inbetween tracks points (faster but less accurate).
If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise,
uses the CPU with the OpenVINO backend.
INPUTS:
- videos or image folders from the video directory
- a Config.toml file
OUTPUTS:
- JSON files with the detected keypoints and confidence scores in the OpenPose format
- Optionally, videos and/or image files with the detected keypoints
'''
# Read config
project_dir = config_dict['project']['project_dir']
# if batch
session_dir = os.path.realpath(os.path.join(project_dir, '..'))
# if single trial
session_dir = session_dir if 'Config.toml' in os.listdir(session_dir) else os.getcwd()
frame_range = config_dict.get('project').get('frame_range')
video_dir = os.path.join(project_dir, 'videos')
pose_dir = os.path.join(project_dir, 'pose')
pose_model = config_dict['pose']['pose_model']
mode = config_dict['pose']['mode'] # lightweight, balanced, performance
vid_img_extension = config_dict['pose']['vid_img_extension']
output_format = config_dict['pose']['output_format']
save_video = True if 'to_video' in config_dict['pose']['save_video'] else False
save_images = True if 'to_images' in config_dict['pose']['save_video'] else False
display_detection = config_dict['pose']['display_detection']
overwrite_pose = config_dict['pose']['overwrite_pose']
det_frequency = config_dict['pose']['det_frequency']
# Determine frame rate
video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension))
frame_rate = config_dict.get('project').get('frame_rate')
if frame_rate == 'auto':
cap = cv2.VideoCapture(video_files[0])
if not cap.isOpened():
raise FileNotFoundError(f'Error: Could not open {video_files[0]}. Check that the file exists.')
frame_rate = cap.get(cv2.CAP_PROP_FPS)
if frame_rate == 0:
frame_rate = 30
logging.warning(f'Error: Could not retrieve frame rate from {video_files[0]}. Defaulting to 30fps.')
# If CUDA is available, use it with ONNXRuntime backend; else use CPU with openvino
try:
import torch
import onnxruntime as ort
if torch.cuda.is_available() and 'CUDAExecutionProvider' in ort.get_available_providers():
device = 'cuda'
backend = 'onnxruntime'
logging.info(f"\nValid CUDA installation found: using ONNXRuntime backend with GPU.")
else:
raise
except:
try:
import onnxruntime as ort
if 'MPSExecutionProvider' in ort.get_available_providers() or 'CoreMLExecutionProvider' in ort.get_available_providers():
device = 'mps'
backend = 'onnxruntime'
logging.info(f"\nValid MPS installation found: using ONNXRuntime backend with GPU.")
else:
raise
except:
device = 'cpu'
backend = 'openvino'
logging.info(f"\nNo valid CUDA installation found: using OpenVINO backend with CPU.")
if det_frequency>1:
logging.info(f'Inference run only every {det_frequency} frames. Inbetween, pose estimation tracks previously detected points.')
elif det_frequency==1:
logging.info(f'Inference run on every single frame.')
else:
raise ValueError(f"Invalid det_frequency: {det_frequency}. Must be an integer greater or equal to 1.")
# Select the appropriate model based on the model_type
if pose_model.upper() == 'HALPE_26':
ModelClass = BodyWithFeet
logging.info(f"Using HALPE_26 model (body and feet) for pose estimation.")
elif pose_model.upper() == 'COCO_133':
ModelClass = Wholebody
logging.info(f"Using COCO_133 model (body, feet, hands, and face) for pose estimation.")
elif pose_model.upper() == 'COCO_17':
ModelClass = Body # 26 keypoints(halpe26)
logging.info(f"Using COCO_17 model (body) for pose estimation.")
else:
raise ValueError(f"Invalid model_type: {pose_model}. Must be 'HALPE_26', 'COCO_133', or 'COCO_17'. Use another network (MMPose, DeepLabCut, OpenPose, AlphaPose, BlazePose...) and convert the output files if you need another model. See documentation.")
logging.info(f'Mode: {mode}.\n')
# Initialize the pose tracker
pose_tracker = PoseTracker(
ModelClass,
det_frequency=det_frequency,
mode=mode,
backend=backend,
device=device,
tracking=False,
to_openpose=False)
logging.info('\nEstimating pose...')
try:
pose_listdirs_names = next(os.walk(pose_dir))[1]
os.listdir(os.path.join(pose_dir, pose_listdirs_names[0]))[0]
if not overwrite_pose:
logging.info('Skipping pose estimation as it has already been done. Set overwrite_pose to true in Config.toml if you want to run it again.')
else:
logging.info('Overwriting previous pose estimation. Set overwrite_pose to false in Config.toml if you want to keep the previous results.')
raise
except:
video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension))
if not len(video_files) == 0:
# Process video files
logging.info(f'Found video files with extension {vid_img_extension}.')
for video_path in video_files:
pose_tracker.reset()
process_video(video_path, pose_tracker, output_format, save_video, save_images, display_detection, frame_range)
else:
# Process image folders
logging.info(f'Found image folders with extension {vid_img_extension}.')
image_folders = [f for f in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, f))]
for image_folder in image_folders:
pose_tracker.reset()
image_folder_path = os.path.join(video_dir, image_folder)
process_images(image_folder_path, vid_img_extension, pose_tracker, output_format, frame_rate, save_video, save_images, display_detection, frame_range)