🚧 update tools

1. update camera reader
2. update debug and visualize tools
This commit is contained in:
Qing Shuai 2022-08-21 16:04:51 +08:00
parent d534ba41fc
commit 5bc4b113ba
9 changed files with 2184 additions and 89 deletions

View File

@ -1,7 +1,7 @@
import cv2
import numpy as np
import os
from os.path import join
class FileStorage(object):
def __init__(self, filename, isWrite=False):
version = cv2.__version__
@ -10,27 +10,34 @@ class FileStorage(object):
if isWrite:
os.makedirs(os.path.dirname(filename), exist_ok=True)
self.fs = cv2.FileStorage(filename, cv2.FILE_STORAGE_WRITE)
self.fs = open(filename, 'w')
self.fs.write('%YAML:1.0\r\n')
self.fs.write('---\r\n')
else:
assert os.path.exists(filename), filename
self.fs = cv2.FileStorage(filename, cv2.FILE_STORAGE_READ)
self.isWrite = isWrite
def __del__(self):
if self.isWrite:
self.fs.close()
else:
cv2.FileStorage.release(self.fs)
def _write(self, out):
self.fs.write(out+'\r\n')
def write(self, key, value, dt='mat'):
if dt == 'mat':
cv2.FileStorage.write(self.fs, key, value)
self._write('{}: !!opencv-matrix'.format(key))
self._write(' rows: {}'.format(value.shape[0]))
self._write(' cols: {}'.format(value.shape[1]))
self._write(' dt: d')
self._write(' data: [{}]'.format(', '.join(['{:.3f}'.format(i) for i in value.reshape(-1)])))
elif dt == 'list':
if self.major_version == 4: # 4.4
self.fs.startWriteStruct(key, cv2.FileNode_SEQ)
self._write('{}:'.format(key))
for elem in value:
self.fs.write('', elem)
self.fs.endWriteStruct()
else: # 3.4
self.fs.write(key, '[')
for elem in value:
self.fs.write('none', elem)
self.fs.write('none', ']')
self._write(' - "{}"'.format(elem))
def read(self, key, dt='mat'):
if dt == 'mat':
@ -66,6 +73,8 @@ def read_intri(intri_name):
return cameras
def write_intri(intri_name, cameras):
if not os.path.exists(os.path.dirname(intri_name)):
os.makedirs(os.path.dirname(intri_name))
intri = FileStorage(intri_name, True)
results = {}
camnames = list(cameras.keys())
@ -74,11 +83,13 @@ def write_intri(intri_name, cameras):
key = key_.split('.')[0]
K, dist = val['K'], val['dist']
assert K.shape == (3, 3), K.shape
assert dist.shape == (1, 5) or dist.shape == (5, 1), dist.shape
assert dist.shape == (1, 5) or dist.shape == (5, 1) or dist.shape == (1, 4) or dist.shape == (4, 1), dist.shape
intri.write('K_{}'.format(key), K)
intri.write('dist_{}'.format(key), dist.reshape(1, 5))
intri.write('dist_{}'.format(key), dist.flatten()[None])
def write_extri(extri_name, cameras):
if not os.path.exists(os.path.dirname(extri_name)):
os.makedirs(os.path.dirname(extri_name))
extri = FileStorage(extri_name, True)
results = {}
camnames = list(cameras.keys())
@ -105,12 +116,15 @@ def read_camera(intri_name, extri_name, cam_names=[]):
cams[cam]['invK'] = np.linalg.inv(cams[cam]['K'])
Rvec = extri.read('R_{}'.format(cam))
Tvec = extri.read('T_{}'.format(cam))
assert Rvec is not None, cam
R = cv2.Rodrigues(Rvec)[0]
RT = np.hstack((R, Tvec))
cams[cam]['RT'] = RT
cams[cam]['R'] = R
cams[cam]['Rvec'] = Rvec
cams[cam]['T'] = Tvec
cams[cam]['center'] = - Rvec.T @ Tvec
P[cam] = cams[cam]['K'] @ cams[cam]['RT']
cams[cam]['P'] = P[cam]
@ -118,6 +132,13 @@ def read_camera(intri_name, extri_name, cam_names=[]):
cams['basenames'] = cam_names
return cams
def read_cameras(path, intri='intri.yml', extri='extri.yml', subs=[]):
cameras = read_camera(join(path, intri), join(path, extri))
cameras.pop('basenames')
if len(subs) > 0:
cameras = {key:cameras[key].astype(np.float32) for key in subs}
return cameras
def write_camera(camera, path):
from os.path import join
intri_name = join(path, 'intri.yml')
@ -146,12 +167,24 @@ def camera_from_img(img):
focal = 1.2*min(height, width) # as colmap
K = np.array([focal, 0., width/2, 0., focal, height/2, 0. ,0., 1.]).reshape(3, 3)
camera = {'K':K ,'R': np.eye(3), 'T': np.zeros((3, 1)), 'dist': np.zeros((1, 5))}
camera['invK'] = np.linalg.inv(camera['K'])
camera['P'] = camera['K'] @ np.hstack((camera['R'], camera['T']))
return camera
class Undistort:
@staticmethod
def image(frame, K, dist):
distortMap = {}
@classmethod
def image(cls, frame, K, dist, sub=None):
if sub is None:
return cv2.undistort(frame, K, dist, None)
else:
if sub not in cls.distortMap.keys():
h, w = frame.shape[:2]
mapx, mapy = cv2.initUndistortRectifyMap(K, dist, None, K, (w,h), 5)
cls.distortMap[sub] = (mapx, mapy)
mapx, mapy = cls.distortMap[sub]
img = cv2.remap(frame, mapx, mapy, cv2.INTER_NEAREST)
return img
@staticmethod
def points(keypoints, K, dist):
@ -170,10 +203,38 @@ class Undistort:
bbox = np.array([kpts[0, 0], kpts[0, 1], kpts[1, 0], kpts[1, 1], bbox[4]])
return bbox
def undistort(camera, frame=None, keypoints=None, output=None, bbox=None):
# bbox: 1, 7
print('This function is deprecated')
raise NotImplementedError
def unproj(kpts, invK):
homo = np.hstack([kpts[:, :2], np.ones_like(kpts[:, :1])])
homo = homo @ invK.T
return np.hstack([homo[:, :2], kpts[:, 2:]])
class UndistortFisheye:
@staticmethod
def image(frame, K, dist):
Knew = K.copy()
frame = cv2.fisheye.undistortImage(frame, K, dist, Knew=Knew)
return frame, Knew
@staticmethod
def points(keypoints, K, dist, Knew):
# keypoints: (N, 3)
assert len(keypoints.shape) == 2, keypoints.shape
kpts = keypoints[:, None, :2]
kpts = np.ascontiguousarray(kpts)
kpts = cv2.fisheye.undistortPoints(kpts, K, dist, P=Knew)
keypoints[:, :2] = kpts[:, 0]
return keypoints
@staticmethod
def bbox(bbox, K, dist, Knew):
keypoints = np.array([[bbox[0], bbox[1], 1], [bbox[2], bbox[3], 1]])
kpts = UndistortFisheye.points(keypoints, K, dist, Knew)
bbox = np.array([kpts[0, 0], kpts[0, 1], kpts[1, 0], kpts[1, 1], bbox[4]])
return bbox
def get_Pall(cameras, camnames):
Pall = np.stack([cameras[cam]['K'] @ np.hstack((cameras[cam]['R'], cameras[cam]['T'])) for cam in camnames])
return Pall
def get_fundamental_matrix(cameras, basenames):
skew_op = lambda x: np.array([[0, -x[2], x[1]], [x[2], 0, -x[0]], [-x[1], x[0], 0]])
@ -189,3 +250,59 @@ def get_fundamental_matrix(cameras, basenames):
if F[(icam, jcam)].sum() == 0:
F[(icam, jcam)] += 1e-12 # to avoid nan
return F
def interp_cameras(cameras, keys, step=20, loop=True, allstep=-1, **kwargs):
from scipy.spatial.transform import Rotation as R
from scipy.spatial.transform import Slerp
if allstep != -1:
tall = np.linspace(0., 1., allstep+1)[:-1].reshape(-1, 1, 1)
elif allstep == -1 and loop:
tall = np.linspace(0., 1., 1+step*len(keys))[:-1].reshape(-1, 1, 1)
elif allstep == -1 and not loop:
tall = np.linspace(0., 1., 1+step*(len(keys)-1))[:-1].reshape(-1, 1, 1)
cameras_new = {}
for ik in range(len(keys)):
if ik == len(keys) -1 and not loop:
break
if loop:
start, end = (ik * tall.shape[0])//len(keys), int((ik+1)*tall.shape[0])//len(keys)
print(ik, start, end, tall.shape)
else:
start, end = (ik * tall.shape[0])//(len(keys)-1), int((ik+1)*tall.shape[0])//(len(keys)-1)
t = tall[start:end].copy()
t = (t-t.min())/(t.max()-t.min())
left, right = keys[ik], keys[0 if ik == len(keys)-1 else ik + 1]
camera_left = cameras[left]
camera_right = cameras[right]
# 插值相机中心: center = - R.T @ T
center_l = - camera_left['R'].T @ camera_left['T']
center_r = - camera_right['R'].T @ camera_right['T']
center_l, center_r = center_l[None], center_r[None]
if False:
centers = center_l * (1-t) + center_r * t
else:
# 球面插值
norm_l, norm_r = np.linalg.norm(center_l), np.linalg.norm(center_r)
center_l, center_r = center_l/norm_l, center_r/norm_r
costheta = (center_l*center_r).sum()
sintheta = np.sqrt(1. - costheta**2)
theta = np.arctan2(sintheta, costheta)
centers = (np.sin(theta*(1-t)) * center_l + np.sin(theta * t) * center_r)/sintheta
norm = norm_l * (1-t) + norm_r * t
centers = centers * norm
key_rots = R.from_matrix(np.stack([camera_left['R'], camera_right['R']]))
key_times = [0, 1]
slerp = Slerp(key_times, key_rots)
interp_rots = slerp(t.squeeze()).as_matrix()
# 计算相机T RX + T = 0 => T = - R @ X
T = - np.einsum('bmn,bno->bmo', interp_rots, centers)
K = camera_left['K'] * (1-t) + camera_right['K'] * t
for i in range(T.shape[0]):
cameras_new['{}-{}-{}'.format(left, right, i)] = \
{
'K': K[i],
'dist': np.zeros((1, 5)),
'R': interp_rots[i],
'T': T[i]
}
return cameras_new

View File

