#!/usr/bin/env python # -*- coding: utf-8 -*- ''' ########################################################################### ## POSE ESTIMATION ## ########################################################################### Estimate pose from a video file or a folder of images and write the results to JSON files, videos, and/or images. Results can optionally be displayed in real time. Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body) Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you need nother detection or pose models) Optionally gives consistent person ID across frames (slower but good for 2D analysis) Optionally runs detection every n frames and inbetween tracks points (faster but less accurate). If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise, uses the CPU with the OpenVINO backend. INPUTS: - videos or image folders from the video directory - a Config.toml file OUTPUTS: - JSON files with the detected keypoints and confidence scores in the OpenPose format - Optionally, videos and/or image files with the detected keypoints ''' ## INIT import os import glob import json import logging from tqdm import tqdm import numpy as np import cv2 import torch import onnxruntime as ort from rtmlib import PoseTracker, Body, Wholebody, BodyWithFeet, draw_skeleton from Pose2Sim.common import natural_sort_key ## AUTHORSHIP INFORMATION __author__ = "HunMin Kim, David Pagnon" __copyright__ = "Copyright 2021, Pose2Sim" __credits__ = ["HunMin Kim", "David Pagnon"] __license__ = "BSD 3-Clause License" __version__ = "0.9.4" __maintainer__ = "David Pagnon" __email__ = "contact@david-pagnon.com" __status__ = "Development" ## FUNCTIONS def save_to_openpose(json_file_path, keypoints, scores): ''' Save the keypoints and scores to a JSON file in the OpenPose format INPUTS: - json_file_path: Path to save the JSON file - keypoints: Detected keypoints - scores: Confidence scores for each keypoint OUTPUTS: - JSON file with the detected keypoints and confidence scores in the OpenPose format ''' # Prepare keypoints with confidence scores for JSON output nb_detections = len(keypoints) # print('results: ', keypoints, scores) detections = [] for i in range(nb_detections): # nb of detected people keypoints_with_confidence_i = [] for kp, score in zip(keypoints[i], scores[i]): keypoints_with_confidence_i.extend([kp[0].item(), kp[1].item(), score.item()]) detections.append({ "person_id": [-1], "pose_keypoints_2d": keypoints_with_confidence_i, "face_keypoints_2d": [], "hand_left_keypoints_2d": [], "hand_right_keypoints_2d": [], "pose_keypoints_3d": [], "face_keypoints_3d": [], "hand_left_keypoints_3d": [], "hand_right_keypoints_3d": [] }) # Create JSON output structure json_output = {"version": 1.3, "people": detections} # Save JSON output for each frame json_output_dir = os.path.abspath(os.path.join(json_file_path, '..')) if not os.path.isdir(json_output_dir): os.makedirs(json_output_dir) with open(json_file_path, 'w') as json_file: json.dump(json_output, json_file) def process_video(video_path, pose_tracker, tracking, output_format, save_video, save_images, display_detection, frame_range): ''' Estimate pose from a video file INPUTS: - video_path: str. Path to the input video file - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib - tracking: bool. Whether to give consistent person ID across frames - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut') - save_video: bool. Whether to save the output video - save_images: bool. Whether to save the output images - display_detection: bool. Whether to show real-time visualization - frame_range: list. Range of frames to process OUTPUTS: - JSON files with the detected keypoints and confidence scores in the OpenPose format - if save_video: Video file with the detected keypoints and confidence scores drawn on the frames - if save_images: Image files with the detected keypoints and confidence scores drawn on the frames ''' try: cap = cv2.VideoCapture(video_path) cap.read() if cap.read()[0] == False: raise except: raise NameError(f"{video_path} is not a video. Images must be put in one subdirectory per camera.") pose_dir = os.path.abspath(os.path.join(video_path, '..', '..', 'pose')) if not os.path.isdir(pose_dir): os.makedirs(pose_dir) video_name_wo_ext = os.path.splitext(os.path.basename(video_path))[0] json_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_json') output_video_path = os.path.join(pose_dir, f'{video_name_wo_ext}_pose.mp4') img_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_img') if save_video: # Set up video writer fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video fps = cap.get(cv2.CAP_PROP_FPS) # Get the frame rate from the raw video W, H = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Get the width and height from the raw video out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file if display_detection: cv2.namedWindow(f"Pose Estimation {os.path.basename(video_path)}", cv2.WINDOW_NORMAL + cv2.WINDOW_KEEPRATIO) frame_idx = 0 cap = cv2.VideoCapture(video_path) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) f_range = [[total_frames] if frame_range==[] else frame_range][0] with tqdm(total=total_frames, desc=f'Processing {os.path.basename(video_path)}') as pbar: while cap.isOpened(): # print('\nFrame ', frame_idx) success, frame = cap.read() if not success: break if frame_idx in range(*f_range): # Perform pose estimation on the frame keypoints, scores = pose_tracker(frame) # Reorder keypoints, scores if tracking: max_id = max(pose_tracker.track_ids_last_frame) num_frames, num_points, num_coordinates = keypoints.shape keypoints_filled = np.zeros((max_id+1, num_points, num_coordinates)) scores_filled = np.zeros((max_id+1, num_points)) keypoints_filled[pose_tracker.track_ids_last_frame] = keypoints scores_filled[pose_tracker.track_ids_last_frame] = scores keypoints = keypoints_filled scores = scores_filled # Save to json if 'openpose' in output_format: json_file_path = os.path.join(json_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.json') save_to_openpose(json_file_path, keypoints, scores) # Draw skeleton on the frame if display_detection or save_video or save_images: img_show = frame.copy() img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low if display_detection: cv2.imshow(f"Pose Estimation {os.path.basename(video_path)}", img_show) if cv2.waitKey(1) & 0xFF == ord('q'): break if save_video: out.write(img_show) if save_images: if not os.path.isdir(img_output_dir): os.makedirs(img_output_dir) cv2.imwrite(os.path.join(img_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.png'), img_show) frame_idx += 1 pbar.update(1) cap.release() if save_video: out.release() logging.info(f"--> Output video saved to {output_video_path}.") if save_images: logging.info(f"--> Output images saved to {img_output_dir}.") if display_detection: cv2.destroyAllWindows() def process_images(image_folder_path, vid_img_extension, pose_tracker, tracking, output_format, fps, save_video, save_images, display_detection, frame_range): ''' Estimate pose estimation from a folder of images INPUTS: - image_folder_path: str. Path to the input image folder - vid_img_extension: str. Extension of the image files - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib - tracking: bool. Whether to give consistent person ID across frames - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut') - save_video: bool. Whether to save the output video - save_images: bool. Whether to save the output images - display_detection: bool. Whether to show real-time visualization - frame_range: list. Range of frames to process OUTPUTS: - JSON files with the detected keypoints and confidence scores in the OpenPose format - if save_video: Video file with the detected keypoints and confidence scores drawn on the frames - if save_images: Image files with the detected keypoints and confidence scores drawn on the frames ''' pose_dir = os.path.abspath(os.path.join(image_folder_path, '..', '..', 'pose')) if not os.path.isdir(pose_dir): os.makedirs(pose_dir) json_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_json') output_video_path = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_pose.mp4') img_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_img') image_files = glob.glob(os.path.join(image_folder_path, '*'+vid_img_extension)) sorted(image_files, key=natural_sort_key) if save_video: # Set up video writer logging.warning('Using default framerate of 60 fps.') fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for the output video W, H = cv2.imread(image_files[0]).shape[:2][::-1] # Get the width and height from the first image (assuming all images have the same size) out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H)) # Create the output video file if display_detection: cv2.namedWindow(f"Pose Estimation {os.path.basename(image_folder_path)}", cv2.WINDOW_NORMAL) f_range = [[len(image_files)] if frame_range==[] else frame_range][0] for frame_idx, image_file in enumerate(tqdm(image_files, desc=f'\nProcessing {os.path.basename(img_output_dir)}')): if frame_idx in range(*f_range): try: frame = cv2.imread(image_file) except: raise NameError(f"{image_file} is not an image. Videos must be put in the video directory, not in subdirectories.") # Perform pose estimation on the image keypoints, scores = pose_tracker(frame) # Reorder keypoints, scores if tracking: max_id = max(pose_tracker.track_ids_last_frame) num_frames, num_points, num_coordinates = keypoints.shape keypoints_filled = np.zeros((max_id+1, num_points, num_coordinates)) scores_filled = np.zeros((max_id+1, num_points)) keypoints_filled[pose_tracker.track_ids_last_frame] = keypoints scores_filled[pose_tracker.track_ids_last_frame] = scores keypoints = keypoints_filled scores = scores_filled # Extract frame number from the filename if 'openpose' in output_format: json_file_path = os.path.join(json_output_dir, f"{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.json") save_to_openpose(json_file_path, keypoints, scores) # Draw skeleton on the image if display_detection or save_video or save_images: img_show = frame.copy() img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1) # maybe change this value if 0.1 is too low if display_detection: cv2.imshow(f"Pose Estimation {os.path.basename(image_folder_path)}", img_show) if cv2.waitKey(1) & 0xFF == ord('q'): break if save_video: out.write(img_show) if save_images: if not os.path.isdir(img_output_dir): os.makedirs(img_output_dir) cv2.imwrite(os.path.join(img_output_dir, f'{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.png'), img_show) if save_video: logging.info(f"--> Output video saved to {output_video_path}.") if save_images: logging.info(f"--> Output images saved to {img_output_dir}.") if display_detection: cv2.destroyAllWindows() def rtm_estimator(config_dict): ''' Estimate pose from a video file or a folder of images and write the results to JSON files, videos, and/or images. Results can optionally be displayed in real time. Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands), COCO_17 (body) Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you need nother detection or pose models) Optionally gives consistent person ID across frames (slower but good for 2D analysis) Optionally runs detection every n frames and inbetween tracks points (faster but less accurate). If a valid cuda installation is detected, uses the GPU with the ONNXRuntime backend. Otherwise, uses the CPU with the OpenVINO backend. INPUTS: - videos or image folders from the video directory - a Config.toml file OUTPUTS: - JSON files with the detected keypoints and confidence scores in the OpenPose format - Optionally, videos and/or image files with the detected keypoints ''' # Read config project_dir = config_dict['project']['project_dir'] # if batch session_dir = os.path.realpath(os.path.join(project_dir, '..')) # if single trial session_dir = session_dir if 'Config.toml' in os.listdir(session_dir) else os.getcwd() frame_range = config_dict.get('project').get('frame_range') video_dir = os.path.join(project_dir, 'videos') pose_dir = os.path.join(project_dir, 'pose') pose_model = config_dict['pose']['pose_model'] mode = config_dict['pose']['mode'] # lightweight, balanced, performance vid_img_extension = config_dict['pose']['vid_img_extension'] output_format = config_dict['pose']['output_format'] save_video = True if 'to_video' in config_dict['pose']['save_video'] else False save_images = True if 'to_images' in config_dict['pose']['save_video'] else False display_detection = config_dict['pose']['display_detection'] overwrite_pose = config_dict['pose']['overwrite_pose'] det_frequency = config_dict['pose']['det_frequency'] tracking = config_dict['pose']['tracking'] # Determine frame rate video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension)) frame_rate = config_dict.get('project').get('frame_rate') if frame_rate == 'auto': try: cap = cv2.VideoCapture(video_files[0]) cap.read() if cap.read()[0] == False: raise except: frame_rate = 60 # If CUDA is available, use it with ONNXRuntime backend; else use CPU with openvino if 'CUDAExecutionProvider' in ort.get_available_providers() and torch.cuda.is_available(): device = 'cuda' backend = 'onnxruntime' logging.info(f"\nValid CUDA installation found: using ONNXRuntime backend with GPU.") elif 'MPSExecutionProvider' in ort.get_available_providers() or 'CoreMLExecutionProvider' in ort.get_available_providers(): device = 'mps' backend = 'onnxruntime' logging.info(f"\nValid MPS installation found: using ONNXRuntime backend with GPU.") else: device = 'cpu' backend = 'openvino' logging.info(f"\nNo valid CUDA installation found: using OpenVINO backend with CPU.") if det_frequency>1: logging.info(f'Inference run only every {det_frequency} frames. Inbetween, pose estimation tracks previously detected points.') elif det_frequency==1: logging.info(f'Inference run on every single frame.') else: raise ValueError(f"Invalid det_frequency: {det_frequency}. Must be an integer greater or equal to 1.") if tracking: logging.info(f'Pose estimation will attempt to give consistent person IDs across frames.\n') # Select the appropriate model based on the model_type if pose_model.upper() == 'HALPE_26': ModelClass = BodyWithFeet logging.info(f"Using HALPE_26 model (body and feet) for pose estimation.") elif pose_model.upper() == 'COCO_133': ModelClass = Wholebody logging.info(f"Using COCO_133 model (body, feet, hands, and face) for pose estimation.") elif pose_model.upper() == 'COCO_17': ModelClass = Body # 26 keypoints(halpe26) logging.info(f"Using COCO_17 model (body) for pose estimation.") else: raise ValueError(f"Invalid model_type: {pose_model}. Must be 'HALPE_26', 'COCO_133', or 'COCO_17'. Use another network (MMPose, DeepLabCut, OpenPose, AlphaPose, BlazePose...) and convert the output files if you need another model. See documentation.") logging.info(f'Mode: {mode}.\n') # Initialize the pose tracker pose_tracker = PoseTracker( ModelClass, det_frequency=det_frequency, mode=mode, backend=backend, device=device, tracking=tracking, to_openpose=False) logging.info('\nEstimating pose...') try: pose_listdirs_names = next(os.walk(pose_dir))[1] os.listdir(os.path.join(pose_dir, pose_listdirs_names[0]))[0] if not overwrite_pose: logging.info('Skipping pose estimation as it has already been done. Set overwrite_pose to true in Config.toml if you want to run it again.') else: logging.info('Overwriting previous pose estimation. Set overwrite_pose to false in Config.toml if you want to keep the previous results.') raise except: video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension)) if not len(video_files) == 0: # Process video files logging.info(f'Found video files with extension {vid_img_extension}.') for video_path in video_files: pose_tracker.reset() process_video(video_path, pose_tracker, tracking, output_format, save_video, save_images, display_detection, frame_range) else: # Process image folders logging.info(f'Found image folders with extension {vid_img_extension}.') image_folders = [f for f in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, f))] for image_folder in image_folders: pose_tracker.reset() image_folder_path = os.path.join(video_dir, image_folder) process_images(image_folder_path, vid_img_extension, pose_tracker, tracking, output_format, frame_rate, save_video, save_images, display_detection, frame_range)