af294391d9
* synchronization for multi-person * ignore infinite value * Fix mismatch issue between rect and list of bboxes. But this should clean up * solved indexing issue * for more simple and clear * Clean up v3 --------- Co-authored-by: David PAGNON <contact@david-pagnon.com>
759 lines
33 KiB
Python
759 lines
33 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
'''
|
|
#########################################
|
|
## SYNCHRONIZE CAMERAS ##
|
|
#########################################
|
|
|
|
Post-synchronize your cameras in case they are not natively synchronized.
|
|
|
|
For each camera, computes mean vertical speed for the chosen keypoints,
|
|
and find the time offset for which their correlation is highest.
|
|
|
|
Depending on the analysed motion, all keypoints can be taken into account,
|
|
or a list of them, or the right or left side.
|
|
All frames can be considered, or only those around a specific time (typically,
|
|
the time when there is a single participant in the scene performing a clear vertical motion).
|
|
Has also been successfully tested for synchronizing random walks with random walks.
|
|
|
|
Keypoints whose likelihood is too low are filtered out; and the remaining ones are
|
|
filtered with a butterworth filter.
|
|
|
|
INPUTS:
|
|
- json files from each camera folders
|
|
- a Config.toml file
|
|
- a skeleton model
|
|
|
|
OUTPUTS:
|
|
- synchronized json files for each camera
|
|
'''
|
|
|
|
## INIT
|
|
import numpy as np
|
|
import pandas as pd
|
|
import cv2
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.widgets import Slider
|
|
from matplotlib import patheffects
|
|
from scipy import signal
|
|
from scipy import interpolate
|
|
import json
|
|
import os
|
|
import glob
|
|
import fnmatch
|
|
import re
|
|
import shutil
|
|
from anytree import RenderTree
|
|
from anytree.importer import DictImporter
|
|
import logging
|
|
|
|
from Pose2Sim.common import sort_stringlist_by_last_number, bounding_boxes
|
|
from Pose2Sim.skeletons import *
|
|
|
|
|
|
## AUTHORSHIP INFORMATION
|
|
__author__ = "David Pagnon, HunMin Kim"
|
|
__copyright__ = "Copyright 2021, Pose2Sim"
|
|
__credits__ = ["David Pagnon"]
|
|
__license__ = "BSD 3-Clause License"
|
|
__version__ = "0.9.4"
|
|
__maintainer__ = "David Pagnon"
|
|
__email__ = "contact@david-pagnon.com"
|
|
__status__ = "Development"
|
|
|
|
|
|
# FUNCTIONS
|
|
def load_frame_and_bounding_boxes(cap, frame_number, frame_to_json, pose_dir, json_dir_name):
    '''
    Fetch one frame from a video (or one image from an image sequence)
    together with the bounding boxes detected on that frame.

    INPUTS:
    - cap: cv2.VideoCapture object or list of image file paths.
    - frame_number: int. Index of the frame to load.
    - frame_to_json: dict. Maps frame numbers to JSON file names.
    - pose_dir: str. Directory containing the pose data.
    - json_dir_name: str. Name of the JSON directory of the current camera.

    OUTPUTS:
    - frame_rgb: RGB image of the requested frame, or None if it could not be read.
    - bounding_boxes_list: list of bounding boxes for that frame
      (empty if the frame could not be read or has no JSON file).
    '''

    # Read the raw BGR frame from whichever kind of source was provided
    if isinstance(cap, cv2.VideoCapture):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        success, frame_bgr = cap.read()
        if not success:
            return None, []
    elif isinstance(cap, list):
        if frame_number >= len(cap):
            return None, []
        frame_bgr = cv2.imread(cap[frame_number])
        if frame_bgr is None:
            return None, []
    else:
        raise ValueError("Input must be either a video capture object or a list of image file paths.")

    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)

    # Look up the JSON file associated with this frame, if any
    json_file_name = frame_to_json.get(frame_number)
    if not json_file_name:
        return frame_rgb, []

    json_file_path = os.path.join(pose_dir, json_dir_name, json_file_name)
    return frame_rgb, list(bounding_boxes(json_file_path))