@ -0,0 +1,439 @@
# Copyright (c) 2018, ETH Zurich and UNC Chapel Hill.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of ETH Zurich and UNC Chapel Hill nor the names of
# its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Johannes L. Schoenberger (jsch-at-demuc-dot-de)
import os
import sys
import collections
import numpy as np
import struct
import cv2
CameraModel = collections.namedtuple(
"CameraModel", ["model_id", "model_name", "num_params"])
Camera = collections.namedtuple(
"Camera", ["id", "model", "width", "height", "params"])
BaseImage = collections.namedtuple(
"Image", ["id", "qvec", "tvec", "camera_id", "name", "xys", "point3D_ids"])
Point3D = collections.namedtuple(
"Point3D", ["id", "xyz", "rgb", "error", "image_ids", "point2D_idxs"])
class Image(BaseImage):
def qvec2rotmat(self):
return qvec2rotmat(self.qvec)
CAMERA_MODELS = {
CameraModel(model_id=0, model_name="SIMPLE_PINHOLE", num_params=3),
CameraModel(model_id=1, model_name="PINHOLE", num_params=4),
CameraModel(model_id=2, model_name="SIMPLE_RADIAL", num_params=4),
CameraModel(model_id=3, model_name="RADIAL", num_params=5),
CameraModel(model_id=4, model_name="OPENCV", num_params=8),
CameraModel(model_id=5, model_name="OPENCV_FISHEYE", num_params=8),
CameraModel(model_id=6, model_name="FULL_OPENCV", num_params=12),
CameraModel(model_id=7, model_name="FOV", num_params=5),
CameraModel(model_id=8, model_name="SIMPLE_RADIAL_FISHEYE", num_params=4),
CameraModel(model_id=9, model_name="RADIAL_FISHEYE", num_params=5),
CameraModel(model_id=10, model_name="THIN_PRISM_FISHEYE", num_params=12)
}
CAMERA_MODEL_IDS = dict([(camera_model.model_id, camera_model) \
for camera_model in CAMERA_MODELS])
CAMERA_MODEL_NAMES = dict([(camera_model.model_name, camera_model)
for camera_model in CAMERA_MODELS])
def read_next_bytes(fid, num_bytes, format_char_sequence, endian_character="<"):
"""Read and unpack the next bytes from a binary file.
:param fid:
:param num_bytes: Sum of combination of {2, 4, 8}, e.g. 2, 6, 16, 30, etc.
:param format_char_sequence: List of {c, e, f, d, h, H, i, I, l, L, q, Q}.
:param endian_character: Any of {@, =, <, >, !}
:return: Tuple of read and unpacked values.
"""
data = fid.read(num_bytes)
return struct.unpack(endian_character + format_char_sequence, data)
def read_cameras_text(path):
"""
see: src/base/reconstruction.cc
void Reconstruction::WriteCamerasText(const std::string& path)
void Reconstruction::ReadCamerasText(const std::string& path)
"""
cameras = {}
with open(path, "r") as fid:
while True:
line = fid.readline()
if not line:
break
line = line.strip()
if len(line) > 0 and line[0] != "#":
elems = line.split()
camera_id = int(elems[0])
model = elems[1]
width = int(elems[2])
height = int(elems[3])
params = np.array(tuple(map(float, elems[4:])))
cameras[camera_id] = Camera(id=camera_id, model=model,
width=width, height=height,
params=params)
return cameras
def read_cameras_binary(path_to_model_file):
"""
see: src/base/reconstruction.cc
void Reconstruction::WriteCamerasBinary(const std::string& path)
void Reconstruction::ReadCamerasBinary(const std::string& path)
"""
cameras = {}
with open(path_to_model_file, "rb") as fid:
num_cameras = read_next_bytes(fid, 8, "Q")[0]
for camera_line_index in range(num_cameras):
camera_properties = read_next_bytes(
fid, num_bytes=24, format_char_sequence="iiQQ")
camera_id = camera_properties[0]
model_id = camera_properties[1]
model_name = CAMERA_MODEL_IDS[camera_properties[1]].model_name
width = camera_properties[2]
height = camera_properties[3]
num_params = CAMERA_MODEL_IDS[model_id].num_params
params = read_next_bytes(fid, num_bytes=8*num_params,
format_char_sequence="d"*num_params)
cameras[camera_id] = Camera(id=camera_id,
model=model_name,
width=width,
height=height,
params=np.array(params))
assert len(cameras) == num_cameras
return cameras
def read_images_text(path):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadImagesText(const std::string& path)
void Reconstruction::WriteImagesText(const std::string& path)
"""
images = {}
with open(path, "r") as fid:
while True:
line = fid.readline()
if not line:
break
line = line.strip()
if len(line) > 0 and line[0] != "#":
elems = line.split()
image_id = int(elems[0])
qvec = np.array(tuple(map(float, elems[1:5])))
tvec = np.array(tuple(map(float, elems[5:8])))
camera_id = int(elems[8])
image_name = elems[9]
elems = fid.readline().split()
xys = np.column_stack([tuple(map(float, elems[0::3])),
tuple(map(float, elems[1::3]))])
point3D_ids = np.array(tuple(map(int, elems[2::3])))
images[image_id] = Image(
id=image_id, qvec=qvec, tvec=tvec,
camera_id=camera_id, name=image_name,
xys=xys, point3D_ids=point3D_ids)
return images
def read_images_binary(path_to_model_file):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadImagesBinary(const std::string& path)
void Reconstruction::WriteImagesBinary(const std::string& path)
"""
images = {}
with open(path_to_model_file, "rb") as fid:
num_reg_images = read_next_bytes(fid, 8, "Q")[0]
for image_index in range(num_reg_images):
binary_image_properties = read_next_bytes(
fid, num_bytes=64, format_char_sequence="idddddddi")
image_id = binary_image_properties[0]
qvec = np.array(binary_image_properties[1:5])
tvec = np.array(binary_image_properties[5:8])
camera_id = binary_image_properties[8]
image_name = ""
current_char = read_next_bytes(fid, 1, "c")[0]
while current_char != b"\x00": # look for the ASCII 0 entry
image_name += current_char.decode("utf-8")
current_char = read_next_bytes(fid, 1, "c")[0]
num_points2D = read_next_bytes(fid, num_bytes=8,
format_char_sequence="Q")[0]
x_y_id_s = read_next_bytes(fid, num_bytes=24*num_points2D,
format_char_sequence="ddq"*num_points2D)
xys = np.column_stack([tuple(map(float, x_y_id_s[0::3])),
tuple(map(float, x_y_id_s[1::3]))])
point3D_ids = np.array(tuple(map(int, x_y_id_s[2::3])))
images[image_id] = Image(
id=image_id, qvec=qvec, tvec=tvec,
camera_id=camera_id, name=image_name,
xys=xys, point3D_ids=point3D_ids)
return images
def read_points3D_text(path):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadPoints3DText(const std::string& path)
void Reconstruction::WritePoints3DText(const std::string& path)
"""
points3D = {}
with open(path, "r") as fid:
while True:
line = fid.readline()
if not line:
break
line = line.strip()
if len(line) > 0 and line[0] != "#":
elems = line.split()
point3D_id = int(elems[0])
xyz = np.array(tuple(map(float, elems[1:4])))
rgb = np.array(tuple(map(int, elems[4:7])))
error = float(elems[7])
image_ids = np.array(tuple(map(int, elems[8::2])))
point2D_idxs = np.array(tuple(map(int, elems[9::2])))
points3D[point3D_id] = Point3D(id=point3D_id, xyz=xyz, rgb=rgb,
error=error, image_ids=image_ids,
point2D_idxs=point2D_idxs)
return points3D
def read_points3d_binary(path_to_model_file):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadPoints3DBinary(const std::string& path)
void Reconstruction::WritePoints3DBinary(const std::string& path)
"""
points3D = {}
with open(path_to_model_file, "rb") as fid:
num_points = read_next_bytes(fid, 8, "Q")[0]
for point_line_index in range(num_points):
binary_point_line_properties = read_next_bytes(
fid, num_bytes=43, format_char_sequence="QdddBBBd")
point3D_id = binary_point_line_properties[0]
xyz = np.array(binary_point_line_properties[1:4])
rgb = np.array(binary_point_line_properties[4:7])
error = np.array(binary_point_line_properties[7])
track_length = read_next_bytes(
fid, num_bytes=8, format_char_sequence="Q")[0]
track_elems = read_next_bytes(
fid, num_bytes=8*track_length,
format_char_sequence="ii"*track_length)
image_ids = np.array(tuple(map(int, track_elems[0::2])))
point2D_idxs = np.array(tuple(map(int, track_elems[1::2])))
points3D[point3D_id] = Point3D(
id=point3D_id, xyz=xyz, rgb=rgb,
error=error, image_ids=image_ids,
point2D_idxs=point2D_idxs)
return points3D
def read_model(path, ext):
if ext == ".txt":
cameras = read_cameras_text(os.path.join(path, "cameras" + ext))
images = read_images_text(os.path.join(path, "images" + ext))
points3D = read_points3D_text(os.path.join(path, "points3D") + ext)
else:
cameras = read_cameras_binary(os.path.join(path, "cameras" + ext))
images = read_images_binary(os.path.join(path, "images" + ext))
points3D = read_points3d_binary(os.path.join(path, "points3D") + ext)
return cameras, images, points3D
def qvec2rotmat(qvec):
return np.array([
[1 - 2 * qvec[2]**2 - 2 * qvec[3]**2,
2 * qvec[1] * qvec[2] - 2 * qvec[0] * qvec[3],
2 * qvec[3] * qvec[1] + 2 * qvec[0] * qvec[2]],
[2 * qvec[1] * qvec[2] + 2 * qvec[0] * qvec[3],
1 - 2 * qvec[1]**2 - 2 * qvec[3]**2,
2 * qvec[2] * qvec[3] - 2 * qvec[0] * qvec[1]],
[2 * qvec[3] * qvec[1] - 2 * qvec[0] * qvec[2],
2 * qvec[2] * qvec[3] + 2 * qvec[0] * qvec[1],
1 - 2 * qvec[1]**2 - 2 * qvec[2]**2]])
def rotmat2qvec(R):
Rxx, Ryx, Rzx, Rxy, Ryy, Rzy, Rxz, Ryz, Rzz = R.flat
K = np.array([
[Rxx - Ryy - Rzz, 0, 0, 0],
[Ryx + Rxy, Ryy - Rxx - Rzz, 0, 0],
[Rzx + Rxz, Rzy + Ryz, Rzz - Rxx - Ryy, 0],
[Ryz - Rzy, Rzx - Rxz, Rxy - Ryx, Rxx + Ryy + Rzz]]) / 3.0
eigvals, eigvecs = np.linalg.eigh(K)
qvec = eigvecs[[3, 0, 1, 2], np.argmax(eigvals)]
if qvec[0] < 0:
qvec *= -1
return qvec
def write_cameras_text(cameras, path):
"""
see: src/base/reconstruction.cc
void Reconstruction::WriteCamerasText(const std::string& path)
void Reconstruction::ReadCamerasText(const std::string& path)
"""
HEADER = '# Camera list with one line of data per camera:\n'
'# CAMERA_ID, MODEL, WIDTH, HEIGHT, PARAMS[]\n'
'# Number of cameras: {}\n'.format(len(cameras))
with open(path, "w") as fid:
fid.write(HEADER)
for _, cam in cameras.items():
to_write = [cam.id, cam.model, cam.width, cam.height, *cam.params]
line = " ".join([str(elem) for elem in to_write])
fid.write(line + "\n")
def write_next_bytes(fid, data, format_char_sequence, endian_character="<"):
"""pack and write to a binary file.
:param fid:
:param data: data to send, if multiple elements are sent at the same time,
they should be encapsuled either in a list or a tuple
:param format_char_sequence: List of {c, e, f, d, h, H, i, I, l, L, q, Q}.
should be the same length as the data list or tuple
:param endian_character: Any of {@, =, <, >, !}
"""
if isinstance(data, (list, tuple)):
bytes = struct.pack(endian_character + format_char_sequence, *data)
else:
bytes = struct.pack(endian_character + format_char_sequence, data)
fid.write(bytes)
def write_cameras_binary(cameras, path_to_model_file):
"""
see: src/base/reconstruction.cc
void Reconstruction::WriteCamerasBinary(const std::string& path)
void Reconstruction::ReadCamerasBinary(const std::string& path)
"""
with open(path_to_model_file, "wb") as fid:
write_next_bytes(fid, len(cameras), "Q")
for _, cam in cameras.items():
model_id = CAMERA_MODEL_NAMES[cam.model].model_id
camera_properties = [cam.id,
model_id,
cam.width,
cam.height]
write_next_bytes(fid, camera_properties, "iiQQ")
for p in cam.params:
write_next_bytes(fid, float(p), "d")
return cameras
def write_images_binary(images, path_to_model_file):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadImagesBinary(const std::string& path)
void Reconstruction::WriteImagesBinary(const std::string& path)
"""
with open(path_to_model_file, "wb") as fid:
write_next_bytes(fid, len(images), "Q")
for _, img in images.items():
write_next_bytes(fid, img.id, "i")
write_next_bytes(fid, img.qvec.tolist(), "dddd")
write_next_bytes(fid, img.tvec.tolist(), "ddd")
write_next_bytes(fid, img.camera_id, "i")
for char in img.name:
write_next_bytes(fid, char.encode("utf-8"), "c")
write_next_bytes(fid, b"\x00", "c")
write_next_bytes(fid, len(img.point3D_ids), "Q")
for xy, p3d_id in zip(img.xys, img.point3D_ids):
write_next_bytes(fid, [*xy, p3d_id], "ddq")
def write_images_text(images, path):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadImagesText(const std::string& path)
void Reconstruction::WriteImagesText(const std::string& path)
"""
if len(images) == 0:
mean_observations = 0
else:
mean_observations = sum((len(img.point3D_ids) for _, img in images.items()))/len(images)
HEADER = '# Image list with two lines of data per image:\n'
'# IMAGE_ID, QW, QX, QY, QZ, TX, TY, TZ, CAMERA_ID, NAME\n'
'# POINTS2D[] as (X, Y, POINT3D_ID)\n'
'# Number of images: {}, mean observations per image: {}\n'.format(len(images), mean_observations)
with open(path, "w") as fid:
fid.write(HEADER)
for _, img in images.items():
image_header = [img.id, *img.qvec, *img.tvec, img.camera_id, img.name]
first_line = " ".join(map(str, image_header))
fid.write(first_line + "\n")
points_strings = []
for xy, point3D_id in zip(img.xys, img.point3D_ids):
points_strings.append(" ".join(map(str, [*xy, point3D_id])))
fid.write(" ".join(points_strings) + "\n")
def write_points3D_text(points3D, path):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadPoints3DText(const std::string& path)
void Reconstruction::WritePoints3DText(const std::string& path)
"""
if len(points3D) == 0:
mean_track_length = 0
else:
mean_track_length = sum((len(pt.image_ids) for _, pt in points3D.items()))/len(points3D)
HEADER = '# 3D point list with one line of data per point:\n'
'# POINT3D_ID, X, Y, Z, R, G, B, ERROR, TRACK[] as (IMAGE_ID, POINT2D_IDX)\n'
'# Number of points: {}, mean track length: {}\n'.format(len(points3D), mean_track_length)
with open(path, "w") as fid:
fid.write(HEADER)
for _, pt in points3D.items():
point_header = [pt.id, *pt.xyz, *pt.rgb, pt.error]
fid.write(" ".join(map(str, point_header)) + " ")
track_strings = []
for image_id, point2D in zip(pt.image_ids, pt.point2D_idxs):
track_strings.append(" ".join(map(str, [image_id, point2D])))
fid.write(" ".join(track_strings) + "\n")
def write_points3d_binary(points3D, path_to_model_file):
"""
see: src/base/reconstruction.cc
void Reconstruction::ReadPoints3DBinary(const std::string& path)
void Reconstruction::WritePoints3DBinary(const std::string& path)
"""
with open(path_to_model_file, "wb") as fid:
write_next_bytes(fid, len(points3D), "Q")
for _, pt in points3D.items():
write_next_bytes(fid, pt.id, "Q")
write_next_bytes(fid, pt.xyz.tolist(), "ddd")
write_next_bytes(fid, pt.rgb.tolist(), "BBB")
write_next_bytes(fid, pt.error, "d")
track_length = pt.image_ids.shape[0]
write_next_bytes(fid, track_length, "Q")
for image_id, point2D_id in zip(pt.image_ids, pt.point2D_idxs):
write_next_bytes(fid, [image_id, point2D_id], "ii")

View File

@ -0,0 +1,468 @@
'''
@ Date: 2022-06-20 15:03:50
@ Author: Qing Shuai
@ Mail: s_q@zju.edu.cn
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-08-16 20:24:07
@ FilePath: /EasyMocapPublic/easymocap/mytools/colmap_wrapper.py
'''
import shutil
import sys
import os
import sqlite3
import numpy as np
from os.path import join
import cv2
from .debug_utils import mkdir, run_cmd, log, mywarn
from .colmap_structure import Camera, Image, CAMERA_MODEL_NAMES
from .colmap_structure import rotmat2qvec
from .colmap_structure import read_points3d_binary
IS_PYTHON3 = sys.version_info[0] >= 3
MAX_IMAGE_ID = 2**31 - 1
CREATE_CAMERAS_TABLE = """CREATE TABLE IF NOT EXISTS cameras (
camera_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
model INTEGER NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
params BLOB,
prior_focal_length INTEGER NOT NULL)"""
CREATE_DESCRIPTORS_TABLE = """CREATE TABLE IF NOT EXISTS descriptors (
image_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB,
FOREIGN KEY(image_id) REFERENCES images(image_id) ON DELETE CASCADE)"""
CREATE_IMAGES_TABLE = """CREATE TABLE IF NOT EXISTS images (
image_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
name TEXT NOT NULL UNIQUE,
camera_id INTEGER NOT NULL,
prior_qw REAL,
prior_qx REAL,
prior_qy REAL,
prior_qz REAL,
prior_tx REAL,
prior_ty REAL,
prior_tz REAL,
CONSTRAINT image_id_check CHECK(image_id >= 0 and image_id < {}),
FOREIGN KEY(camera_id) REFERENCES cameras(camera_id))
""".format(MAX_IMAGE_ID)
CREATE_TWO_VIEW_GEOMETRIES_TABLE = """
CREATE TABLE IF NOT EXISTS two_view_geometries (
pair_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB,
config INTEGER NOT NULL,
F BLOB,
E BLOB,
H BLOB)
"""
CREATE_KEYPOINTS_TABLE = """CREATE TABLE IF NOT EXISTS keypoints (
image_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB,
FOREIGN KEY(image_id) REFERENCES images(image_id) ON DELETE CASCADE)
"""
CREATE_MATCHES_TABLE = """CREATE TABLE IF NOT EXISTS matches (
pair_id INTEGER PRIMARY KEY NOT NULL,
rows INTEGER NOT NULL,
cols INTEGER NOT NULL,
data BLOB)"""
CREATE_NAME_INDEX = \
"CREATE UNIQUE INDEX IF NOT EXISTS index_name ON images(name)"
CREATE_ALL = "; ".join([
CREATE_CAMERAS_TABLE,
CREATE_IMAGES_TABLE,
CREATE_KEYPOINTS_TABLE,
CREATE_DESCRIPTORS_TABLE,
CREATE_MATCHES_TABLE,
CREATE_TWO_VIEW_GEOMETRIES_TABLE,
CREATE_NAME_INDEX
])
def image_ids_to_pair_id(image_id1, image_id2):
if image_id1 > image_id2:
image_id1, image_id2 = image_id2, image_id1
return image_id1 * MAX_IMAGE_ID + image_id2
def pair_id_to_image_ids(pair_id):
image_id2 = pair_id % MAX_IMAGE_ID
image_id1 = (pair_id - image_id2) // MAX_IMAGE_ID
return image_id1, image_id2
def array_to_blob(array):
if IS_PYTHON3:
return array.tobytes()
else:
return np.getbuffer(array)
def blob_to_array(blob, dtype, shape=(-1,)):
if blob is None:
return np.empty((0, 2), dtype=dtype)
if IS_PYTHON3:
return np.frombuffer(blob, dtype=dtype).reshape(*shape)
else:
return np.frombuffer(blob, dtype=dtype).reshape(*shape)
class COLMAPDatabase(sqlite3.Connection):
@staticmethod
def connect(database_path):
return sqlite3.connect(database_path, factory=COLMAPDatabase)
def __init__(self, *args, **kwargs):
super(COLMAPDatabase, self).__init__(*args, **kwargs)
self.create_tables = lambda: self.executescript(CREATE_ALL)
self.create_cameras_table = \
lambda: self.executescript(CREATE_CAMERAS_TABLE)
self.create_descriptors_table = \
lambda: self.executescript(CREATE_DESCRIPTORS_TABLE)
self.create_images_table = \
lambda: self.executescript(CREATE_IMAGES_TABLE)
self.create_two_view_geometries_table = \
lambda: self.executescript(CREATE_TWO_VIEW_GEOMETRIES_TABLE)
self.create_keypoints_table = \
lambda: self.executescript(CREATE_KEYPOINTS_TABLE)
self.create_matches_table = \
lambda: self.executescript(CREATE_MATCHES_TABLE)
self.create_name_index = lambda: self.executescript(CREATE_NAME_INDEX)
def add_camera(self, model, width, height, params,
prior_focal_length=False, camera_id=None):
params = np.asarray(params, np.float64)
cursor = self.execute(
"INSERT INTO cameras VALUES (?, ?, ?, ?, ?, ?)",
(camera_id, model, width, height, array_to_blob(params),
prior_focal_length))
return cursor.lastrowid
def add_image(self, name, camera_id,
prior_q=np.full(4, np.NaN), prior_t=np.full(3, np.NaN), image_id=None):
cursor = self.execute(
"INSERT INTO images VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(image_id, name, camera_id, prior_q[0], prior_q[1], prior_q[2],
prior_q[3], prior_t[0], prior_t[1], prior_t[2]))
return cursor.lastrowid
def add_keypoints(self, image_id, keypoints):
assert(len(keypoints.shape) == 2)
assert(keypoints.shape[1] in [2, 4, 6])
keypoints = np.asarray(keypoints, np.float32)
self.execute(
"INSERT INTO keypoints VALUES (?, ?, ?, ?)",
(image_id,) + keypoints.shape + (array_to_blob(keypoints),))
def add_descriptors(self, image_id, descriptors):
descriptors = np.ascontiguousarray(descriptors, np.uint8)
self.execute(
"INSERT INTO descriptors VALUES (?, ?, ?, ?)",
(image_id,) + descriptors.shape + (array_to_blob(descriptors),))
def add_matches(self, image_id1, image_id2, matches):
assert(len(matches.shape) == 2)
assert(matches.shape[1] == 2)
if image_id1 > image_id2:
matches = matches[:,::-1]
pair_id = image_ids_to_pair_id(image_id1, image_id2)
matches = np.asarray(matches, np.uint32)
self.execute(
"INSERT INTO matches VALUES (?, ?, ?, ?)",
(pair_id,) + matches.shape + (array_to_blob(matches),))
def add_two_view_geometry(self, image_id1, image_id2, matches,
F=np.eye(3), E=np.eye(3), H=np.eye(3), config=2):
assert(len(matches.shape) == 2)
assert(matches.shape[1] == 2)
if image_id1 > image_id2:
matches = matches[:,::-1]
pair_id = image_ids_to_pair_id(image_id1, image_id2)
matches = np.asarray(matches, np.uint32)
F = np.asarray(F, dtype=np.float64)
E = np.asarray(E, dtype=np.float64)
H = np.asarray(H, dtype=np.float64)
self.execute(
"INSERT INTO two_view_geometries VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(pair_id,) + matches.shape + (array_to_blob(matches), config,
array_to_blob(F), array_to_blob(E), array_to_blob(H)))
def read_images(self):
image_id_to_name, name_to_image_id = {}, {}
image_results = self.execute("SELECT * FROM images")
for result in image_results:
image_id, name, camera_id, q0, q1, q2, q3, t0, t1, t2 = result
image_id_to_name[image_id] = name
name_to_image_id[name] = image_id
return image_id_to_name, name_to_image_id
def read_keypoints(self, mapping=None):
image_id_to_keypoints = {}
keypoints_results = self.execute("SELECT * FROM keypoints")
for keypoints_result in keypoints_results:
image_id, rows, cols, keypoints = keypoints_result
keypoints = blob_to_array(keypoints, np.float32, (rows, cols))
if mapping is None:
image_id_to_keypoints[image_id] = keypoints
else:
image_id_to_keypoints[mapping[image_id]] = keypoints
return image_id_to_keypoints
def read_matches(self, mapping=None):
matches_results = self.execute("SELECT * FROM matches")
matches = {}
for matches_result in matches_results:
pair_id, rows, cols, match = matches_result
image_id0, image_id1 = pair_id_to_image_ids(pair_id)
if rows == 0:
continue
match = blob_to_array(match, dtype=np.uint32, shape=(rows, cols))
if mapping is not None:
image_id0 = mapping[image_id0]
image_id1 = mapping[image_id1]
matches[(image_id0, image_id1)] = match
return matches
def read_two_view_geometry(self, mapping=None):
geometry = self.execute("SELECT * FROM two_view_geometries")
geometries = {}
for pair_id, rows, cols, data, config, F, E, H in geometry:
F = blob_to_array(F, dtype=np.float64)
E = blob_to_array(E, dtype=np.float64)
H = blob_to_array(H, dtype=np.float64)
image_id0, image_id1 = pair_id_to_image_ids(pair_id)
match = blob_to_array(data, dtype=np.uint32, shape=(rows, cols))
if rows == 0:continue
if mapping is not None:
image_id0 = mapping[image_id0]
image_id1 = mapping[image_id1]
geometries[(image_id0, image_id1)] = {'matches': match, 'F':F, 'E':E, 'H':H, 'config': config}
return geometries
def create_empty_db(database_path):
if os.path.exists(database_path):
mywarn('Removing old database: {}'.format(database_path))
os.remove(database_path)
print('Creating an empty database...')
db = COLMAPDatabase.connect(database_path)
db.create_tables()
db.commit()
db.close()
def create_cameras(db, cameras, subs, width, height, share_intri=True):
model = 'OPENCV'
if share_intri:
cam_id = 1
K = cameras[subs[0]]['K']
D = cameras[subs[0]]['dist'].reshape(1, 5)
fx, fy, cx, cy, k1, k2, p1, p2, k3, k4, k5, k6 = K[0, 0], K[1, 1], K[0, 2], K[1, 2], D[0, 0], D[0, 1], D[0, 2], D[0, 3], D[0, 4], 0, 0, 0
params = [fx, fy, cx, cy, k1, k2, p1, p2]
# params = [fx, fy, cx, cy, 0, 0, 0, 0]
camera = Camera(
id=cam_id,
model=model,
width=width,
height=height,
params=params
)
cameras_colmap = {cam_id: camera}
cameras_map = {sub:cam_id for sub in subs}
#
db.add_camera(CAMERA_MODEL_NAMES[model].model_id, width, height, params,
prior_focal_length=False, camera_id=cam_id)
else:
raise NotImplementedError
return cameras_colmap, cameras_map
def create_images(db, cameras, cameras_map, image_names):
subs = sorted(list(image_names.keys()))
images = {}
for sub, image_name in image_names.items():
img_id = subs.index(sub) + 1
R = cameras[sub]['R']
T = cameras[sub]['T']
qvec = rotmat2qvec(R)
tvec = T.T[0]
image = Image(
id=img_id,
qvec=qvec,
tvec=tvec,
camera_id=cameras_map[sub],
name=os.path.basename(image_name),
xys=[],
point3D_ids=[]
)
images[img_id] = image
db.add_image(image.name, camera_id=image.camera_id,
prior_q=image.qvec, prior_t=image.tvec, image_id=img_id)
return images
def copy_images(data, out, nf=0, copy_func=shutil.copyfile, mask='mask', add_mask=True):
subs = sorted(os.listdir(join(data, 'images')))
image_names = {}
for sub in subs:
srcname = join(data, 'images', sub, '{:06d}.jpg'.format(nf))
if not os.path.exists(srcname):
mywarn('{} not exists, skip'.format(srcname))
return False
dstname = join(out, 'images', '{}.jpg'.format(sub))
image_names[sub] = dstname
if os.path.exists(dstname):
continue
os.makedirs(os.path.dirname(dstname), exist_ok=True)
copy_func(srcname, dstname)
mskname = join(data, mask, sub, '{:06d}.png'.format(nf))
dstname = join(out, 'mask', '{}.jpg.png'.format(sub))
if os.path.exists(mskname) and add_mask:
os.makedirs(os.path.dirname(dstname), exist_ok=True)
copy_func(mskname, dstname)
return True, image_names
def colmap_feature_extract(colmap, path, share_camera, add_mask):
'''
struct SiftMatchingOptions {
// Number of threads for feature matching and geometric verification.
int num_threads = -1;
// Whether to use the GPU for feature matching.
bool use_gpu = true;
// Index of the GPU used for feature matching. For multi-GPU matching,
// you should separate multiple GPU indices by comma, e.g., "0,1,2,3".
std::string gpu_index = "-1";
// Maximum distance ratio between first and second best match.
double max_ratio = 0.8;
// Maximum distance to best match.
double max_distance = 0.7;
// Whether to enable cross checking in matching.
bool cross_check = true;
// Maximum number of matches.
int max_num_matches = 32768;
// Maximum epipolar error in pixels for geometric verification.
double max_error = 4.0;
// Confidence threshold for geometric verification.
double confidence = 0.999;
// Minimum/maximum number of RANSAC iterations. Note that this option
// overrules the min_inlier_ratio option.
int min_num_trials = 100;
int max_num_trials = 10000;
// A priori assumed minimum inlier ratio, which determines the maximum
// number of iterations.
double min_inlier_ratio = 0.25;
// Minimum number of inliers for an image pair to be considered as
// geometrically verified.
int min_num_inliers = 15;
// Whether to attempt to estimate multiple geometric models per image pair.
bool multiple_models = false;
// Whether to perform guided matching, if geometric verification succeeds.
bool guided_matching = false;
bool Check() const;
};
'''
cmd = f'{colmap} feature_extractor --database_path {path}/database.db \
--image_path {path}/images \
--SiftExtraction.peak_threshold 0.006 \
--ImageReader.camera_model OPENCV \
'
if share_camera:
cmd += ' --ImageReader.single_camera 1'
if add_mask:
cmd += f' --ImageReader.mask_path {path}/mask'
cmd += f' >> {path}/log.txt'
run_cmd(cmd)
def colmap_feature_match(colmap, path):
cmd = f'{colmap} exhaustive_matcher --database_path {path}/database.db \
--SiftMatching.guided_matching 0 \
--SiftMatching.max_ratio 0.8 \
--SiftMatching.max_distance 0.5 \
--SiftMatching.cross_check 1 \
--SiftMatching.max_error 4 \
--SiftMatching.max_num_matches 32768 \
--SiftMatching.confidence 0.9999 \
--SiftMatching.max_num_trials 10000 \
--SiftMatching.min_inlier_ratio 0.25 \
--SiftMatching.min_num_inliers 30 \
>> {path}/log.txt'
run_cmd(cmd)
def colmap_ba(colmap, path, with_init=False):
if with_init:
cmd = f'{colmap} point_triangulator --database_path {path}/database.db \
--image_path {path}/images \
--input_path {path}/sparse/0 \
--output_path {path}/sparse/0 \
--Mapper.tri_merge_max_reproj_error 3 \
--Mapper.ignore_watermarks 1 \
--Mapper.filter_max_reproj_error 2 \
>> {path}/log.txt'
run_cmd(cmd)
cmd = f'{colmap} bundle_adjuster \
--input_path {path}/sparse/0 \
--output_path {path}/sparse/0 \
>> {path}/log.txt'
run_cmd(cmd)
points3d = read_points3d_binary(join(path, 'sparse', '0', 'points3D.bin'))
pids = list(points3d.keys())
mean_error = np.mean([points3d[p].error for p in pids])
log('Triangulate {} points, mean error: {:.2f} pixel'.format(len(pids), mean_error))
else:
mkdir(join(path, 'sparse'))
cmd = f'{colmap} mapper --database_path {path}/database.db --image_path {path}/images --output_path {path}/sparse \
--Mapper.ba_refine_principal_point 1 \
--Mapper.ba_global_max_num_iterations 1000 \
>> {path}/log.txt'
run_cmd(cmd)
def colmap_dense(colmap, path):
mkdir(join(path, 'dense'))
cmd = f'{colmap} image_undistorter --image_path {path}/images --input_path {path}/sparse/0 --output_path {path}/dense --output_type COLMAP --max_image_size 2000'
run_cmd(cmd)
cmd = f'{colmap} patch_match_stereo \
--workspace_path {path}/dense \
--workspace_format COLMAP \
--PatchMatchStereo.geom_consistency true \
>> {path}/log.txt'
run_cmd(cmd)
cmd = f'{colmap} stereo_fusion \
--workspace_path {path}/dense \
--workspace_format COLMAP \
--input_type geometric \
--output_path {path}/dense/fused.ply \
>> {path}/log.txt'
run_cmd(cmd)

View File

@ -0,0 +1,86 @@
'''
@ Date: 2022-02-14 14:54:50
@ Author: Qing Shuai
@ Mail: s_q@zju.edu.cn
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-06-14 18:07:19
@ FilePath: /EasyMocapPublic/easymocap/mytools/debug_utils.py
'''
from termcolor import colored
import os
from os.path import join
import shutil
import subprocess
import time
import datetime
def toc():
return time.time() * 1000
def myprint(cmd, level):
color = {'run': 'blue', 'info': 'green', 'warn': 'yellow', 'error': 'red'}[level]
print(colored(cmd, color))
def log(text):
myprint(text, 'info')
def log_time(text):
strf = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
print(colored(strf, 'yellow'), colored(text, 'green'))
def mywarn(text):
myprint(text, 'warn')
warning_infos = set()
def oncewarn(text):
if text in warning_infos:
return
warning_infos.add(text)
myprint(text, 'warn')
def myerror(text):
myprint(text, 'error')
def run_cmd(cmd, verbo=True, bg=False):
if verbo: myprint('[run] ' + cmd, 'run')
if bg:
args = cmd.split()
print(args)
p = subprocess.Popen(args)
return [p]
else:
os.system(cmd)
return []
def mkdir(path):
if os.path.exists(path):
return 0
log('mkdir {}'.format(path))
os.makedirs(path, exist_ok=True)
def cp(srcname, dstname):
mkdir(join(os.path.dirname(dstname)))
shutil.copyfile(srcname, dstname)
def print_table(header, contents):
from tabulate import tabulate
length = len(contents[0])
tables = [[] for _ in range(length)]
mean = ['Mean']
for icnt, content in enumerate(contents):
for i in range(length):
if isinstance(content[i], float):
tables[i].append('{:6.2f}'.format(content[i]))
else:
tables[i].append('{}'.format(content[i]))
if icnt > 0:
mean.append('{:6.2f}'.format(sum(content)/length))
tables.append(mean)
print(tabulate(tables, header, tablefmt='fancy_grid'))
def check_exists(path):
flag1 = os.path.isfile(path) and os.path.exists(path)
flag2 = os.path.isdir(path) and len(os.listdir(path)) >= 10
return flag1 or flag2

View File

@ -2,8 +2,8 @@
@ Date: 2021-03-15 12:23:12
@ Author: Qing Shuai
@ LastEditors: Qing Shuai
@ LastEditTime: 2021-06-14 22:25:58
@ FilePath: /EasyMocapRelease/easymocap/mytools/file_utils.py
@ LastEditTime: 2022-07-21 15:55:02
@ FilePath: /EasyMocapPublic/easymocap/mytools/file_utils.py
'''
import os
import json
@ -11,12 +11,18 @@ import numpy as np
from os.path import join
mkdir = lambda x:os.makedirs(x, exist_ok=True)
mkout = lambda x:mkdir(os.path.dirname(x))
# mkout = lambda x:mkdir(os.path.dirname(x)) if x is not None
def mkout(x):
if x is not None:
mkdir(os.path.dirname(x))
def read_json(path):
assert os.path.exists(path), path
with open(path) as f:
try:
data = json.load(f)
except:
print('Reading error {}'.format(path))
data = []
return data
def save_json(file, data):
@ -25,6 +31,17 @@ def save_json(file, data):
with open(file, 'w') as f:
json.dump(data, f, indent=4)
def append_json(file, data):
if not os.path.exists(os.path.dirname(file)):
os.makedirs(os.path.dirname(file))
if os.path.exists(file):
res = read_json(file)
assert isinstance(res, list)
res.append(data)
data = res
with open(file, 'w') as f:
json.dump(data, f, indent=4)
save_annot = save_json
def getFileList(root, ext='.jpg'):
@ -51,19 +68,24 @@ def read_annot(annotname, mode='body25'):
data[i]['id'] = data[i].pop('personID')
if 'keypoints2d' in data[i].keys() and 'keypoints' not in data[i].keys():
data[i]['keypoints'] = data[i].pop('keypoints2d')
for key in ['bbox', 'keypoints', 'handl2d', 'handr2d', 'face2d']:
for key in ['bbox', 'keypoints',
'bbox_handl2d', 'handl2d',
'bbox_handr2d', 'handr2d',
'bbox_face2d', 'face2d']:
if key not in data[i].keys():continue
data[i][key] = np.array(data[i][key])
if key == 'face2d':
# TODO: Make parameters, 17 is the offset for the eye brows,
# etc. 51 is the total number of FLAME compatible landmarks
data[i][key] = data[i][key][17:17+51, :]
if 'bbox' in data[i].keys():
data[i]['bbox'] = data[i]['bbox'][:5]
if data[i]['bbox'][-1] < 0.001:
# print('{}/{} bbox conf = 0, may be error'.format(annotname, i))
data[i]['bbox'][-1] = 1
print('{}/{} bbox conf = 0, may be error'.format(annotname, i))
data[i]['bbox'][-1] = 0
# combine the basic results
if mode == 'body25':
data[i]['keypoints'] = data[i]['keypoints']
data[i]['keypoints'] = data[i].get('keypoints', np.zeros((25, 3)))
elif mode == 'body15':
data[i]['keypoints'] = data[i]['keypoints'][:15, :]
elif mode in ['handl', 'handr']:
@ -91,7 +113,7 @@ def array2raw(array, separator=' ', fmt='%.3f'):
res.append(separator.join([fmt%(d) for d in data]))
def myarray2string(array, separator=', ', fmt='%.3f', indent=8):
def myarray2string(array, separator=', ', fmt='%7.7f', indent=8):
assert len(array.shape) == 2, 'Only support MxN matrix, {}'.format(array.shape)
blank = ' ' * indent
res = ['[']
@ -110,14 +132,16 @@ def write_common_results(dumpname=None, results=[], keys=[], fmt='%2.3f'):
out_text.append(' {\n')
output = {}
output['id'] = data['id']
for key in keys:
if key not in data.keys():continue
for k in ['type']:
if k in data.keys():output[k] = '\"{}\"'.format(data[k])
keys_current = [k for k in keys if k in data.keys()]
for key in keys_current:
# BUG: This function will failed if the rows of the data[key] is too large
# output[key] = np.array2string(data[key], max_line_width=1000, separator=', ', formatter=format_out)
output[key] = myarray2string(data[key], separator=', ', fmt=fmt)
for key in output.keys():
out_text.append(' \"{}\": {}'.format(key, output[key]))
if key != keys[-1]:
if key != keys_current[-1]:
out_text.append(',\n')
else:
out_text.append('\n')
@ -134,17 +158,16 @@ def write_common_results(dumpname=None, results=[], keys=[], fmt='%2.3f'):
else:
return ''.join(out_text)
def write_keypoints3d(dumpname, results):
def write_keypoints3d(dumpname, results, keys = ['keypoints3d']):
# TODO:rewrite it
keys = ['keypoints3d']
write_common_results(dumpname, results, keys, fmt='%6.3f')
write_common_results(dumpname, results, keys, fmt='%6.7f')
def write_vertices(dumpname, results):
keys = ['vertices']
write_common_results(dumpname, results, keys, fmt='%6.3f')
write_common_results(dumpname, results, keys, fmt='%6.5f')
def write_smpl(dumpname, results):
keys = ['Rh', 'Th', 'poses', 'expression', 'shapes']
keys = ['Rh', 'Th', 'poses', 'handl', 'handr', 'expression', 'shapes']
write_common_results(dumpname, results, keys)
def batch_bbox_from_pose(keypoints2d, height, width, rate=0.1):

View File

@ -2,8 +2,8 @@
@ Date: 2021-04-21 15:19:21
@ Author: Qing Shuai
@ LastEditors: Qing Shuai
@ LastEditTime: 2021-07-29 16:12:37
@ FilePath: /EasyMocap/easymocap/mytools/reader.py
@ LastEditTime: 2022-07-22 23:23:26
@ FilePath: /EasyMocapPublic/easymocap/mytools/reader.py
'''
# function to read data
"""
@ -27,17 +27,14 @@ def read_keypoints3d(filename):
res_ = []
for d in data:
pid = d['id'] if 'id' in d.keys() else d['personID']
pose3d = np.array(d['keypoints3d'], dtype=np.float32)
if pose3d.shape[0] > 25:
# 对于有手的情况把手的根节点赋值成body25上的点
pose3d[25, :] = pose3d[7, :]
pose3d[46, :] = pose3d[4, :]
ret = {'id': pid, 'type': 'body25'}
for key in ['keypoints3d', 'handl3d', 'handr3d', 'face3d']:
if key not in d.keys():continue
pose3d = np.array(d[key], dtype=np.float32)
if pose3d.shape[1] == 3:
pose3d = np.hstack([pose3d, np.ones((pose3d.shape[0], 1))])
res_.append({
'id': pid,
'keypoints3d': pose3d
})
ret[key] = pose3d
res_.append(ret)
return res_
def read_keypoints3d_dict(filename):
@ -56,9 +53,11 @@ def read_keypoints3d_dict(filename):
def read_smpl(filename):
datas = read_json(filename)
if isinstance(datas, dict):
datas = datas['annots']
outputs = []
for data in datas:
for key in ['Rh', 'Th', 'poses', 'shapes', 'expression']:
for key in ['Rh', 'Th', 'poses', 'handl', 'handr', 'shapes', 'expression', 'keypoints3d']:
if key in data.keys():
data[key] = np.array(data[key], dtype=np.float32)
# for smplx results

