#!/usr/bin/env python
# -*- coding: utf-8 -*-


'''
###########################################################################
## POSE ESTIMATION                                                       ##
###########################################################################

Estimate pose from a video file or a folder of images and
write the results to JSON files, videos, and/or images.
Results can optionally be displayed in real time.

Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands, face), COCO_17 (body)
Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
need another detection or pose model)

Optionally gives consistent person IDs across frames (slower but good for 2D analysis).
Optionally runs detection only every n frames and tracks keypoints in between (faster but less accurate).

If a valid CUDA installation is detected, uses the GPU with the ONNXRuntime backend.
Otherwise, uses the CPU with the OpenVINO backend.

INPUTS:
- videos or image folders from the video directory
- a Config.toml file

OUTPUTS:
- JSON files with the detected keypoints and confidence scores in the OpenPose format
- Optionally, videos and/or image files with the detected keypoints
'''


## INIT
import os
import glob
import json
import logging
from tqdm import tqdm
import numpy as np
import cv2

from rtmlib import PoseTracker, Body, Wholebody, BodyWithFeet, draw_skeleton
from Pose2Sim.common import natural_sort_key


## AUTHORSHIP INFORMATION
__author__ = "HunMin Kim, David Pagnon"
__copyright__ = "Copyright 2021, Pose2Sim"
__credits__ = ["HunMin Kim", "David Pagnon"]
__license__ = "BSD 3-Clause License"
__version__ = "0.9.4"
__maintainer__ = "David Pagnon"
__email__ = "contact@david-pagnon.com"
__status__ = "Development"


## FUNCTIONS
def save_to_openpose(json_file_path, keypoints, scores):
    '''
    Save the keypoints and scores to a JSON file in the OpenPose format

    INPUTS:
    - json_file_path: Path to save the JSON file
    - keypoints: Detected keypoints
    - scores: Confidence scores for each keypoint

    OUTPUTS:
    - JSON file with the detected keypoints and confidence scores in the OpenPose format
    '''

    # Prepare keypoints with confidence scores for JSON output
    nb_detections = len(keypoints)
    detections = []
    for i in range(nb_detections):  # one entry per detected person
        keypoints_with_confidence_i = []
        for kp, score in zip(keypoints[i], scores[i]):
            keypoints_with_confidence_i.extend([kp[0].item(), kp[1].item(), score.item()])
        detections.append({
                    "person_id": [-1],
                    "pose_keypoints_2d": keypoints_with_confidence_i,
                    "face_keypoints_2d": [],
                    "hand_left_keypoints_2d": [],
                    "hand_right_keypoints_2d": [],
                    "pose_keypoints_3d": [],
                    "face_keypoints_3d": [],
                    "hand_left_keypoints_3d": [],
                    "hand_right_keypoints_3d": []
                    })

    # Create JSON output structure
    json_output = {"version": 1.3, "people": detections}

    # Save JSON output for each frame
    json_output_dir = os.path.abspath(os.path.join(json_file_path, '..'))
    os.makedirs(json_output_dir, exist_ok=True)
    with open(json_file_path, 'w') as json_file:
        json.dump(json_output, json_file)
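

# Illustrative sketch, not called by the pipeline: what save_to_openpose() expects and
# produces, on synthetic data. The shapes mirror rtmlib's PoseTracker output:
# keypoints (n_persons, n_keypoints, 2) and scores (n_persons, n_keypoints).
# The output path is hypothetical.
def _demo_save_to_openpose():
    rng = np.random.default_rng(0)
    keypoints = rng.uniform(0, 1000, (2, 26, 2))  # 2 people, 26 HALPE_26 keypoints, (x, y)
    scores = rng.uniform(0, 1, (2, 26))           # one confidence score per keypoint
    save_to_openpose('demo_output/demo_000000.json', keypoints, scores)
    # Each person is stored as a flat [x0, y0, c0, x1, y1, c1, ...] list:
    with open('demo_output/demo_000000.json') as f:
        print(json.load(f)['people'][0]['pose_keypoints_2d'][:6])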


def process_video(video_path, pose_tracker, tracking, output_format, save_video, save_images, display_detection, frame_range):
    '''
    Estimate pose from a video file

    INPUTS:
    - video_path: str. Path to the input video file
    - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib
    - tracking: bool. Whether to give consistent person IDs across frames
    - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut')
    - save_video: bool. Whether to save the output video
    - save_images: bool. Whether to save the output images
    - display_detection: bool. Whether to show real-time visualization
    - frame_range: list. Range of frames to process

    OUTPUTS:
    - JSON files with the detected keypoints and confidence scores in the OpenPose format
    - if save_video: video file with the detected keypoints drawn on the frames
    - if save_images: image files with the detected keypoints drawn on the frames
    '''

    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.read()[0]:
            raise ValueError
    except:
        raise NameError(f"{video_path} is not a video. Images must be put in one subdirectory per camera.")

    pose_dir = os.path.abspath(os.path.join(video_path, '..', '..', 'pose'))
    os.makedirs(pose_dir, exist_ok=True)
    video_name_wo_ext = os.path.splitext(os.path.basename(video_path))[0]
    json_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_json')
    output_video_path = os.path.join(pose_dir, f'{video_name_wo_ext}_pose.mp4')
    img_output_dir = os.path.join(pose_dir, f'{video_name_wo_ext}_img')

    if save_video:  # Set up video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for the output video
        fps = cap.get(cv2.CAP_PROP_FPS)  # Frame rate from the raw video
        W, H = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # Width and height from the raw video
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H))  # Create the output video file

    if display_detection:
        cv2.namedWindow(f"Pose Estimation {os.path.basename(video_path)}", cv2.WINDOW_NORMAL | cv2.WINDOW_KEEPRATIO)

    frame_idx = 0
    cap = cv2.VideoCapture(video_path)  # Reopen so processing starts from the first frame
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    f_range = [total_frames] if frame_range == [] else frame_range  # range(*f_range) covers all frames by default
    with tqdm(total=total_frames, desc=f'Processing {os.path.basename(video_path)}') as pbar:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break

            if frame_idx in range(*f_range):
                # Perform pose estimation on the frame
                keypoints, scores = pose_tracker(frame)

                # Reorder keypoints, scores so that row index == person track ID
                if tracking:
                    max_id = max(pose_tracker.track_ids_last_frame)
                    num_persons, num_points, num_coordinates = keypoints.shape
                    keypoints_filled = np.zeros((max_id + 1, num_points, num_coordinates))
                    scores_filled = np.zeros((max_id + 1, num_points))
                    keypoints_filled[pose_tracker.track_ids_last_frame] = keypoints
                    scores_filled[pose_tracker.track_ids_last_frame] = scores
                    keypoints = keypoints_filled
                    scores = scores_filled

                # Save to json
                if 'openpose' in output_format:
                    json_file_path = os.path.join(json_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.json')
                    save_to_openpose(json_file_path, keypoints, scores)

                # Draw skeleton on the frame
                if display_detection or save_video or save_images:
                    img_show = frame.copy()
                    img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1)  # maybe change this value if 0.1 is too low

                if display_detection:
                    cv2.imshow(f"Pose Estimation {os.path.basename(video_path)}", img_show)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

                if save_video:
                    out.write(img_show)

                if save_images:
                    os.makedirs(img_output_dir, exist_ok=True)
                    cv2.imwrite(os.path.join(img_output_dir, f'{video_name_wo_ext}_{frame_idx:06d}.png'), img_show)

            frame_idx += 1
            pbar.update(1)

    cap.release()
    if save_video:
        out.release()
        logging.info(f"--> Output video saved to {output_video_path}.")
    if save_images:
        logging.info(f"--> Output images saved to {img_output_dir}.")
    if display_detection:
        cv2.destroyAllWindows()
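

# Standalone sketch of the track-ID reordering used above (and in process_images below):
# rows of `keypoints`/`scores` are scattered so that row index == person track ID, with
# zero-filled rows for IDs that were not detected this frame. The IDs here are made up.
def _demo_track_id_reordering():
    track_ids_last_frame = [3, 0]    # hypothetical IDs from the tracker
    keypoints = np.ones((2, 26, 2))  # 2 detected people, 26 keypoints, (x, y)
    scores = np.ones((2, 26))
    max_id = max(track_ids_last_frame)
    keypoints_filled = np.zeros((max_id + 1, *keypoints.shape[1:]))
    scores_filled = np.zeros((max_id + 1, scores.shape[1]))
    keypoints_filled[track_ids_last_frame] = keypoints  # rows 3 and 0 filled, rows 1-2 stay zero
    scores_filled[track_ids_last_frame] = scores
    print(keypoints_filled.shape)  # (4, 26, 2)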


def process_images(image_folder_path, vid_img_extension, pose_tracker, tracking, output_format, fps, save_video, save_images, display_detection, frame_range):
    '''
    Estimate pose from a folder of images

    INPUTS:
    - image_folder_path: str. Path to the input image folder
    - vid_img_extension: str. Extension of the image files
    - pose_tracker: PoseTracker. Initialized pose tracker object from RTMLib
    - tracking: bool. Whether to give consistent person IDs across frames
    - output_format: str. Output format for the pose estimation results ('openpose', 'mmpose', 'deeplabcut')
    - fps: int. Frame rate of the output video, if save_video is True
    - save_video: bool. Whether to save the output video
    - save_images: bool. Whether to save the output images
    - display_detection: bool. Whether to show real-time visualization
    - frame_range: list. Range of frames to process

    OUTPUTS:
    - JSON files with the detected keypoints and confidence scores in the OpenPose format
    - if save_video: video file with the detected keypoints drawn on the frames
    - if save_images: image files with the detected keypoints drawn on the frames
    '''

    pose_dir = os.path.abspath(os.path.join(image_folder_path, '..', '..', 'pose'))
    os.makedirs(pose_dir, exist_ok=True)
    json_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_json')
    output_video_path = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_pose.mp4')
    img_output_dir = os.path.join(pose_dir, f'{os.path.basename(image_folder_path)}_img')

    image_files = glob.glob(os.path.join(image_folder_path, '*' + vid_img_extension))
    image_files = sorted(image_files, key=natural_sort_key)  # sorted() returns a new list: the result must be assigned

    if save_video:  # Set up video writer
        logging.warning(f'Writing the output video at {fps} fps. Set frame_rate in Config.toml if this is incorrect.')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for the output video
        W, H = cv2.imread(image_files[0]).shape[:2][::-1]  # Width and height from the first image (assuming all images have the same size)
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H))  # Create the output video file

    if display_detection:
        cv2.namedWindow(f"Pose Estimation {os.path.basename(image_folder_path)}", cv2.WINDOW_NORMAL)

    f_range = [len(image_files)] if frame_range == [] else frame_range  # range(*f_range) covers all frames by default
    for frame_idx, image_file in enumerate(tqdm(image_files, desc=f'Processing {os.path.basename(image_folder_path)}')):
        if frame_idx in range(*f_range):

            frame = cv2.imread(image_file)
            if frame is None:  # cv2.imread does not raise on failure, it returns None
                raise NameError(f"{image_file} is not an image. Videos must be put in the video directory, not in subdirectories.")

            # Perform pose estimation on the image
            keypoints, scores = pose_tracker(frame)

            # Reorder keypoints, scores so that row index == person track ID
            if tracking:
                max_id = max(pose_tracker.track_ids_last_frame)
                num_persons, num_points, num_coordinates = keypoints.shape
                keypoints_filled = np.zeros((max_id + 1, num_points, num_coordinates))
                scores_filled = np.zeros((max_id + 1, num_points))
                keypoints_filled[pose_tracker.track_ids_last_frame] = keypoints
                scores_filled[pose_tracker.track_ids_last_frame] = scores
                keypoints = keypoints_filled
                scores = scores_filled

            # Save to json
            if 'openpose' in output_format:
                json_file_path = os.path.join(json_output_dir, f"{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.json")
                save_to_openpose(json_file_path, keypoints, scores)

            # Draw skeleton on the image
            if display_detection or save_video or save_images:
                img_show = frame.copy()
                img_show = draw_skeleton(img_show, keypoints, scores, kpt_thr=0.1)  # maybe change this value if 0.1 is too low

            if display_detection:
                cv2.imshow(f"Pose Estimation {os.path.basename(image_folder_path)}", img_show)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            if save_video:
                out.write(img_show)

            if save_images:
                os.makedirs(img_output_dir, exist_ok=True)
                cv2.imwrite(os.path.join(img_output_dir, f'{os.path.splitext(os.path.basename(image_file))[0]}_{frame_idx:06d}.png'), img_show)

    if save_video:
        out.release()  # finalize the video file
        logging.info(f"--> Output video saved to {output_video_path}.")
    if save_images:
        logging.info(f"--> Output images saved to {img_output_dir}.")
    if display_detection:
        cv2.destroyAllWindows()
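

# Standalone sketch of the backend/device fallback performed in rtm_estimator() below:
# CUDA via ONNXRuntime if available, then MPS/CoreML on Apple silicon, else OpenVINO on CPU.
# Illustrative only; rtm_estimator() keeps its own inline version of this logic.
def _demo_select_backend_device():
    try:
        import torch
        import onnxruntime as ort
        if torch.cuda.is_available() and 'CUDAExecutionProvider' in ort.get_available_providers():
            return 'onnxruntime', 'cuda'
    except ImportError:
        pass
    try:
        import onnxruntime as ort
        if {'MPSExecutionProvider', 'CoreMLExecutionProvider'} & set(ort.get_available_providers()):
            return 'onnxruntime', 'mps'
    except ImportError:
        pass
    return 'openvino', 'cpu'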


def rtm_estimator(config_dict):
    '''
    Estimate pose from a video file or a folder of images and
    write the results to JSON files, videos, and/or images.
    Results can optionally be displayed in real time.

    Supported models: HALPE_26 (default, body and feet), COCO_133 (body, feet, hands, face), COCO_17 (body)
    Supported modes: lightweight, balanced, performance (edit paths at rtmlib/tools/solutions if you
    need another detection or pose model)

    Optionally gives consistent person IDs across frames (slower but good for 2D analysis).
    Optionally runs detection only every n frames and tracks keypoints in between (faster but less accurate).

    If a valid CUDA installation is detected, uses the GPU with the ONNXRuntime backend.
    Otherwise, uses the CPU with the OpenVINO backend.

    INPUTS:
    - videos or image folders from the video directory
    - a Config.toml file

    OUTPUTS:
    - JSON files with the detected keypoints and confidence scores in the OpenPose format
    - Optionally, videos and/or image files with the detected keypoints
    '''

    # Read config
    project_dir = config_dict['project']['project_dir']
    # if batch
    session_dir = os.path.realpath(os.path.join(project_dir, '..'))
    # if single trial
    session_dir = session_dir if 'Config.toml' in os.listdir(session_dir) else os.getcwd()
    frame_range = config_dict.get('project').get('frame_range')
    video_dir = os.path.join(project_dir, 'videos')
    pose_dir = os.path.join(project_dir, 'pose')

    pose_model = config_dict['pose']['pose_model']
    mode = config_dict['pose']['mode']  # lightweight, balanced, performance
    vid_img_extension = config_dict['pose']['vid_img_extension']
    output_format = config_dict['pose']['output_format']
    save_video = 'to_video' in config_dict['pose']['save_video']
    save_images = 'to_images' in config_dict['pose']['save_video']
    display_detection = config_dict['pose']['display_detection']
    overwrite_pose = config_dict['pose']['overwrite_pose']
    det_frequency = config_dict['pose']['det_frequency']
    tracking = config_dict['pose']['tracking']

    # Determine frame rate
    video_files = glob.glob(os.path.join(video_dir, '*' + vid_img_extension))
    frame_rate = config_dict.get('project').get('frame_rate')
    if frame_rate == 'auto':
        try:
            cap = cv2.VideoCapture(video_files[0])
            if not cap.read()[0]:
                raise ValueError
            frame_rate = round(cap.get(cv2.CAP_PROP_FPS))  # read the frame rate from the first video
        except:
            frame_rate = 60  # fall back to 60 fps (e.g., when processing image folders)

    # If CUDA is available, use it with the ONNXRuntime backend; else use the CPU with OpenVINO
    try:
        import torch
        import onnxruntime as ort
        if torch.cuda.is_available() and 'CUDAExecutionProvider' in ort.get_available_providers():
            device = 'cuda'
            backend = 'onnxruntime'
            logging.info("\nValid CUDA installation found: using ONNXRuntime backend with GPU.")
        else:
            raise
    except:
        try:
            import onnxruntime as ort
            if 'MPSExecutionProvider' in ort.get_available_providers() or 'CoreMLExecutionProvider' in ort.get_available_providers():
                device = 'mps'
                backend = 'onnxruntime'
                logging.info("\nValid MPS installation found: using ONNXRuntime backend with GPU.")
            else:
                raise
        except:
            device = 'cpu'
            backend = 'openvino'
            logging.info("\nNo valid CUDA installation found: using OpenVINO backend with CPU.")

    if det_frequency > 1:
        logging.info(f'Inference run only every {det_frequency} frames. In between, pose estimation tracks previously detected points.')
    elif det_frequency == 1:
        logging.info('Inference run on every single frame.')
    else:
        raise ValueError(f"Invalid det_frequency: {det_frequency}. Must be an integer greater than or equal to 1.")

    if tracking:
        logging.info('Pose estimation will attempt to give consistent person IDs across frames.\n')

    # Select the appropriate model based on the model_type
    if pose_model.upper() == 'HALPE_26':
        ModelClass = BodyWithFeet  # 26 keypoints (HALPE_26)
        logging.info("Using HALPE_26 model (body and feet) for pose estimation.")
    elif pose_model.upper() == 'COCO_133':
        ModelClass = Wholebody  # 133 keypoints (COCO_133)
        logging.info("Using COCO_133 model (body, feet, hands, and face) for pose estimation.")
    elif pose_model.upper() == 'COCO_17':
        ModelClass = Body  # 17 keypoints (COCO_17)
        logging.info("Using COCO_17 model (body) for pose estimation.")
    else:
        raise ValueError(f"Invalid model_type: {pose_model}. Must be 'HALPE_26', 'COCO_133', or 'COCO_17'. Use another network (MMPose, DeepLabCut, OpenPose, AlphaPose, BlazePose...) and convert the output files if you need another model. See documentation.")
    logging.info(f'Mode: {mode}.\n')
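
    # Note on the two speed/accuracy knobs forwarded to rtmlib's PoseTracker below:
    # det_frequency=n runs the person detector only every n-th frame and tracks the
    # bounding boxes in between; tracking=True additionally tries to keep person IDs
    # consistent across frames (see the reordering step in process_video/process_images).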
    # Initialize the pose tracker
    pose_tracker = PoseTracker(
        ModelClass,
        det_frequency=det_frequency,
        mode=mode,
        backend=backend,
        device=device,
        tracking=tracking,
        to_openpose=False)

    logging.info('\nEstimating pose...')
    try:
        # Raises if no previous pose estimation results exist, which routes to the except block below
        pose_listdirs_names = next(os.walk(pose_dir))[1]
        os.listdir(os.path.join(pose_dir, pose_listdirs_names[0]))[0]
        if not overwrite_pose:
            logging.info('Skipping pose estimation as it has already been done. Set overwrite_pose to true in Config.toml if you want to run it again.')
        else:
            logging.info('Overwriting previous pose estimation. Set overwrite_pose to false in Config.toml if you want to keep the previous results.')
            raise
    except:
        video_files = glob.glob(os.path.join(video_dir, '*' + vid_img_extension))
        if len(video_files) > 0:
            # Process video files
            logging.info(f'Found video files with extension {vid_img_extension}.')
            for video_path in video_files:
                pose_tracker.reset()
                process_video(video_path, pose_tracker, tracking, output_format, save_video, save_images, display_detection, frame_range)
        else:
            # Process image folders
            logging.info(f'Found image folders with extension {vid_img_extension}.')
            image_folders = [f for f in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, f))]
            for image_folder in image_folders:
                pose_tracker.reset()
                image_folder_path = os.path.join(video_dir, image_folder)
                process_images(image_folder_path, vid_img_extension, pose_tracker, tracking, output_format, frame_rate, save_video, save_images, display_detection, frame_range)
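

# Example usage sketch. The paths and values are hypothetical; the keys mirror the
# Config.toml reads at the top of rtm_estimator() above.
if __name__ == '__main__':
    demo_config = {
        'project': {'project_dir': '.', 'frame_range': [], 'frame_rate': 'auto'},
        'pose': {'pose_model': 'HALPE_26', 'mode': 'balanced',
                 'vid_img_extension': '.mp4', 'output_format': 'openpose',
                 'save_video': ['to_video'], 'display_detection': False,
                 'overwrite_pose': False, 'det_frequency': 4, 'tracking': False}
    }
    rtm_estimator(demo_config)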