|
|
|
|
|
|
def draw_bounding_boxes_and_annotations(ax, bounding_boxes_list, rects, annotations):
    '''
    Redraw the person bounding boxes and their labels on the given axes.

    INPUTS:
    - ax: The axes object to draw on.
    - bounding_boxes_list: list of tuples (x_min, y_min, x_max, y_max), one per person.
    - rects: list receiving the rectangle patches that are drawn.
    - annotations: list receiving the text labels that are drawn.

    OUTPUTS:
    - None. rects and annotations are emptied and refilled in place.
    '''

    # Take the previously drawn artists off the figure, then forget them
    for old_rect in rects:
        old_rect.remove()
    rects.clear()
    for old_label in annotations:
        old_label.remove()
    annotations.clear()

    for person_id, (left, top, right, bottom) in enumerate(bounding_boxes_list):
        # Boxes with NaN or infinite corners cannot be drawn: skip them.
        # The label keeps the person's index in the unfiltered list.
        if not np.all(np.isfinite([left, top, right, bottom])):
            continue

        box_patch = plt.Rectangle(
            (left, top), right - left, bottom - top,
            linewidth=1, edgecolor='white', facecolor=(1, 1, 1, 0.1),
            linestyle='-', path_effects=[patheffects.withSimplePatchShadow()], zorder=2
        )  # drawn with a shadow
        ax.add_patch(box_patch)
        rects.append(box_patch)

        label = ax.text(
            left, top - 10, f'Person {person_id}', color='white', fontsize=7, fontweight='normal',
            bbox=dict(facecolor='black', alpha=0.5, boxstyle='round,pad=0.3'), zorder=3
        )
        annotations.append(label)
|
|
|
|
|
|
def reset_styles(rects, annotations):
    '''
    Put every bounding box and its label back to the default (non-highlighted) look.

    INPUTS:
    - rects: list of rectangle patches representing bounding boxes.
    - annotations: list of text annotations, paired one-to-one with rects.

    OUTPUTS:
    - None. The artists are restyled in place.
    '''

    default_face = (1, 1, 1, 0.1)  # nearly transparent white fill
    for box_patch, label in zip(rects, annotations):
        box_patch.set_linewidth(1)
        box_patch.set_edgecolor('white')
        box_patch.set_facecolor(default_face)
        label.set_fontsize(7)
        label.set_fontweight('normal')
|
|
|
|
|
|
def highlight_bounding_box(rect, annotation):
    '''
    Emphasize one bounding box and its label (thicker yellow outline, bold text).

    INPUTS:
    - rect: Rectangle patch to highlight.
    - annotation: Text annotation to highlight.

    OUTPUTS:
    - None. rect and annotation are restyled in place.
    '''

    highlight_face = (1, 1, 0, 0.2)  # translucent yellow fill

    # Box outline and fill
    rect.set_linewidth(2)
    rect.set_edgecolor('yellow')
    rect.set_facecolor(highlight_face)

    # Label text
    annotation.set_fontsize(8)
    annotation.set_fontweight('bold')
|
|
|
|
|
|
def on_hover(event, fig, rects, annotations, bounding_boxes_list):
    '''
    Mouse-move callback: highlight the bounding box (and label) under the cursor.

    INPUTS:
    - event: The hover (motion) event.
    - fig: The figure object.
    - rects: The rectangles representing bounding boxes.
    - annotations: The annotations corresponding to each bounding box.
    - bounding_boxes_list: List of tuples containing bounding box coordinates.

    OUTPUTS:
    - None. The plot is updated in place.
    '''

    # The cursor is not over data coordinates: nothing to do
    if event.xdata is None or event.ydata is None:
        return

    # Start from a clean state, then highlight at most one box
    reset_styles(rects, annotations)

    # Only boxes with finite corners were drawn, so filter the same way
    # to keep indices aligned with rects and annotations
    drawn_boxes = [bbox for bbox in bounding_boxes_list if np.isfinite(bbox).all()]
    hovered_idx = next(
        (idx for idx, (x_min, y_min, x_max, y_max) in enumerate(drawn_boxes)
         if x_min <= event.xdata <= x_max and y_min <= event.ydata <= y_max),
        None
    )
    if hovered_idx is not None:
        highlight_bounding_box(rects[hovered_idx], annotations[hovered_idx])

    fig.canvas.draw_idle()
|
|
|
|
|
|
def on_click(event, ax, bounding_boxes_list, selected_idx_container):
    '''
    Mouse-click callback: record which person's bounding box was clicked and close the figure.

    INPUTS:
    - event: The click event.
    - ax: The axes object of the plot.
    - bounding_boxes_list: List of tuples containing bounding box coordinates.
    - selected_idx_container: List with one element to store the selected person's index.

    OUTPUTS:
    - None. selected_idx_container[0] receives the index of the clicked person, if any.
    '''

    # Ignore clicks outside the image axes (e.g. on the slider)
    if event.inaxes != ax:
        return
    click_x, click_y = event.xdata, event.ydata
    if click_x is None or click_y is None:
        return

    # The first box containing the click wins
    for person_id, (left, top, right, bottom) in enumerate(bounding_boxes_list):
        inside = left <= click_x <= right and top <= click_y <= bottom
        if inside:
            selected_idx_container[0] = person_id
            plt.close()
            break