View File

@ -0,0 +1,735 @@
import numpy as np
import cv2
from easymocap.datasets.base import crop_image
from easymocap.estimator.wrapper_base import bbox_from_keypoints
from easymocap.mytools.vis_base import merge, plot_keypoints_auto
from .debug_utils import log, mywarn, myerror
def batch_triangulate(keypoints_, Pall, min_view=2):
# keypoints: (nViews, nJoints, 3)
# Pall: (nViews, 3, 4)
# A: (nJoints, nViewsx2, 4), x: (nJoints, 4, 1); b: (nJoints, nViewsx2, 1)
v = (keypoints_[:, :, -1]>0).sum(axis=0)
valid_joint = np.where(v >= min_view)[0]
keypoints = keypoints_[:, valid_joint]
conf3d = keypoints[:, :, -1].sum(axis=0)/v[valid_joint]
# P2: P矩阵的最后一行(1, nViews, 1, 4)
P0 = Pall[None, :, 0, :]
P1 = Pall[None, :, 1, :]
P2 = Pall[None, :, 2, :]
# uP2: x坐标乘上P2: (nJoints, nViews, 1, 4)
uP2 = keypoints[:, :, 0].T[:, :, None] * P2
vP2 = keypoints[:, :, 1].T[:, :, None] * P2
conf = keypoints[:, :, 2].T[:, :, None]
Au = conf * (uP2 - P0)
Av = conf * (vP2 - P1)
A = np.hstack([Au, Av])
u, s, v = np.linalg.svd(A)
X = v[:, -1, :]
X = X / X[:, 3:]
# out: (nJoints, 4)
result = np.zeros((keypoints_.shape[1], 4))
result[valid_joint, :3] = X[:, :3]
result[valid_joint, 3] = conf3d #* (conf[..., 0].sum(axis=-1)>min_view)
return result
def project_points(keypoints, RT, einsum='vab,kb->vka'):
homo = np.concatenate([keypoints[..., :3], np.ones_like(keypoints[..., :1])], axis=-1)
kpts2d = np.einsum(einsum, RT, homo)
kpts2d[..., :2] /= kpts2d[..., 2:]
return kpts2d
def make_Cnk(n, k):
import itertools
res = {}
for n_ in range(3, n+1):
n_0 = [i for i in range(n_)]
for k_ in range(2, k+1):
res[(n_, k_)] = list(map(list, itertools.combinations(n_0, k_)))
return res
MAX_VIEWS = 30
Cnk = make_Cnk(MAX_VIEWS, 3)
def robust_triangulate_point(kpts2d, Pall, dist_max, min_v = 3):
nV = kpts2d.shape[0]
if len(kpts2d) < min_v:# 重建失败
return [], None
# min_v = max(2, nV//2)
# 1. choose the combination of min_v
index_ = Cnk[(len(kpts2d), min(min_v, len(kpts2d)))]
# 2. proposals: store the reconstruction points of each proposal
proposals = np.zeros((len(index_), 4))
weight_self = np.zeros((nV, len(index_)))
for i, index in enumerate(index_):
weight_self[index, i] = 100.
point = batch_triangulate(kpts2d[index, :], Pall[index], min_view=min_v)
proposals[i] = point
# 3. project the proposals to each view
# and calculate the reprojection error
# (nViews, nProposals, 4)
kpts_repro = project_points(proposals, Pall)
conf = (proposals[None, :, -1] > 0) * (kpts2d[..., -1] > 0)
# err: (nViews, nProposals)
err = np.linalg.norm(kpts_repro[..., :2] - kpts2d[..., :2], axis=-1) * conf
valid = 1. - err/dist_max
valid[valid<0] = 0
# consider the weight of different view
# TODO:naive weight:
conf = kpts2d[..., -1]
weight = conf
# (valid > 0)*weight_self 一项用于强制要求使用到的两个视角都需要被用到
# 增加一项使用的视角数的加成
weight_sum = (weight * valid).sum(axis=0) + ((valid > 0)*weight_self).sum(axis=0) - min_v * 100
if weight_sum.max() < 0:# 重建失败
return [], None
best = weight_sum.argmax()
if (err[index_[best], best] > dist_max).any():
return [], None
# 对于选出来的proposal寻找其大于0的其他视角
point = proposals[best]
best_add = np.where(valid[:, best])[0].tolist()
index = list(index_[best])
best_add.sort(key=lambda x:-weight[x])
for add in best_add:
if add in index:
continue
index.append(add)
point = batch_triangulate(kpts2d[index, :], Pall[index], min_view=min_v)
kpts_repro = project_points(point, Pall[index])
err = np.linalg.norm(kpts_repro[..., :2] - kpts2d[index, ..., :2], axis=-1)
if (err > dist_max).any():
index.remove(add)
break
return index, point
def remove_outview(kpts2d, out_view, debug):
if len(out_view) == 0:
return False
outv = out_view[0]
if debug:
mywarn('[triangulate] remove outview: {} from {}'.format(outv, out_view))
kpts2d[outv] = 0.
return True
def remove_outjoint(kpts2d, Pall, out_joint, dist_max, min_view=3, debug=False):
if len(out_joint) == 0:
return False
if debug:
mywarn('[triangulate] remove outjoint: {}'.format(out_joint))
for nj in out_joint:
valid = np.where(kpts2d[:, nj, -1] > 0)[0]
if len(valid) < min_view:
# if less than 3 visible view, set these unvisible
kpts2d[:, nj, -1] = 0
continue
if len(valid) > MAX_VIEWS:
# only select max points
conf = -kpts2d[:, nj, -1]
valid = conf.argsort()[:MAX_VIEWS]
index_j, point = robust_triangulate_point(kpts2d[valid, nj:nj+1], Pall[valid], dist_max=dist_max, min_v=3)
index_j = valid[index_j]
# print('select {} for joint {}'.format(index_j, nj))
set0 = np.zeros(kpts2d.shape[0])
set0[index_j] = 1.
kpts2d[:, nj, -1] *= set0
return True
def project_and_distance(kpts3d, RT, kpts2d):
kpts_proj = project_points(kpts3d, RT)
# 1. distance between input and projection
conf = (kpts3d[None, :, -1] > 0) * (kpts2d[:, :, -1] > 0)
dist = np.linalg.norm(kpts_proj[..., :2] - kpts2d[..., :2], axis=-1) * conf
return dist, conf
def iterative_triangulate(kpts2d, RT, previous=None,
min_conf=0.1, min_view=3, min_joints=3, dist_max=0.05, dist_vel=0.05,
thres_outlier_view=0.4, thres_outlier_joint=0.4, debug=False):
kpts2d = kpts2d.copy()
conf = kpts2d[..., -1]
kpts2d[conf<min_conf] = 0.
if debug:
log('[triangulate] kpts2d: {}'.format(kpts2d.shape))
# TODO: consider large motion
if previous is not None:
dist, conf = project_and_distance(previous, RT, kpts2d)
nottrack = (dist > dist_vel) & conf
if nottrack.sum() > 0:
kpts2d[nottrack] = 0.
if debug:
log('[triangulate] Remove with track {}'.format(np.where(nottrack)))
while True:
# 0. triangulate and project
kpts3d = batch_triangulate(kpts2d, RT, min_view=min_view)
dist, conf = project_and_distance(kpts3d, RT, kpts2d)
# 2. find the outlier
vv, jj = np.where(dist > dist_max)
if vv.shape[0] < 1:
if debug:
log('[triangulate] Not found outlier, break')
break
ratio_outlier_view = (dist>dist_max).sum(axis=1)/(1e-5 + conf.sum(axis=1))
ratio_outlier_joint = (dist>dist_max).sum(axis=0)/(1e-5 + conf.sum(axis=0))
# 3. find the totally wrong detections
out_view = np.where(ratio_outlier_view > thres_outlier_view)[0]
out_joint = np.where(ratio_outlier_joint > thres_outlier_joint)[0]
if len(out_view) > 1:
dist_view = dist.sum(axis=1)/(1e-5 + conf.sum(axis=1))
out_view = out_view.tolist()
out_view.sort(key=lambda x:-dist_view[x])
if debug: mywarn('[triangulate] Remove outlier view: {}'.format(ratio_outlier_view))
if remove_outview(kpts2d, out_view, debug): continue
if remove_outjoint(kpts2d, RT, out_joint, dist_max, debug=debug): continue
if debug:
log('[triangulate] Directly remove {}, {}'.format(vv, jj))
kpts2d[vv, jj, -1] = 0.
if debug:
log('[triangulate] finally {} valid points'.format((kpts3d[..., -1]>0).sum()))
if (kpts3d[..., -1]>0).sum() < min_joints:
kpts3d[..., -1] = 0.
kpts2d[..., -1] = 0.
return kpts3d, kpts2d
return kpts3d, kpts2d
class BaseTriangulator:
def __init__(self, config, debug, keys) -> None:
self.config = config
self.debug = debug
self.keys = keys
def project_and_check(self, kpts3d, kpts2d, RT):
kpts_proj = project_points(kpts3d, RT)
conf = (kpts3d[None, :, -1] > 0) * (kpts2d[:, :, -1] > 0)
dist = np.linalg.norm(kpts_proj[..., :2] - kpts2d[..., :2], axis=-1) * conf
return conf, dist
def triangulate_with_results(self, pid, data, results):
new = {'id': pid}
for key in self.keys:
key3d = key.replace('2d', '3d')
if len(results) == 0:
kpts3d, kpts2d = iterative_triangulate(data[key + '_unproj'], data['RT'],
debug=self.debug, **self.config[key])
else:
if len(results) == 1:
previous = results[-1][key3d] # TODO: mean previous frame
elif len(results) >= 2:
# TODO: mean previous velocity
previous0 = results[-2][key3d] # TODO: mean previous frame
previous1 = results[-1][key3d] # TODO: mean previous frame
vel = (previous1[:, :3] - previous0[:, :3])*((previous0[:, -1:]>0)&(previous0[:, -1:]>0))
previous = previous1.copy()
previous[:, :3] += vel
kpts3d, kpts2d = iterative_triangulate(data[key + '_unproj'], data['RT'],
debug=self.debug, previous=previous, **self.config[key])
vel = np.linalg.norm(kpts3d[:, :3] - previous[:, :3], axis=-1)
new[key] = np.concatenate([data[key+'_distort'][..., :-1], kpts2d[..., -1:]], axis=-1)
new[key3d] = kpts3d
return new
class SimpleTriangulator(BaseTriangulator):
def __init__(self, keys, debug, config,
pid=0) -> None:
super().__init__(config, debug, keys)
self.results = []
self.infos = []
self.dim_name = ['_joints', '_views']
self.pid = pid
def __call__(self, data, results=None):
info = {}
if results is None:
results = self.results
new = {'id': self.pid}
for key in self.keys:
if key not in data.keys(): continue
key3d = key.replace('2d', '3d')
if self.debug:
log('[triangulate] {}'.format(key))
if len(results) == 0:
kpts3d, kpts2d = iterative_triangulate(data[key + '_unproj'], data['RT'],
debug=self.debug, **self.config[key])
else:
if len(results) == 1:
previous = results[-1][key3d] # TODO: mean previous frame
elif len(results) >= 2:
# TODO: mean previous velocity
previous0 = results[-2][key3d] # TODO: mean previous frame
previous1 = results[-1][key3d] # TODO: mean previous frame
vel = (previous1[:, :3] - previous0[:, :3])*((previous0[:, -1:]>0)&(previous0[:, -1:]>0))
previous = previous1.copy()
previous[:, :3] += vel
kpts3d, kpts2d = iterative_triangulate(data[key + '_unproj'], data['RT'],
debug=self.debug, previous=previous, **self.config[key])
vel = np.linalg.norm(kpts3d[:, :3] - previous[:, :3], axis=-1)
new[key] = np.concatenate([data[key+'_distort'][..., :-1], kpts2d[..., -1:]], axis=-1)
new[key.replace('2d', '3d')] = kpts3d
if self.debug:
conf, dist = self.project_and_check(kpts3d, kpts2d, data['RT'])
for dim in [0, 1]:
info_dim = {
'valid': conf.sum(axis=dim),
'dist': 10000*dist.sum(axis=dim)/(1e-5 + conf.sum(axis=dim)),
}
info[key+self.dim_name[dim]] = info_dim
info[key+'_joints']['valid3d'] = kpts3d[:, -1] >0
results.append(new)
self.infos.append(info)
return [new]
def report(self):
if not self.debug:
return 0
from .debug_utils import print_table
for key in self.infos[0].keys():
metrics = list(self.infos[0][key].keys())
values = [np.mean(np.stack([info[key][metric] for info in self.infos]), axis=0) for metric in metrics]
metrics = [key] + metrics
values = [[i for i in range(values[0].shape[0])]] + values
print_table(metrics, values)
class SimpleTriangulatorMulti(SimpleTriangulator):
def __init__(self, pids, **cfg) -> None:
super().__init__(**cfg)
self.results = {}
def __call__(self, data, results=None):
res_now = []
for ipid, pid in enumerate(data['pid']):
if pid not in self.results.keys():
self.results[pid] = []
data_ = {'RT': data['RT']}
for key in self.keys:
data_[key+'_distort'] = data[key+'_distort'][:, ipid]
data_[key+'_unproj'] = data[key+'_unproj'][:, ipid]
data_[key] = data[key][:, ipid]
res = self.triangulate_with_results(pid, data_, self.results[pid])
self.results[pid].append(res)
res_now.append(res)
return res_now
def skew_op(x):
skew_op = lambda x: np.array([[0, -x[2], x[1]], [x[2], 0, -x[0]], [-x[1], x[0], 0]])
res = np.zeros((3, 3), dtype=x.dtype)
# 0, -z, y
res[0, 1] = -x[2, 0]
res[0, 2] = x[1, 0]
# z, 0, -x
res[1, 0] = x[2, 0]
res[1, 2] = -x[0, 0]
# -y, x, 0
res[2, 0] = -x[1, 0]
res[2, 1] = x[0, 0]
return res
def fundamental_op(K0, K1, R_0, T_0, R_1, T_1):
invK0 = np.linalg.inv(K0)
return invK0.T @ (R_0 @ R_1.T) @ K1.T @ skew_op(K1 @ R_1 @ R_0.T @ (T_0 - R_0 @ R_1.T @ T_1))
def drawlines(img1,img2,lines,pts1,pts2):
''' img1 - image on which we draw the epilines for the points in img2
lines - corresponding epilines '''
r,c = img1.shape[:2]
for r,pt1,pt2 in zip(lines,pts1,pts2):
pt1 = list(map(lambda x:int(x+0.5), pt1[:2].tolist()))
pt2 = list(map(lambda x:int(x+0.5), pt2[:2].tolist()))
if pt1[0] < 0 or pt1[1] < 0:
continue
color = tuple(np.random.randint(0,255,3).tolist())
x0,y0 = map(int, [0, -r[2]/r[1] ])
x1,y1 = map(int, [c, -(r[2]+r[0]*c)/r[1] ])
img1 = cv2.line(img1, (x0,y0), (x1,y1), color,1)
img1 = cv2.circle(img1,tuple(pt1),5,color,-1)
img2 = cv2.circle(img2,tuple(pt2),5,color,-1)
return img1,img2
def SimpleConstrain(dimGroups):
constrain = np.ones((dimGroups[-1], dimGroups[-1]))
for i in range(len(dimGroups)-1):
start, end = dimGroups[i], dimGroups[i+1]
constrain[start:end, start:end] = 0
N = constrain.shape[0]
constrain[range(N), range(N)] = 1
return constrain
def check_cluster(affinity, row, views, dimGroups, indices, p2dAssigned, visited):
affinity_row = affinity[row].copy()
# given affinity and row, select the combine of all possible set
cluster = np.where((affinity[row]>0)&(p2dAssigned==-1)&(visited==0))[0].tolist()
cluster.sort(key=lambda x:-affinity[row, x])
views_ = views[cluster]
view_count = np.bincount(views[cluster])
indices_all = [indices]
for col in cluster:
v = views[col]
nOld = len(indices_all)
if indices[v] != -1: # already assigned, copy and make new
for i in range(nOld):
ind = indices_all[i].copy()
ind[v] = col
indices_all.append(ind)
else: # not assigned, assign
for i in range(nOld):
indices_all[i][v] = col
return indices_all
def views_from_dimGroups(dimGroups):
views = np.zeros(dimGroups[-1], dtype=np.int)
for nv in range(len(dimGroups) - 1):
views[dimGroups[nv]:dimGroups[nv+1]] = nv
return views
class SimpleMatchAndTriangulator(SimpleTriangulator):
def __init__(self, num_joints, min_views, min_joints, cfg_svt, cfg_track, **cfg) -> None:
super().__init__(**cfg)
self.nJoints = num_joints
self.cfg_svt = cfg_svt
self.cfg_track = cfg_track
self.min_views = min_views
self.min_joints = min_joints
self.time = -1
self.max_id = 0
self.tracks = {}
self.loglevel_dict = {
'info': 0,
'warn': 1,
'error': 2,
}
self.loglevel = self.loglevel_dict['info'] # ['info', 'warn', 'error']
self.debug = False
self.data = None
self.people = None
def log(self, text):
if self.loglevel > 0:
return 0
log(text)
def warn(self, text):
if self.loglevel > 1:
return 0
mywarn(text)
@staticmethod
def distance_by_epipolar(pts0, pts1, K0, K1, R0, T0, R1, T1):
F = fundamental_op(K0, K1, R0, T0, R1, T1)
# Find epilines corresponding to points in left image (first image) and
# drawing its lines on right image
lines0 = cv2.computeCorrespondEpilines(pts0[..., :2].reshape (-1,1,2), 2, F)
# Find epilines corresponding to points in right image (second image) and
# drawing its lines on left image
lines1 = cv2.computeCorrespondEpilines(pts1[..., :2].reshape(-1,1,2), 1, F)
if False:
H, W = 1080, 1920
img0 = np.zeros((H, W, 3), dtype=np.uint8) +255
img4, img3 = drawlines(img0.copy(), img0.copy(), lines0.reshape(-1, 3), pts1.reshape(-1, 3), pts0.reshape(-1,3))
img5,img6 = drawlines(img0.copy(), img0.copy(), lines1.reshape(-1, 3), pts0.reshape(-1,3), pts1.reshape(-1,3))
import matplotlib.pyplot as plt
plt.subplot(121)
plt.imshow(img5)
plt.subplot(122)
plt.imshow(img4)
plt.show()
lines0 = lines0.reshape(pts0.shape)
lines1 = lines1.reshape(pts1.shape)
# dist: (D_v0, D_v1, nJoints)
dist01 = np.abs(np.sum(lines0[:, None, :, :2] * pts1[None, :, :, :2], axis=-1) + lines0[:, None, :, 2])
conf = pts0[:, None, :, 2] * pts1[None, :, :, 2]
dist10 = np.abs(np.sum(lines1[:, None, :, :2] * pts0[None, :, :, :2], axis=-1) + lines1[:, None, :, 2])
dist = np.sum(dist01 * conf + dist10.transpose(1, 0, 2) * conf, axis=-1)/(conf.sum(axis=-1) + 1e-5)/2
return dist
def _simple_associate2d_triangulate(self, data, affinity, dimGroups, prev_id):
# sum1 = affinity.sum(axis=1)
# 注意:这里的排序应该是对每个视角,挑选最大的一个
sum1 = np.zeros((affinity.shape[0]))
for i in range(len(dimGroups)-1):
start, end = dimGroups[i], dimGroups[i+1]
if end == start:continue
sum1 += affinity[:, start:end].max(axis=-1)
n2d = affinity.shape[0]
nViews = len(dimGroups) - 1
idx_zero = np.zeros(nViews, dtype=np.int) - 1
views = views_from_dimGroups(dimGroups)
# the assigned results of each person
p2dAssigned = np.zeros(n2d, dtype=np.int) - 1
visited = np.zeros(n2d, dtype=np.int)
sortidx = np.argsort(-sum1)
pid = 0
k3dresults = []
for idx in sortidx:
if p2dAssigned[idx] != -1:
continue
if prev_id[idx] != -1:
results = [self.people[prev_id[idx]]]
else:
results = []
proposals = check_cluster(affinity, row=idx, views=views,
dimGroups=dimGroups, indices=idx_zero.copy(), p2dAssigned=p2dAssigned, visited=visited)
for indices in proposals:
if (indices > -1).sum() < self.min_views - (len(results)):
continue
# set keypoints2d
info = {'RT': data['RT']}
for name in ['keypoints2d', 'keypoints2d_unproj', 'keypoints2d_distort']:
info[name] = np.zeros((nViews, self.nJoints, 3), dtype=np.float32)
for nv in range(nViews):
if indices[nv] == -1: continue
for name in ['keypoints2d', 'keypoints2d_unproj', 'keypoints2d_distort']:
info[name][nv] = data[name][nv][indices[nv]-dimGroups[nv]]
res = super().__call__(info, results=results)[0]
k2d = res['keypoints2d']
valid_view = (k2d[..., 2] > 0).sum(axis=-1) > self.min_joints
# if valid_view.sum() < self.min_views - len(results): # 这里如果是有前一帧的话len(results)会是2不知道之前为啥有这个条件使用
if valid_view.sum() < self.min_views:
self.log('[associate] Skip proposal {}->{} with not enough valid view {}'.format(idx, indices, (k2d[..., 2] > 0).sum(axis=-1)))
continue
valid_joint = res['keypoints3d'][:, -1] > 0.1
if valid_joint.sum() < self.min_joints:
self.log('[associate] Skip proposal {}->{} as not enough joints'.format(idx, indices))
continue
indices[~valid_view] = -1
if (indices < 0).all():
import ipdb; ipdb.set_trace()
self.log('[associate] Add indices {}, valid {}'.format(indices, (k2d[..., 2] > 0).sum(axis=-1)))
res['id'] = pid
res['indices'] = indices
res['valid_view'] = valid_view
res['valid_joints'] = res['keypoints3d'][:, -1] > 0.1
k3dresults.append(res)
for nv in range(nViews):
if valid_view[nv] and indices[nv] != -1:
p2dAssigned[indices[nv]] = pid
visited[indices[nv]] = 1
pid += 1
break
visited[idx] = 1
self.log('[associate] {} points not visited, {} not assigned'.format(visited.shape[0] - visited.sum(), (p2dAssigned==-1).sum()))
k3dresults.sort(key=lambda x: -x['keypoints2d'][..., -1].sum())
return k3dresults
def _calculate_affinity_MxM(self, dims, dimGroups, data, key):
M = dimGroups[-1]
distance = np.zeros((M, M), dtype=np.float32)
nViews = len(dims)
for v0 in range(nViews-1):
for v1 in range(1, nViews):
# calculate distance between (v0, v1)
if v0 >= v1:
continue
if dims[v0] == 0 or dims[v1] == 0:
continue
if True:
pts0 = data[key][v0]
pts1 = data[key][v1]
K0, K1 = data['K'][v0], data['K'][v1]
R0, T0 = data['Rc'][v0], data['Tc'][v0]
R1, T1 = data['Rc'][v1], data['Tc'][v1]
dist = self.distance_by_epipolar(pts0, pts1, K0, K1, R0, T0, R1, T1)
dist /= (K0[0, 0] + K1[0, 0])/2
else:
dist = self.distance_by_ray(pts0, pts1, R0, T0, R1, T1)
distance[dimGroups[v0]:dimGroups[v0+1], dimGroups[v1]:dimGroups[v1+1]] = dist
distance[dimGroups[v1]:dimGroups[v1+1], dimGroups[v0]:dimGroups[v0+1]] = dist.T
DIST_MAX = self.cfg_track.track_dist_max
for nv in range(nViews):
distance[dimGroups[nv]:dimGroups[nv+1], dimGroups[nv]:dimGroups[nv+1]] = DIST_MAX
distance -= np.eye(M) * DIST_MAX
aff = (DIST_MAX - distance)/DIST_MAX
aff = np.clip(aff, 0, 1)
return aff
def _calculate_affinity_MxN(self, dims, dimGroups, data, key, results):
M = dimGroups[-1]
N = len(results)
distance = np.zeros((M, N), dtype=np.float32)
nViews = len(dims)
k3d = np.stack([r['keypoints3d'] for r in results])
kpts_proj = project_points(k3d, data['KRT'], einsum='vab,pkb->vpka')
depth = kpts_proj[..., -1]
kpts_proj[depth<0] = -10000
for v in range(nViews):
if dims[v] == 0:
continue
focal = data['K'][v][0, 0]
pts2d = data[key][v][:, None]
pts_repro = kpts_proj[v][None]
conf = np.sqrt(pts2d[..., -1]*k3d[None, ..., -1])
diff = np.linalg.norm(pts2d[..., :2] - pts_repro[..., :2], axis=-1)
diff = np.sum(diff*conf, axis=-1)/(1e-5 + np.sum(conf, axis=-1))
dist = diff / focal
distance[dimGroups[v]:dimGroups[v+1], :] = dist
DIST_MAX = self.cfg_track.track_repro_max
aff = (DIST_MAX - distance)/DIST_MAX
aff = np.clip(aff, 0, 1)
return aff
def _svt_optimize_affinity(self, affinity, dimGroups):
# match SVT
import pymatchlr
observe = np.ones_like(affinity)
aff_svt = pymatchlr.matchSVT(affinity, dimGroups, SimpleConstrain(dimGroups), observe, self.cfg_svt)
aff_svt[aff_svt<self.cfg_svt.aff_min] = 0.
if False:
import matplotlib.pyplot as plt
M = affinity.shape[0]
plt.subplot(121)
plt.imshow(affinity)
plt.hlines([i-0.5 for i in dimGroups[1:]], -0.5, M-0.5, 'w')
plt.vlines([i-0.5 for i in dimGroups[1:]], -0.5, M-0.5, 'w')
plt.subplot(122)
sum_row = aff_svt.sum(axis=1, keepdims=True)/(len(dimGroups) - 1)
plt.imshow(np.hstack([aff_svt, sum_row]))
plt.hlines([i-0.5 for i in dimGroups[1:]], -0.5, M-0.5, 'w')
plt.vlines([i-0.5 for i in dimGroups[1:]], -0.5, M-0.5, 'w')
plt.ioff()
plt.show()
return aff_svt
def _track_add(self, res):
pid = res['id']
if pid == -1:
pid = self.max_id
res['id'] = pid
self.max_id += 1
self.log('[{:06d}] Create track {} <- {}'.format(self.time, pid, res['indices']))
if False:
crops = []
data = self.data
kpts = np.vstack(data['keypoints2d'])
for nv in range(len(data['imgname'])):
img = cv2.imread(data['imgname'][nv])
if res['indices'][nv] == -1: continue
_kpts = kpts[res['indices'][nv]]
bbox = bbox_from_keypoints(_kpts)
plot_keypoints_auto(img, _kpts, pid)
crop = crop_image(img, bbox, crop_square=True)
crops.append(crop)
debug = merge(crops)
cv2.imwrite('debug/{:06d}.jpg'.format(pid), debug)
else:
self.max_id = max(self.max_id, pid+1)
self.log('[{:06d}] Initialize track {}, valid joints={}'.format(self.time, pid, (res['keypoints3d'][:, -1]>0.01).sum()))
self.tracks[pid] = {
'start_time': self.time,
'end_time': self.time+1,
'missing_frame': [],
'infos': [res]
}
def _track_update(self, res, pid):
res['id'] = pid
info = self.tracks[pid]
self.log('[{:06d}] Update track {} [{}->{}], valid joints={}'.format(self.time, pid, info['start_time'], info['end_time'], (res['keypoints3d'][:, -1]>0.1).sum()))
self.tracks[pid]['end_time'] = self.time + 1
self.tracks[pid]['infos'].append(res)
def _track_merge(self, res, pid):
res['id'] = -1
# TODO: merge
def _track_and_update(self, data, results):
cfg = self.cfg_track
self.time += 1
if self.time == 0:
# initialize the tracks
for res in results:
self._track_add(res)
return results
# filter the missing frames
for pid in list(self.tracks.keys()):
if self.time - self.tracks[pid]['end_time'] > cfg.max_missing_frame:
self.warn('[{:06d}] Remove track {}'.format(self.time, pid))
self.tracks.pop(pid)
# track the results with greedy matching
for idx_match, res in enumerate(results):
res['id'] = -1
# compute the distance
k3d = res['keypoints3d'][None]
pids_free = [pid for pid in self.tracks.keys() if self.tracks[pid]['end_time'] != self.time+1]
pids_used = [pid for pid in self.tracks.keys() if self.tracks[pid]['end_time'] == self.time+1]
def check_dist(k3d_check):
dist = np.linalg.norm(k3d[..., :3] - k3d_check[..., :3], axis=-1)
conf = np.sqrt(k3d[..., 3] * k3d_check[..., 3])
dist_mean = ((conf>0.1).sum(axis=-1) < self.min_joints)*cfg.track_dist_max + np.sum(dist * conf, axis=-1)/(1e-5 + np.sum(conf, axis=-1))
argmin = dist_mean.argmin()
dist_min = dist_mean[argmin]
return dist_mean, argmin, dist_min
# check free
NOT_VISITED = -2
NOT_FOUND = -1
flag_tracked, flag_current = NOT_VISITED, NOT_VISITED
if len(pids_free) > 0:
k3d_check = np.stack([self.tracks[pid]['infos'][-1]['keypoints3d'] for pid in pids_free])
dist_track, best, best_dist_track = check_dist(k3d_check)
if best_dist_track < cfg.track_dist_max:
flag_tracked = best
else:
flag_tracked = NOT_FOUND
# check used
if len(pids_used) > 0:
k3d_check = np.stack([self.tracks[pid]['infos'][-1]['keypoints3d'] for pid in pids_used])
dist_cur, best, best_dist_curr = check_dist(k3d_check)
if best_dist_curr < cfg.track_dist_max:
flag_current = best
else:
flag_current = NOT_FOUND
if flag_tracked >= 0 and (flag_current == NOT_VISITED or flag_current == NOT_FOUND):
self._track_update(res, pids_free[flag_tracked])
elif (flag_tracked == NOT_FOUND or flag_tracked==NOT_VISITED) and flag_current >= 0:
# 没有跟踪到但是有当前帧的3D的合并
self.log('[{:06d}] Merge track {} to {}'.format(self.time, idx_match, pids_used[flag_current]))
self._track_merge(res, pids_used[flag_current])
elif flag_tracked == NOT_FOUND and flag_current == NOT_FOUND:
# create a new track
self._track_add(res)
else:
# 丢弃
self.log('[{:06d}] Remove track {}. No close points'.format(self.time, idx_match))
for pid in list(self.tracks.keys()):
if self.tracks[pid]['end_time'] != self.time + 1:
self.warn('[{:06d}] Tracking {} missing'.format(self.time, pid))
results = [r for r in results if r['id']!=-1]
return results
def __call__(self, data):
# match the data
self.data = data
key = 'keypoints2d'
dims = [d.shape[0] for d in data[key]]
dimGroups = np.cumsum([0] + dims)
# 1. compute affinity
affinity = self._calculate_affinity_MxM(dims, dimGroups, data, key)
N2D = affinity.shape[0]
if self.people is not None and len(self.people) > 0:
# add 3d affinity
_affinity = affinity
affinity_3d = self._calculate_affinity_MxN(dims, dimGroups, data, key, self.people)
affinity = np.concatenate([affinity, affinity_3d], axis=1)
eye3d = np.eye(affinity_3d.shape[1])
affinity = np.concatenate([affinity, np.hstack((affinity_3d.T, eye3d))], axis=0)
dimGroups = dimGroups.tolist()
dimGroups.append(dimGroups[-1]+affinity_3d.shape[1])
affinity = self._svt_optimize_affinity(affinity, dimGroups)
# affinity = self._svt_optimize_affinity(_affinity, dimGroups[:-1])
# recover
affinity_3d = np.hstack([np.ones((N2D, 1))*0.5, affinity[:N2D, N2D:]])
prev_id = affinity_3d.argmax(axis=-1) - 1
affinity = affinity[:N2D, :N2D]
dimGroups = np.array(dimGroups[:-1])
else:
affinity = self._svt_optimize_affinity(affinity, dimGroups)
prev_id = np.zeros(N2D) - 1
# 2. associate and triangulate
results = self._simple_associate2d_triangulate(data, affinity, dimGroups, prev_id)
# 3. track, filter and return
results = self._track_and_update(data, results)
results.sort(key=lambda x:x['id'])
self.people = results
return results

