161 lines
6.5 KiB
Python
161 lines
6.5 KiB
Python
'''
|
|
@ Date: 2021-03-02 16:12:59
|
|
@ Author: Qing Shuai
|
|
@ LastEditors: Qing Shuai
|
|
@ LastEditTime: 2022-10-11 16:36:12
|
|
@ FilePath: /EasyMocapPublic/apps/calibration/calib_intri.py
|
|
'''
|
|
# This script calibrate each intrinsic parameters
|
|
import shutil
|
|
import random
|
|
from easymocap.mytools.debug_utils import log, mywarn
|
|
from easymocap.mytools.vis_base import plot_points2d
|
|
from easymocap.mytools import write_intri, read_json, Timer
|
|
import numpy as np
|
|
import cv2
|
|
import os
|
|
from os.path import join
|
|
from glob import glob
|
|
from easymocap.annotator.chessboard import get_lines_chessboard
|
|
from tqdm import tqdm
|
|
|
|
def read_chess(chessname):
|
|
data = read_json(chessname)
|
|
k3d = np.array(data['keypoints3d'], dtype=np.float32)
|
|
k2d = np.array(data['keypoints2d'], dtype=np.float32)
|
|
if (k2d[:, -1] > 0.).sum() < k2d.shape[0]//2:
|
|
return False, k2d, k3d
|
|
if k2d[:, -1].sum() < k2d.shape[0]:
|
|
valid = k2d[:, -1] > 0.1
|
|
k2d = k2d[valid]
|
|
k3d = k3d[valid]
|
|
# TODO:去除正对相机的
|
|
# TODO:去除各条线不平行的噪声
|
|
return True, k2d, k3d
|
|
|
|
def pop(k2ds_, k3ds_, valid_idx, imgnames, max_num):
|
|
k2ds = np.stack(k2ds_)
|
|
dist = np.linalg.norm(k2ds[:, None] - k2ds[None, :], axis=-1).mean(axis=-1)
|
|
size = np.linalg.norm(k2ds[:, -1] - k2ds[:, 0], axis=-1)
|
|
dist = dist / size[:, None]
|
|
row = np.arange(dist.shape[0])
|
|
dist[row, row] = 9999.
|
|
col = dist.argmin(axis=0)
|
|
dist_min = dist[row, col]
|
|
indices = dist_min.argsort()[:dist_min.shape[0] - max_num]
|
|
if False:
|
|
img0 = cv2.imread(imgnames[valid_idx[idx]])
|
|
img1 = cv2.imread(imgnames[valid_idx[remove_id]])
|
|
cv2.imshow('01', np.hstack([img0, img1]))
|
|
cv2.waitKey(10)
|
|
print('remove: ', imgnames[valid_idx[remove_id]], imgnames[valid_idx[idx]])
|
|
indices = indices.tolist()
|
|
indices.sort(reverse=True, key=lambda x:col[x])
|
|
removed = set()
|
|
for idx in indices:
|
|
remove_id = col[idx]
|
|
if remove_id in removed:
|
|
continue
|
|
removed.add(remove_id)
|
|
valid_idx.pop(remove_id)
|
|
k2ds_.pop(remove_id)
|
|
k3ds_.pop(remove_id)
|
|
|
|
def load_chessboards(chessnames, imagenames, max_image, sample_image=-1, out='debug-calib'):
|
|
os.makedirs(out, exist_ok=True)
|
|
k3ds_, k2ds_, imgs = [], [], []
|
|
valid_idx = []
|
|
for i, chessname in enumerate(tqdm(chessnames, desc='read')):
|
|
flag, k2d, k3d = read_chess(chessname)
|
|
if not flag:
|
|
continue
|
|
k3ds_.append(k3d)
|
|
k2ds_.append(k2d)
|
|
valid_idx.append(i)
|
|
if max_image > 0 and len(valid_idx) > max_image + int(max_image * 0.1):
|
|
pop(k2ds_, k3ds_, valid_idx, imagenames, max_num=max_image)
|
|
if sample_image > 0:
|
|
mywarn('[calibration] Load {} images, sample {} images'.format(len(k3ds_), sample_image))
|
|
index = [i for i in range(len(k2ds_))]
|
|
index_sample = random.sample(index, min(sample_image, len(index)))
|
|
valid_idx = [valid_idx[i] for i in index_sample]
|
|
k2ds_ = [k2ds_[i] for i in index_sample]
|
|
k3ds_ = [k3ds_[i] for i in index_sample]
|
|
else:
|
|
log('[calibration] Load {} images'.format(len(k3ds_)))
|
|
for ii, idx in enumerate(valid_idx):
|
|
shutil.copyfile(imagenames[idx], join(out, '{:06d}.jpg'.format(ii)))
|
|
return k3ds_, k2ds_
|
|
|
|
def calib_intri_share(path, image, ext):
|
|
camnames = sorted(os.listdir(join(path, image)))
|
|
camnames = [cam for cam in camnames if os.path.isdir(join(path, image, cam))]
|
|
|
|
imagenames = sorted(glob(join(path, image, '*', '*' + ext)))
|
|
chessnames = sorted(glob(join(path, 'chessboard', '*', '*.json')))
|
|
k3ds_, k2ds_ = load_chessboards(chessnames, imagenames, args.num, args.sample, out=join(args.path, 'output'))
|
|
with Timer('calibrate'):
|
|
print('[Info] start calibration with {} detections'.format(len(k2ds_)))
|
|
gray = cv2.imread(imagenames[0], 0)
|
|
k3ds = k3ds_
|
|
k2ds = [np.ascontiguousarray(k2d[:, :-1]) for k2d in k2ds_]
|
|
ret, K, dist, rvecs, tvecs = cv2.calibrateCamera(
|
|
k3ds, k2ds, gray.shape[::-1], None, None,
|
|
flags=cv2.CALIB_FIX_K3)
|
|
cameras = {}
|
|
for cam in camnames:
|
|
cameras[cam] = {
|
|
'K': K,
|
|
'dist': dist # dist: (1, 5)
|
|
}
|
|
if True:
|
|
img = cv2.imread(imagenames[0])
|
|
h, w = img.shape[:2]
|
|
newcameramtx, roi = cv2.getOptimalNewCameraMatrix(K, dist, (w,h), 1, (w,h))
|
|
mapx, mapy = cv2.initUndistortRectifyMap(K, dist, None, newcameramtx, (w,h), 5)
|
|
for imgname in tqdm(imagenames):
|
|
img = cv2.imread(imgname)
|
|
dst = cv2.remap(img, mapx, mapy, cv2.INTER_LINEAR)
|
|
outname = join(path, 'output', os.path.basename(imgname))
|
|
cv2.imwrite(outname, dst)
|
|
write_intri(join(path, 'output', 'intri.yml'), cameras)
|
|
|
|
def calib_intri(path, image, ext):
|
|
camnames = sorted(os.listdir(join(path, image)))
|
|
camnames = [cam for cam in camnames if os.path.isdir(join(path, image, cam))]
|
|
cameras = {}
|
|
for ic, cam in enumerate(camnames):
|
|
imagenames = sorted(glob(join(path, image, cam, '*'+ext)))
|
|
chessnames = sorted(glob(join(path, 'chessboard', cam, '*.json')))
|
|
k3ds_, k2ds_ = load_chessboards(chessnames, imagenames, args.num, out=join(args.path, 'output', cam+'_used'))
|
|
k3ds = k3ds_
|
|
k2ds = [np.ascontiguousarray(k2d[:, :-1]) for k2d in k2ds_]
|
|
gray = cv2.imread(imagenames[0], 0)
|
|
print('>> Camera {}: {:3d} frames'.format(cam, len(k2ds)))
|
|
with Timer('calibrate'):
|
|
ret, K, dist, rvecs, tvecs = cv2.calibrateCamera(
|
|
k3ds, k2ds, gray.shape[::-1], None, None,
|
|
flags=cv2.CALIB_FIX_K3)
|
|
cameras[cam] = {
|
|
'K': K,
|
|
'dist': dist # dist: (1, 5)
|
|
}
|
|
write_intri(join(path, 'output', 'intri.yml'), cameras)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('path', type=str, default='/home/')
|
|
parser.add_argument('--image', type=str, default='images')
|
|
parser.add_argument('--ext', type=str, default='.jpg', choices=['.jpg', '.png'])
|
|
parser.add_argument('--num', type=int, default=-1)
|
|
parser.add_argument('--sample', type=int, default=-1)
|
|
parser.add_argument('--share_intri', action='store_true')
|
|
parser.add_argument('--remove', action='store_true')
|
|
args = parser.parse_args()
|
|
if args.share_intri:
|
|
calib_intri_share(args.path, args.image, ext=args.ext)
|
|
else:
|
|
calib_intri(args.path, args.image, ext=args.ext)
|