|
|
|
|
|
|
def update_play(cap, image, slider, frame_to_json, pose_dir, json_dir_name, rects, annotations, bounding_boxes_list, ax, fig):
    '''
    Slider callback: display the frame selected on the slider with its bounding boxes.

    INPUTS:
    - cap: cv2.VideoCapture object or list of image file paths.
    - image: The image object in the plot.
    - slider: The slider widget controlling the frame number.
    - frame_to_json: dict. Mapping from frame numbers to JSON file names.
    - pose_dir: str. Path to the directory containing pose data.
    - json_dir_name: str. Name of the JSON directory for the current camera.
    - rects: List of rectangle patches representing bounding boxes.
    - annotations: List of text annotations for each bounding box.
    - bounding_boxes_list: List of tuples, refilled with the boxes of the new frame.
    - ax: The axes object of the plot.
    - fig: The figure object containing the plot.

    OUTPUTS:
    - None. The plot is updated with the new frame, bounding boxes, and annotations.
    '''

    current_frame = int(slider.val)
    frame_rgb, fresh_boxes = load_frame_and_bounding_boxes(cap, current_frame, frame_to_json, pose_dir, json_dir_name)

    # Leave the display untouched when the frame cannot be read
    if frame_rgb is not None:
        image.set_data(frame_rgb)

        # Refill the shared list in place: the hover/click callbacks hold a reference to it
        bounding_boxes_list[:] = fresh_boxes

        draw_bounding_boxes_and_annotations(ax, bounding_boxes_list, rects, annotations)
        fig.canvas.draw_idle()
|
|
|
|
|
|
def get_selected_id_list(multi_person, vid_or_img_files, cam_names, cam_nb, json_files_names_range, search_around_frames, pose_dir, json_dirs_names):
    '''
    Allows the user to select a person from each camera by clicking on their bounding box in the video frames.

    INPUTS:
    - multi_person: bool. Indicates whether multiple people are present in the videos.
    - vid_or_img_files: list of str (one video path per camera), or list of lists of str (image paths per camera).
    - cam_names: list of str. Names of the cameras.
    - cam_nb: int. Number of cameras.
    - json_files_names_range: list of lists. Each sublist contains JSON file names for a camera.
    - search_around_frames: list of [start_frame, end_frame] pairs, one per camera.
    - pose_dir: str. Path to the directory containing pose data.
    - json_dirs_names: list of str. Names of the JSON directories for each camera.

    OUTPUTS:
    - selected_id_list: list of int or None. Selected person index for each camera.
      None for every camera when multi_person is False, and for any camera whose
      video/images could not be found or read. Defaults to 0 when the window is
      closed without clicking on anybody.
    '''

    if not multi_person:
        return [None] * cam_nb

    logging.info('Multi_person mode: selecting the person to synchronize on for each camera.')
    selected_id_list = []

    # Map each camera name to its video file, or to its list of image files
    try: # video files: entries are path strings
        video_files_dict = {cam_name: path for cam_name in cam_names for path in vid_or_img_files if cam_name in os.path.basename(path)}
    except TypeError: # image directories: entries are lists of paths, which os.path.basename rejects
        video_files_dict = {cam_name: files for cam_name in cam_names for files in vid_or_img_files if files and cam_name in os.path.basename(files[0])}

    for i, cam_name in enumerate(cam_names):
        vid_or_img_files_cam = video_files_dict.get(cam_name)
        if not vid_or_img_files_cam:
            logging.warning(f'No video file nor image directory found for camera {cam_name}')
            selected_id_list.append(None)
            continue

        # A string is a video path to open; anything else is used as a list of image paths
        cap = vid_or_img_files_cam
        if isinstance(vid_or_img_files_cam, str):
            cap = cv2.VideoCapture(vid_or_img_files_cam)
            if not cap.isOpened():
                cap.release()
                logging.warning(f'Cannot open video {vid_or_img_files_cam}')
                selected_id_list.append(None)
                continue

        try:
            json_dir_name = json_dirs_names[i]
            frame_to_json = {int(re.split(r'(\d+)', name)[-2]): name for name in json_files_names_range[i]}
            frame_number = search_around_frames[i][0]

            frame_rgb, bounding_boxes_list = load_frame_and_bounding_boxes(cap, frame_number, frame_to_json, pose_dir, json_dir_name)
            if frame_rgb is None:
                logging.warning(f'Cannot read frame {frame_number} from video {vid_or_img_files_cam}')
                selected_id_list.append(None)
                continue

            # Initialize plot
            frame_height, frame_width = frame_rgb.shape[:2]
            fig_width, fig_height = frame_width / 200, frame_height / 250

            fig, ax = plt.subplots(figsize=(fig_width, fig_height))
            ax.imshow(frame_rgb)
            ax.set_title(f'{cam_name}: select the person to synchronize on', fontsize=10, color='black', pad=15)
            ax.axis('off')

            rects, annotations = [], []
            draw_bounding_boxes_and_annotations(ax, bounding_boxes_list, rects, annotations)

            # One-element list so that the click callback can write the result
            selected_idx_container = [None]

            # Event handling
            fig.canvas.mpl_connect('motion_notify_event', lambda event: on_hover(event, fig, rects, annotations, bounding_boxes_list))
            fig.canvas.mpl_connect('button_press_event', lambda event: on_click(event, ax, bounding_boxes_list, selected_idx_container))

            # Add slider
            ax_slider = plt.axes([ax.get_position().x0, 0.05, ax.get_position().width, 0.05])
            slider = Slider(ax_slider, 'Frame ', search_around_frames[i][0], search_around_frames[i][1] - 1, valinit=frame_number, valfmt='%0.0f')

            # Customize slider
            slider.label.set_fontsize(10)
            slider.poly.set_edgecolor((0, 0, 0, 0.5))
            slider.poly.set_facecolor('lightblue')
            slider.poly.set_linewidth(1)

            # Connect the update function to the slider
            slider.on_changed(lambda val: update_play(cap, ax.images[0], slider, frame_to_json, pose_dir, json_dir_name, rects, annotations, bounding_boxes_list, ax, fig))

            # Show the plot and handle events
            plt.show()
        finally:
            # Only video captures hold a resource; a list of image paths has no release()
            if isinstance(cap, cv2.VideoCapture):
                cap.release()

        if selected_idx_container[0] is None:
            selected_idx_container[0] = 0
            logging.warning(f'No person selected for camera {cam_name}: defaulting to person 0')
        selected_id_list.append(selected_idx_container[0])
        logging.info(f'--> Camera #{i}: selected person #{selected_idx_container[0]}')
    logging.info('')

    return selected_id_list