View File

@ -2,27 +2,38 @@
@ Date: 2020-11-28 17:23:04
@ Author: Qing Shuai
@ LastEditors: Qing Shuai
@ LastEditTime: 2021-08-22 16:11:25
@ FilePath: /EasyMocap/easymocap/mytools/vis_base.py
@ LastEditTime: 2022-08-12 21:50:56
@ FilePath: /EasyMocapPublic/easymocap/mytools/vis_base.py
'''
import cv2
import numpy as np
import json
def generate_colorbar(N = 20, cmap = 'jet'):
def generate_colorbar(N = 20, cmap = 'jet', rand=True):
bar = ((np.arange(N)/(N-1))*255).astype(np.uint8).reshape(-1, 1)
colorbar = cv2.applyColorMap(bar, cv2.COLORMAP_JET).squeeze()
if False:
colorbar = np.clip(colorbar + 64, 0, 255)
if rand:
import random
random.seed(666)
index = [i for i in range(N)]
random.shuffle(index)
rgb = colorbar[index, :]
else:
rgb = colorbar
rgb = rgb.tolist()
return rgb
colors_bar_rgb = generate_colorbar(cmap='hsv')
# colors_bar_rgb = generate_colorbar(cmap='hsv')
colors_bar_rgb = [
(94, 124, 226), # 青色
(255, 200, 87), # yellow
(74, 189, 172), # green
(8, 76, 97), # blue
(219, 58, 52), # red
(77, 40, 49), # brown
]
colors_table = {
'b': [0.65098039, 0.74117647, 0.85882353],
@ -34,15 +45,19 @@ colors_table = {
'r': [ 251/255., 128/255., 114/255.],
'_orange': [ 253/255., 174/255., 97/255.],
'y': [ 250/255., 230/255., 154/255.],
'_r':[255/255,0,0],
'g':[0,255/255,0],
'_b':[0,0,255/255],
'k':[0,0,0],
'_r':[255/255,0,0],
'_g':[0,255/255,0],
'_b':[0,0,255/255],
'_k':[0,0,0],
'_y':[255/255,255/255,0],
'purple':[128/255,0,128/255],
'smap_b':[51/255,153/255,255/255],
'smap_r':[255/255,51/255,153/255],
'smap_b':[51/255,255/255,153/255],
'person': [255/255,255/255,255/255],
'handl': [255/255,51/255,153/255],
'handr': [51/255,255/255,153/255],
}
def get_rgb(index):
@ -51,7 +66,9 @@ def get_rgb(index):
return (255, 255, 255)
if index < -1:
return (0, 0, 0)
col = colors_bar_rgb[index%len(colors_bar_rgb)]
# elif index == 0:
# return (245, 150, 150)
col = list(colors_bar_rgb[index%len(colors_bar_rgb)])[::-1]
else:
col = colors_table.get(index, (1, 0, 0))
col = tuple([int(c*255) for c in col[::-1]])
@ -80,13 +97,14 @@ def plot_cross(img, x, y, col, width=-1, lw=-1):
cv2.line(img, (int(x-width), int(y)), (int(x+width), int(y)), col, lw)
cv2.line(img, (int(x), int(y-width)), (int(x), int(y+width)), col, lw)
def plot_bbox(img, bbox, pid, vis_id=True):
def plot_bbox(img, bbox, pid, scale=1, vis_id=True):
# 画bbox: (l, t, r, b)
x1, y1, x2, y2 = bbox[:4]
x1 = int(round(x1))
x2 = int(round(x2))
y1 = int(round(y1))
y2 = int(round(y2))
x1, y1, x2, y2, c = bbox
if c < 0.01:return img
x1 = int(round(x1*scale))
x2 = int(round(x2*scale))
y1 = int(round(y1*scale))
y2 = int(round(y2*scale))
color = get_rgb(pid)
lw = max(img.shape[0]//300, 2)
cv2.rectangle(img, (x1, y1), (x2, y2), color, lw)
@ -94,11 +112,20 @@ def plot_bbox(img, bbox, pid, vis_id=True):
font_scale = img.shape[0]/1000
cv2.putText(img, '{}'.format(pid), (x1, y1+int(25*font_scale)), cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 2)
def plot_keypoints(img, points, pid, config, vis_conf=False, use_limb_color=True, lw=2):
def plot_keypoints(img, points, pid, config, vis_conf=False, use_limb_color=True, lw=2, fliplr=False):
lw = max(lw, 2)
H, W = img.shape[:2]
for ii, (i, j) in enumerate(config['kintree']):
if i >= len(points) or j >= len(points):
continue
if (i >25 or j > 25) and config['nJoints'] != 42:
_lw = max(int(lw/4), 1)
else:
_lw = lw
pt1, pt2 = points[i], points[j]
if fliplr:
pt1 = (W-pt1[0], pt1[1])
pt2 = (W-pt2[0], pt2[1])
if use_limb_color:
col = get_rgb(config['colors'][ii])
else:
@ -106,32 +133,111 @@ def plot_keypoints(img, points, pid, config, vis_conf=False, use_limb_color=True
if pt1[-1] > 0.01 and pt2[-1] > 0.01:
image = cv2.line(
img, (int(pt1[0]+0.5), int(pt1[1]+0.5)), (int(pt2[0]+0.5), int(pt2[1]+0.5)),
col, lw)
for i in range(len(points)):
col, _lw)
for i in range(min(len(points), config['nJoints'])):
x, y = points[i][0], points[i][1]
if fliplr:
x = W - x
c = points[i][-1]
if c > 0.01:
text_size = img.shape[0]/1000
col = get_rgb(pid)
radius = int(lw/1.5)
if i > 25 and config['nJoints'] != 42:
radius = max(int(radius/4), 1)
cv2.circle(img, (int(x+0.5), int(y+0.5)), radius, col, -1)
if vis_conf:
cv2.putText(img, '{:.1f}'.format(c), (int(x), int(y)),
cv2.FONT_HERSHEY_SIMPLEX, text_size, col, 2)
def plot_keypoints_auto(img, points, pid, vis_conf=False, use_limb_color=True, scale=1, lw=-1):
from ..dataset.config import CONFIG
config_name = {25: 'body25', 21: 'hand', 42:'handlr', 17: 'coco', 1:'points', 67:'bodyhand', 137: 'total', 79:'up'}[len(points)]
config = CONFIG[config_name]
if lw == -1:
lw = img.shape[0]//200
if config_name == 'hand':
lw = img.shape[0]//1000
lw = max(lw, 1)
for ii, (i, j) in enumerate(config['kintree']):
if i >= len(points) or j >= len(points):
continue
if i >= 25 and config_name in ['bodyhand', 'total']:
lw = max(img.shape[0]//400, 1)
pt1, pt2 = points[i], points[j]
if use_limb_color:
col = get_rgb(config['colors'][ii])
else:
col = get_rgb(pid)
if pt1[0] < 0 or pt1[1] < 0 or pt1[0] > 10000 or pt1[1] > 10000:
continue
if pt2[0] < 0 or pt2[1] < 0 or pt2[0] > 10000 or pt2[1] > 10000:
continue
if pt1[-1] > 0.01 and pt2[-1] > 0.01:
image = cv2.line(
img, (int(pt1[0]*scale+0.5), int(pt1[1]*scale+0.5)), (int(pt2[0]*scale+0.5), int(pt2[1]*scale+0.5)),
col, lw)
lw = img.shape[0]//200
if config_name == 'hand':
lw = img.shape[0]//500
lw = max(lw, 1)
for i in range(len(points)):
x, y = points[i][0]*scale, points[i][1]*scale
if x < 0 or y < 0 or x >10000 or y >10000:
continue
if i >= 25 and config_name in ['bodyhand', 'total']:
lw = max(img.shape[0]//400, 1)
c = points[i][-1]
if c > 0.01:
col = get_rgb(pid)
if len(points) == 1:
cv2.circle(img, (int(x+0.5), int(y+0.5)), lw*10, col, lw*2)
plot_cross(img, int(x+0.5), int(y+0.5), width=lw*5, col=col, lw=lw*2)
else:
cv2.circle(img, (int(x+0.5), int(y+0.5)), lw*2, col, -1)
if vis_conf:
cv2.putText(img, '{:.1f}'.format(c), (int(x), int(y)), cv2.FONT_HERSHEY_SIMPLEX, 1, col, 2)
def plot_points2d(img, points2d, lines, lw=4, col=(0, 255, 0), putText=True):
def plot_keypoints_total(img, annots, scale, pid_offset=0):
_lw = img.shape[0] // 150
for annot in annots:
pid = annot['personID'] + pid_offset
for key in ['keypoints', 'handl2d', 'handr2d']:
if key not in annot.keys():continue
if key in ['handl2d', 'handr2d', 'face2d']:
lw = _lw // 2
else:
lw = _lw
lw = max(lw, 1)
plot_keypoints_auto(img, annot[key], pid, vis_conf=False, use_limb_color=False, scale=scale, lw=lw)
if 'bbox' not in annot.keys() or (annot['bbox'][0] < 0 or annot['bbox'][0] >10000):
continue
plot_bbox(img, annot['bbox'], pid, scale=scale, vis_id=True)
return img
def plot_points2d(img, points2d, lines, lw=-1, col=(0, 255, 0), putText=True, style='+'):
# 将2d点画上去
if points2d.shape[1] == 2:
points2d = np.hstack([points2d, np.ones((points2d.shape[0], 1))])
if lw == -1:
lw = img.shape[0]//200
for i, (x, y, v) in enumerate(points2d):
if v < 0.01:
continue
c = col
plot_cross(img, x, y, width=10, col=c, lw=lw)
if '+' in style:
plot_cross(img, x, y, width=10, col=c, lw=lw*2)
if 'o' in style:
cv2.circle(img, (int(x), int(y)), 10, c, lw*2)
cv2.circle(img, (int(x), int(y)), lw, c, lw)
if putText:
font_scale = img.shape[0]/2000
c = col[::-1]
font_scale = img.shape[0]/1000
cv2.putText(img, '{}'.format(i), (int(x), int(y)), cv2.FONT_HERSHEY_SIMPLEX, font_scale, c, 2)
for i, j in lines:
if points2d[i][2] < 0.01 or points2d[j][2] < 0.01:
continue
plot_line(img, points2d[i], points2d[j], 2, col)
plot_line(img, points2d[i], points2d[j], max(1, lw//2), col)
row_col_ = {
2: (2, 1),
@ -140,7 +246,18 @@ row_col_ = {
9: (3, 3),
26: (4, 7)
}
def get_row_col(l):
row_col_square = {
2: (2, 1),
7: (3, 3),
8: (3, 3),
9: (3, 3),
26: (5, 5)
}
def get_row_col(l, square):
if square and l in row_col_square.keys():
return row_col_square[l]
if l in row_col_.keys():
return row_col_[l]
else:
@ -153,12 +270,19 @@ def get_row_col(l):
row, col = col, row
return row, col
def merge(images, row=-1, col=-1, resize=False, ret_range=False, **kwargs):
def merge(images, row=-1, col=-1, resize=False, ret_range=False, square=False, **kwargs):
if row == -1 and col == -1:
row, col = get_row_col(len(images))
row, col = get_row_col(len(images), square)
height = images[0].shape[0]
width = images[0].shape[1]
# special case
if height > width:
if len(images) == 3:
row, col = 1, 3
if len(images[0].shape) > 2:
ret_img = np.zeros((height * row, width * col, images[0].shape[2]), dtype=np.uint8) + 255
else:
ret_img = np.zeros((height * row, width * col), dtype=np.uint8) + 255
ranges = []
for i in range(row):
for j in range(col):

View File

@ -0,0 +1,104 @@
'''
@ Date: 2022-03-29 13:55:42
@ Author: Qing Shuai
@ Mail: s_q@zju.edu.cn
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-05-06 16:45:47
@ FilePath: /EasyMocapPublic/scripts/dataset/download_youtube.py
'''
from glob import glob
from os.path import join
from urllib.error import URLError
from pytube import YouTube
import os
from easymocap.mytools.debug_utils import log, mkdir, myerror
extensions = ['.mp4', '.webm']
def download_youtube(vid, outdir):
outname = join(outdir, vid)
url = 'https://www.youtube.com/watch?v={}'.format(vid)
for ext in extensions:
if os.path.exists(outname+ext) and not args.restart:
log('[Info]: skip video {}'.format(outname+ext))
return 0
log('[Info]: start to download video {}'.format(outname))
log('[Info]: {}'.format(url))
yt = YouTube(url)
try:
streams = yt.streams
except KeyError:
myerror('[Error]: not found streams: {}'.format(url))
return 1
except URLError:
myerror('[Error]: Url error: {}'.format(url))
return 1
find = False
streams_valid = []
res_range = ['2160p', '1440p', '1080p', '720p'] if not args.only4k else ['2160p']
if args.no720:
res_range.remove('720p')
for res in res_range:
for fps in [60, 50, 30, 25, 24]:
for ext in ['webm', 'mp4']:
for stream in streams:
if stream.resolution == res and \
stream.fps == fps and \
stream.mime_type == 'video/{}'.format(ext):
streams_valid.append(stream)
if len(streams_valid) == 0:
for stream in streams:
print(stream)
myerror('[BUG ] Not found valid stream, please check the streams')
return 0
# best_stream = yt.streams.order_by('filesize')[-1]
title = streams_valid[0].title
log('[Info]: {}'.format(title))
for stream in streams_valid:
res = stream.resolution
log('[Info]: The resolution is {}, ext={}'.format(res, stream.mime_type))
filename = '{}.{}'.format(vid, stream.mime_type.split('/')[-1])
try:
stream.download(output_path=outdir, filename=filename, max_retries=0)
log('[Info]: Succeed')
except:
myerror('[BUG ]: Failed')
continue
break
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('vid', type=str)
parser.add_argument('--database', type=str, default='data/youtube')
parser.add_argument('--num', type=int, default=1)
parser.add_argument('--only4k', action='store_true')
parser.add_argument('--no720', action='store_true')
parser.add_argument('--restart', action='store_true')
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
vid = args.vid
# check database
database = join(args.database, 'videos')
mkdir(database)
videonames = sorted(os.listdir(database))
log('[download] video database in {}'.format(database))
log('[download] already has {} videos'.format(len(videonames)))
if vid.startswith('https'):
vid = vid.replace('https://www.youtube.com/watch?v=', '')
vid = vid.split('&')[0]
print(vid)
urls = [vid]
elif os.path.exists(vid):
with open(vid, 'r') as f:
urls = f.readlines()
urls = list(filter(lambda x:not x.startswith('#') and len(x) > 0, map(lambda x: x.strip().replace('https://www.youtube.com/watch?v=', '').split('&')[0], urls)))
log('[download] download {} videos from {}'.format(len(urls), vid))
else:
urls = [vid]
for url in urls:
download_youtube(url, database)