|
|
|
|
|
|
def convert_json2pandas(json_files, likelihood_threshold=0.6, keypoints_ids=[], multi_person=False, selected_id=None):
    '''
    Convert a list of JSON files to a pandas DataFrame.
    Only takes one person in the JSON file:
    - single-person mode: the person whose keypoints span the largest bounding box,
    - multi-person mode: the person at index selected_id.

    INPUTS:
    - json_files: list of str. Paths of the the JSON files.
    - likelihood_threshold: float. Drop values if confidence is below likelihood_threshold.
    - keypoints_ids: list of int. Indices of the keypoints to extract.
    - multi_person: bool. If True, use selected_id instead of the largest bounding box.
    - selected_id: int or None. Index of the person to extract in multi-person mode.
      If None in multi-person mode, every frame is filled with NaN.

    OUTPUTS:
    - df_json_coords: dataframe. One row per JSON file, with (x, y, likelihood)
      for each requested keypoint. A row is all NaN when no usable person is found.
    '''

    nb_coords = len(keypoints_ids)

    def _person_keypoints(person):
        # (nb_coords, 3) array of x, y, likelihood for the requested keypoints
        return np.array([person['pose_keypoints_2d'][3*k:3*k+3] for k in keypoints_ids])

    json_coords = []
    for j_p in json_files:
        with open(j_p) as j_f:
            try:
                json_data_all = json.load(j_f)['people']

                if not multi_person:
                    # Person with the largest bounding box. People without keypoints are
                    # ignored rather than invalidating the whole frame.
                    candidates = [_person_keypoints(p) for p in json_data_all if 'pose_keypoints_2d' in p]
                    bbox_area = [
                        (kpts[:, 0].max() - kpts[:, 0].min()) * (kpts[:, 1].max() - kpts[:, 1].min())
                        for kpts in candidates
                    ]
                    # np.argmax raises ValueError when nobody was detected: handled below
                    json_data = candidates[int(np.argmax(bbox_area))]
                elif selected_id is not None:
                    # An out-of-range selected_id raises IndexError: handled below
                    json_data = _person_keypoints(json_data_all[selected_id])
                else:
                    json_data = None

                if json_data is None:
                    json_data = [np.nan] * nb_coords * 3
                else:
                    # Remove points with low confidence
                    json_data = np.array([j if j[2] > likelihood_threshold else [np.nan, np.nan, np.nan] for j in json_data]).ravel().tolist()
            except (KeyError, IndexError, TypeError, ValueError):
                # Malformed file, nobody detected, missing or truncated keypoints: frame is NaN
                json_data = [np.nan] * nb_coords * 3
        json_coords.append(json_data)
    df_json_coords = pd.DataFrame(json_coords)

    return df_json_coords
|
|
|
|
|
|
def drop_col(df, col_nb):
    '''
    Remove every col_nb-th column of a DataFrame and renumber the remaining ones.

    INPUTS:
    - df: dataframe. The DataFrame from which columns will be dropped.
    - col_nb: int. Period of the columns to drop (e.g. 3 drops columns 2, 5, 8, ...).

    OUTPUTS:
    - dataframe: new DataFrame without these columns, with columns relabelled 0..n-1.
    '''

    nb_columns = df.shape[1]
    # Labels of the columns to remove; columns are addressed by label
    labels_to_drop = list(range(col_nb - 1, nb_columns, col_nb))
    kept = df.drop(columns=labels_to_drop)
    kept.columns = range(kept.shape[1])
    return kept
|
|
|
|
|
|
def vert_speed(df, axis='y'):
    '''
    Frame-to-frame speed of each keypoint along one axis.

    INPUTS:
    - df: dataframe. DataFrame of 2D coordinates, with columns interleaved as x0, y0, x1, y1, ...
    - axis: str. The axis along which to calculate speed. 'x', 'y', or 'z', default is 'y'.

    OUTPUTS:
    - df_vert_speed: DataFrame of speed values, one column per keypoint.
    '''

    column_offset = {'x': 0, 'y': 1, 'z': 2}

    # Difference between consecutive frames; the undefined first row is
    # filled with twice the value of the second row
    frame_deltas = df.diff()
    frame_deltas = frame_deltas.fillna(frame_deltas.iloc[1] * 2)

    # Pick, for each keypoint (pair of columns), the column of the requested axis
    nb_keypoints = int(frame_deltas.shape[1] / 2)
    axis_columns = [frame_deltas.loc[:, 2 * kpt + column_offset[axis]] for kpt in range(nb_keypoints)]
    df_vert_speed = pd.DataFrame(axis_columns).T
    df_vert_speed.columns = np.arange(len(df_vert_speed.columns))
    return df_vert_speed
|
|
|
|
|
|
def interpolate_zeros_nans(col, kind='cubic'):
    '''
    Interpolate missing points (of value nan or zero)

    INPUTS:
    - col: pandas column of coordinates
    - kind: 'linear', 'slinear', 'quadratic', 'cubic'. Default 'cubic'

    OUTPUTS:
    - col_interp: numpy array with the missing points interpolated. Points outside
      the range of valid samples are NaN (no extrapolation).
      If there are not enough valid points for the requested kind of
      interpolation, the input column is returned unchanged.
    '''

    valid = ~(np.isnan(col) | col.eq(0)) # True where the value is usable (neither nan nor zero)
    idx_good = np.where(valid)[0] # positions (not labels) of the usable values
    try:
        f_interp = interpolate.interp1d(idx_good, col.iloc[idx_good], kind=kind, bounds_error=False)
        col_interp = np.where(valid, col, f_interp(np.arange(len(col))))
        return col_interp
    except (ValueError, IndexError):
        # Too few valid points to build the interpolator: leave the column as it is
        return col
|
|
|
|
|
|
def time_lagged_cross_corr(camx, camy, lag_range, show=True, ref_cam_name='0', cam_name='1'):
    '''
    Compute the time-lagged cross-correlation between two pandas series.

    INPUTS:
    - camx: pandas series. Coordinates of reference camera.
    - camy: pandas series. Coordinates of camera to compare.
    - lag_range: int or list. Range of frames for which to compute cross-correlation.
      An int n is interpreted as the lags [-n, n).
    - show: bool. If True, display the cross-correlation plot.
    - ref_cam_name: str. The name of the reference camera.
    - cam_name: str. The name of the camera to compare with.

    OUTPUTS:
    - offset: int. The time offset for which the correlation is highest.
      0 if no correlation could be computed.
    - max_corr: float. The maximum correlation value.
      0 if no correlation could be computed.
    '''

    if isinstance(lag_range, int):
        lag_range = [-lag_range, lag_range]

    lags = list(range(lag_range[0], lag_range[1]))
    pearson_r = [camx.corr(camy.shift(lag)) for lag in lags]

    # No lag to test, or no lag with a defined correlation (e.g. constant signals)
    if len(pearson_r) == 0 or np.isnan(pearson_r).all():
        return 0, 0

    # Ignore NaN correlations when looking for the peak: np.argmax would return
    # the index of the first NaN instead of the index of the maximum
    peak_idx = int(np.nanargmax(pearson_r))
    max_corr = np.nanmax(pearson_r)
    # NOTE(review): the offset is counted from the middle of the lag range, which
    # is lag 0 only when the range is symmetric (int lag_range) — confirm the
    # intent for asymmetric lists
    offset = int(np.floor(len(pearson_r)/2) - peak_idx)

    if show:
        f, ax = plt.subplots(2,1)
        # speed
        camx.plot(ax=ax[0], label = f'Reference: {ref_cam_name}')
        camy.plot(ax=ax[0], label = f'Compared: {cam_name}')
        ax[0].set(xlabel='Frame', ylabel='Speed (px/frame)')
        ax[0].legend()
        # time lagged cross-correlation
        ax[1].plot(lags, pearson_r)
        ax[1].axvline(np.ceil(len(pearson_r)/2) + lag_range[0],color='k',linestyle='--')
        ax[1].axvline(peak_idx + lag_range[0],color='r',linestyle='--',label='Peak synchrony')
        ax[1].annotate(f'Max correlation={np.round(max_corr,2)}', xy=(0.05, 0.9), xycoords='axes fraction')
        ax[1].set(title=f'Offset = {offset} frames', xlabel='Offset (frames)',ylabel='Pearson r')

        ax[1].legend()
        f.tight_layout()
        plt.show()

    return offset, max_corr
|
|
|
|
|
|
def _detect_frame_rate(vid_or_img_files, default_fps=60):
    '''
    Read the frame rate of the first video, or return default_fps when there is
    no video (image sequences, no input) or when it cannot be read.
    The capture is always released.
    '''

    if not vid_or_img_files or not isinstance(vid_or_img_files[0], str):
        return default_fps

    cap = cv2.VideoCapture(vid_or_img_files[0])
    try:
        cap.read()
        # The frame rate is only trusted if a second frame can be read
        if not cap.read()[0]:
            return default_fps
        return int(cap.get(cv2.CAP_PROP_FPS))
    except (TypeError, ValueError, cv2.error):
        return default_fps
    finally:
        cap.release()


def synchronize_cams_all(config_dict):
    '''
    Post-synchronize your cameras in case they are not natively synchronized.

    For each camera, computes mean vertical speed for the chosen keypoints,
    and find the time offset for which their correlation is highest.

    Depending on the analysed motion, all keypoints can be taken into account,
    or a list of them, or the right or left side.
    All frames can be considered, or only those around a specific time (typically,
    the time when there is a single participant in the scene performing a clear vertical motion).
    Has also been successfully tested for synchronizing random walks with random walks.

    Keypoints whose likelihood is too low are filtered out; and the remaining ones are
    filtered with a butterworth filter.

    INPUTS:
    - json files from each camera folders
    - a Config.toml file
    - a skeleton model

    OUTPUTS:
    - synchronized json files for each camera
    '''

    # Get parameters from Config.toml
    project_dir = config_dict.get('project').get('project_dir')
    pose_dir = os.path.realpath(os.path.join(project_dir, 'pose'))
    pose_model = config_dict.get('pose').get('pose_model')
    multi_person = config_dict.get('project').get('multi_person')
    fps = config_dict.get('project').get('frame_rate')
    frame_range = config_dict.get('project').get('frame_range')
    display_sync_plots = config_dict.get('synchronization').get('display_sync_plots')
    keypoints_to_consider = config_dict.get('synchronization').get('keypoints_to_consider')
    approx_time_maxspeed = config_dict.get('synchronization').get('approx_time_maxspeed')
    time_range_around_maxspeed = config_dict.get('synchronization').get('time_range_around_maxspeed')

    likelihood_threshold = config_dict.get('synchronization').get('likelihood_threshold')
    filter_cutoff = int(config_dict.get('synchronization').get('filter_cutoff'))
    filter_order = int(config_dict.get('synchronization').get('filter_order'))

    # List videos, or image directories if there is no video
    video_dir = os.path.join(project_dir, 'videos')
    vid_img_extension = config_dict['pose']['vid_img_extension']
    vid_or_img_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension))
    if not vid_or_img_files: # vid_or_img_files then holds one list of image paths per directory
        image_folders = [f for f in os.listdir(video_dir) if os.path.isdir(os.path.join(video_dir, f))]
        for image_folder in image_folders:
            vid_or_img_files.append(glob.glob(os.path.join(video_dir, image_folder, '*'+vid_img_extension)))

    # Determine frame rate
    if fps == 'auto':
        fps = _detect_frame_rate(vid_or_img_files, default_fps=60)
    lag_range = time_range_around_maxspeed*fps # frames

    # Retrieve keypoints from model
    # NOTE(review): eval() executes whatever expression the configuration provides
    # as pose_model. Only use with trusted configuration files.
    try: # from skeletons.py
        model = eval(pose_model)
    except Exception: # eval can fail in many ways (NameError, SyntaxError, TypeError, ...)
        try: # from Config.toml
            model = DictImporter().import_(config_dict.get('pose').get(pose_model))
            if model.id == 'None':
                model.id = None
        except Exception:
            raise NameError('Model not found in skeletons.py nor in Config.toml')
    keypoints_ids = [node.id for _, _, node in RenderTree(model) if node.id is not None]
    keypoints_names = [node.name for _, _, node in RenderTree(model) if node.id is not None]

    # List json files
    try:
        pose_listdirs_names = next(os.walk(pose_dir))[1]
        os.listdir(os.path.join(pose_dir, pose_listdirs_names[0]))[0]
    except (StopIteration, IndexError, OSError):
        raise ValueError(f'No json files found in {pose_dir} subdirectories. Make sure you run Pose2Sim.poseEstimation() first.')
    pose_listdirs_names = sort_stringlist_by_last_number(pose_listdirs_names)
    json_dirs_names = [k for k in pose_listdirs_names if 'json' in k]
    json_dirs = [os.path.join(pose_dir, j_d) for j_d in json_dirs_names] # list of json directories in pose_dir
    json_files_names = [fnmatch.filter(os.listdir(os.path.join(pose_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
    json_files_names = [sort_stringlist_by_last_number(j) for j in json_files_names]
    nb_frames_per_cam = [len(names) for names in json_files_names]
    cam_nb = len(json_dirs)
    cam_list = list(range(cam_nb))
    cam_names = [os.path.basename(j_dir).split('_')[0] for j_dir in json_dirs]

    # frame range selection
    f_range = frame_range if frame_range != [] else [0, min(nb_frames_per_cam)]

    # Determine frames to consider for synchronization
    if isinstance(approx_time_maxspeed, list): # search around max speed
        approx_frame_maxspeed = [int(fps * t) for t in approx_time_maxspeed]
        search_around_frames = []
        for cam, approx_frame in enumerate(approx_frame_maxspeed):
            # Clamp the search window to the frames available for this camera
            first_frame = int(approx_frame-lag_range) if approx_frame-lag_range>0 else 0
            last_frame = int(approx_frame+lag_range) if approx_frame+lag_range<nb_frames_per_cam[cam] else nb_frames_per_cam[cam]+f_range[0]
            search_around_frames.append([first_frame, last_frame])
        logging.info(f'Synchronization is calculated around the times {approx_time_maxspeed} +/- {time_range_around_maxspeed} s.')
    elif approx_time_maxspeed == 'auto': # search on the whole sequence (slower if long sequence)
        search_around_frames = [[f_range[0], f_range[0]+nb_frames_per_cam[cam]] for cam in range(cam_nb)]
        logging.info('Synchronization is calculated on the whole sequence. This may take a while.')
    else:
        raise ValueError('approx_time_maxspeed should be a list of floats or "auto"')

    if keypoints_to_consider == 'right':
        logging.info('Keypoints used to compute the best synchronization offset: right side.')
    elif keypoints_to_consider == 'left':
        logging.info('Keypoints used to compute the best synchronization offset: left side.')
    elif isinstance(keypoints_to_consider, list):
        logging.info(f'Keypoints used to compute the best synchronization offset: {keypoints_to_consider}.')
    elif keypoints_to_consider == 'all':
        logging.info('All keypoints are used to compute the best synchronization offset.')
    logging.info(f'These keypoints are filtered with a Butterworth filter (cut-off frequency: {filter_cutoff} Hz, order: {filter_order}).')
    logging.info(f'They are removed when their likelihood is below {likelihood_threshold}.\n')

    # Extract, interpolate, and filter keypoint coordinates
    logging.info('Synchronizing...')
    df_coords = []
    # Half the order is requested because filtfilt applies the filter twice (forward and backward)
    b, a = signal.butter(filter_order/2, filter_cutoff/(fps/2), 'low', analog = False)
    json_files_names_range = [[j for j in json_files_cam if int(re.split(r'(\d+)',j)[-2]) in range(*frames_cam)] for (json_files_cam, frames_cam) in zip(json_files_names,search_around_frames)]
    json_files_range = [[os.path.join(pose_dir, j_dir, j_file) for j_file in json_files_names_range[j]] for j, j_dir in enumerate(json_dirs_names)]

    if any(not names for names in json_files_names_range):
        raise ValueError(f'No json files found within the specified frame range ({frame_range}) at the times {approx_time_maxspeed} +/- {time_range_around_maxspeed} s.')

    # Handle manual selection if multi person is True
    selected_id_list = get_selected_id_list(multi_person, vid_or_img_files, cam_names, cam_nb, json_files_names_range, search_around_frames, pose_dir, json_dirs_names)

    # Keypoints to consider: identical for every camera, so resolved once
    invalid_keypoints_msg = ('keypoints_to_consider should be "all", "right", "left", or a list of keypoint names.\n'
                             'If you specified keypoints, make sure that they exist in your pose_model.')
    if keypoints_to_consider == 'right':
        kpt_indices = [k for k, name in enumerate(keypoints_names) if name.startswith('R') or name.startswith('right')]
    elif keypoints_to_consider == 'left':
        kpt_indices = [k for k, name in enumerate(keypoints_names) if name.startswith('L') or name.startswith('left')]
    elif isinstance(keypoints_to_consider, list):
        kpt_indices = [k for k, name in enumerate(keypoints_names) if name in keypoints_to_consider]
    elif keypoints_to_consider == 'all':
        kpt_indices = list(range(len(keypoints_ids)))
    else:
        raise ValueError(invalid_keypoints_msg)
    if not kpt_indices:
        # None of the requested keypoints exists in the model: nothing to correlate
        raise ValueError(invalid_keypoints_msg)
    # x and y columns of the selected keypoints (likelihood columns are dropped beforehand)
    coord_columns = np.sort(np.concatenate([np.array(kpt_indices)*2, np.array(kpt_indices)*2+1]))

    for i in range(cam_nb):
        df_cam = convert_json2pandas(json_files_range[i], likelihood_threshold=likelihood_threshold, keypoints_ids=keypoints_ids, multi_person=multi_person, selected_id=selected_id_list[i])
        df_cam = drop_col(df_cam,3) # drop likelihood
        df_cam = df_cam[coord_columns]
        df_cam = df_cam.apply(interpolate_zeros_nans, axis=0, args = ['linear'])
        df_cam = df_cam.bfill().ffill()
        df_coords.append(pd.DataFrame(signal.filtfilt(b, a, df_cam, axis=0)))

    # Compute sum of speeds
    sum_speeds = []
    for i in range(cam_nb):
        df_speed_cam = vert_speed(df_coords[i])
        sum_speed_cam = abs(df_speed_cam).sum(axis=1)
        sum_speeds.append(pd.DataFrame(signal.filtfilt(b, a, sum_speed_cam, axis=0)).squeeze())

    # Compute offset for best synchronization:
    # Highest correlation of sum of absolute speeds for each cam compared to reference cam
    ref_cam_id = nb_frames_per_cam.index(min(nb_frames_per_cam)) # ref cam: least amount of frames
    ref_cam_name = cam_names[ref_cam_id]
    ref_frame_nb = len(df_coords[ref_cam_id])
    lag_range = int(ref_frame_nb/2)
    cam_list.pop(ref_cam_id)
    cam_names.pop(ref_cam_id)
    offset = []
    for cam_id, cam_name in zip(cam_list, cam_names):
        offset_cam_section, max_corr_cam = time_lagged_cross_corr(sum_speeds[ref_cam_id], sum_speeds[cam_id], lag_range, show=display_sync_plots, ref_cam_name=ref_cam_name, cam_name=cam_name)
        # Account for the two cameras' search windows not starting on the same frame
        offset_cam = offset_cam_section - (search_around_frames[ref_cam_id][0] - search_around_frames[cam_id][0])
        if isinstance(approx_time_maxspeed, list):
            logging.info(f'--> Camera {ref_cam_name} and {cam_name}: {offset_cam} frames offset ({offset_cam_section} on the selected section), correlation {round(max_corr_cam, 2)}.')
        else:
            logging.info(f'--> Camera {ref_cam_name} and {cam_name}: {offset_cam} frames offset, correlation {round(max_corr_cam, 2)}.')
        offset.append(offset_cam)
    offset.insert(ref_cam_id, 0)

    # rename json files according to the offset and copy them to pose-sync
    sync_dir = os.path.abspath(os.path.join(pose_dir, '..', 'pose-sync'))
    os.makedirs(sync_dir, exist_ok=True)
    for d, j_dir in enumerate(json_dirs):
        cam_dir_name = os.path.basename(j_dir)
        os.makedirs(os.path.join(sync_dir, cam_dir_name), exist_ok=True)
        for j_file in json_files_names[d]:
            j_split = re.split(r'(\d+)',j_file)
            shifted_frame = int(j_split[-2])-offset[d]
            j_split[-2] = f'{shifted_frame:06d}'
            # NOTE(review): the strict comparison also skips the file whose shifted
            # index is exactly 0, not only negative ones — confirm this is intended
            if shifted_frame > 0:
                json_offset_name = ''.join(j_split)
                shutil.copy(os.path.join(pose_dir, cam_dir_name, j_file), os.path.join(sync_dir, cam_dir_name, json_offset_name))

    logging.info(f'Synchronized json files saved in {sync_dir}.')