# EasyMocap/easymocap/annotator/basic_annotator.py
# 2022-10-25 20:57:27 +08:00
#
# 623 lines
# 23 KiB
# Python

import shutil
import cv2
import os
from tqdm import tqdm
from .basic_keyboard import print_help, register_keys
from .basic_visualize import plot_text, resize_to_screen, merge
from .basic_callback import point_callback, CV_KEY, get_key
from .bbox_callback import callback_select_image
from .file_utils import load_annot_to_tmp, read_json, save_annot
import copy
class ComposedCallback:
    """Bundle several mouse callbacks and follow-up processes into one handler.

    ``call`` has the signature OpenCV expects from a mouse callback, so a
    single instance can be handed to ``cv2.setMouseCallback``.
    """

    def __init__(self, callbacks=None, processes=None) -> None:
        """Store the callback chain.

        Args:
            callbacks: callables invoked as ``callback(event, x, y, flags, param)``
                on every mouse event. ``None`` selects ``[point_callback]``.
            processes: callables invoked as ``process(**param)`` once a mouse
                interaction is pending. ``None`` selects an empty list.
        """
        # ``None`` sentinels avoid sharing one default list between instances.
        self.callbacks = [point_callback] if callbacks is None else callbacks
        self.processes = [] if processes is None else processes

    def call(self, event, x, y, flags, param):
        """Handle one mouse event.

        The window coordinates are divided by ``param['scale']`` and rounded
        before being passed to the callbacks.

        Returns:
            ``0`` when none of ``param['click']``, ``param['start']`` and
            ``param['end']`` is set after the callbacks ran; ``None`` otherwise.
        """
        scale = param['scale']
        x, y = int(round(x/scale)), int(round(y/scale))
        for callback in self.callbacks:
            callback(event, x, y, flags, param)
        # Nothing pending -> nothing for the processes to consume.
        if all(param[key] is None for key in ('click', 'start', 'end')):
            return 0
        for process in self.processes:
            ret = process(**param)
            if ret:
                # The operation succeeded: reset the pending mouse state.
                # The remaining processes are still called, as before.
                param['click'] = None
                param['start'] = None
                param['end'] = None
def get_valid_yn():
    """Ask on stdin whether the annotations should be saved.

    Keeps prompting until the answer is ``y`` or ``n``. Surrounding
    whitespace and letter case are ignored.

    Returns:
        ``'y'`` or ``'n'``.
    """
    while True:
        key = input('Saving this annotations? [y/n]').strip().lower()
        if key in ('y', 'n'):
            return key
        print('Please specify [y/n]')
# Maps a body-part name to the (bbox key, keypoints key) pair used in the
# annotation dictionaries. Kept in sync with ``AnnotMVMerge.restore_key``.
restore_key = {
    'body25': ('bbox', 'keypoints'),
    'handl': ('bbox_handl2d', 'handl2d'),
    'handr': ('bbox_handr2d', 'handr2d'),
    'face': ('bbox_face2d', 'face2d'),
}
class BaseWindow:
    """Minimal annotation window: open a window -> annotate -> close it.

    2021.10.11: first step towards a new level of abstraction. This base
    class only covers the window life cycle; subclasses override
    ``save_and_quit`` to persist their results.
    """

    def __init__(self, name, param, register_keys, vis_funcs, callbacks) -> None:
        """Create the OpenCV window and install the mouse handler.

        Args:
            name: window title, also used as the OpenCV window id.
            param: shared state dict; ``click``/``start``/``end`` are reset
                here and a deep copy is kept for "quit without save".
            register_keys: key -> handler mapping. It is extended in place
                with the ``h``, ``q``, ``x`` and ``Q`` handlers.
            vis_funcs: callables applied in order as ``func(img, **param)``.
            callbacks: processes forwarded to ``ComposedCallback``.
        """
        self.name = name
        register_keys['h'] = print_help
        register_keys['q'] = self.quit
        register_keys['x'] = self.clear
        register_keys['Q'] = self.quit_without_save
        self.register_keys = register_keys
        self.vis_funcs = vis_funcs
        self.isOpen = True
        param['click'] = None
        param['start'] = None
        param['end'] = None
        # Snapshot used to roll back when quitting without saving.
        self.param0 = copy.deepcopy(param)
        self.param = param
        cv2.namedWindow(self.name)
        callback = ComposedCallback(processes=callbacks)
        cv2.setMouseCallback(self.name, callback.call, self.param)

    def quit_without_save(self, annotator, param):
        """Key handler: close the window and roll ``param`` back."""
        self.quit(annotator, param, save=False)

    def clear(self, annotator, param):
        """Key handler: deselect everything and drop pending mouse state."""
        select = param['select']
        for key in select.keys():
            select[key] = -1
        for key in ['click', 'start', 'end']:
            self.param[key] = None

    def quit(self, annotator, param, save=True):
        """Key handler: cancel a pending mouse action, or close the window.

        If any of ``click``/``start``/``end`` is set, only the first such
        entry is cleared and the window stays open. Otherwise the window is
        closed, either after ``save_and_quit()`` or after restoring the
        values captured at construction time.
        """
        for key in ['click', 'start', 'end']:
            if self.param[key] is not None:
                self.param[key] = None
                break
        else:
            if not save:
                # Restore from the snapshot. Iterating the snapshot (not the
                # live dict) avoids a KeyError for keys added later on.
                for key, value in self.param0.items():
                    self.param[key] = value
            else:
                self.save_and_quit()
            self.isOpen = False
            try:
                cv2.destroyWindow(self.name)
            except cv2.error:
                # A ``save_and_quit`` override may already have destroyed the
                # window; some OpenCV builds raise on a second destroy.
                pass

    def save_and_quit(self, key=None):
        """Hook for subclasses; the base implementation does nothing."""
        pass

    def run(self, key=None, noshow=False):
        """Process one key press and redraw the window.

        Args:
            key: single-character key; read via ``get_key()`` when ``None``.
            noshow: skip rendering when true.

        Returns:
            ``0`` when the window was closed or ``noshow`` is set,
            otherwise ``None``.
        """
        if key is None:
            key = chr(get_key())
        if key in self.register_keys.keys():
            self.register_keys[key](self, param=self.param)
        if not self.isOpen:
            return 0
        if noshow:
            return 0
        img = self.param['img0'].copy()
        for func in self.vis_funcs:
            img = func(img, **self.param)
        cv2.imshow(self.name, img)
class AnnotBase:
    """Single-view annotator that steps through a dataset frame by frame.

    ``dataset[i]`` must return ``(imgname, annname)``. The annotator keeps
    its working state in ``self.param``, which is also handed to the mouse
    callback and to every key handler and visualisation function.
    """

    def __init__(self, dataset, key_funcs=None, callbacks=None, vis_funcs=None,
            name = 'main', body='body25',
            start=0, end=100000, step=10, no_window=False) -> None:
        """Load the first frame and, unless ``no_window``, open the window.

        Args:
            dataset: indexable returning ``(imgname, annname)``; needs ``len``.
            key_funcs: extra key -> handler entries merged over the module
                level ``register_keys``. ``None`` means none.
            callbacks: processes forwarded to ``ComposedCallback``.
            vis_funcs: visualisation callables ``func(img, **param)``;
                ``resize_to_screen`` is appended when missing.
            name: window name.
            body: key into ``restore_key`` selecting bbox/keypoint names.
            start, end: frame range stored on the instance.
            step: stride stored on the instance for the step-move handlers.
            no_window: when true no OpenCV window is created.
        """
        key_funcs = {} if key_funcs is None else key_funcs
        callbacks = [] if callbacks is None else callbacks
        # Private copy: appending ``resize_to_screen`` must not leak into the
        # caller's list (or into a shared default).
        vis_funcs = [] if vis_funcs is None else list(vis_funcs)
        self.name = name
        self.dataset = dataset
        self.nFrames = len(dataset)
        self.step = step
        self.register_keys = register_keys.copy()
        self.register_keys.update(key_funcs)
        self.no_img = False
        if resize_to_screen not in vis_funcs:
            vis_funcs.append(resize_to_screen)
        self.vis_funcs = vis_funcs
        self.start = start
        self.end = end
        self.isOpen = True
        self._frame = self.start
        self.visited_frames = set([self._frame])
        bbox_name, kpts_name = restore_key[body]
        self.param = {
            'frame': 0, 'nFrames': self.nFrames,
            'kpts_name': kpts_name, 'bbox_name': bbox_name,
            'select': {bbox_name: -1, 'corner': -1},
            'click': None,
            'name': name,
            'body': body,
            'capture_screen':False}
        self.set_frame(self.start)
        self.no_window = no_window
        if not no_window:
            cv2.namedWindow(self.name)
            callback = ComposedCallback(processes=callbacks)
            cv2.setMouseCallback(self.name, callback.call, self.param)

    @property
    def working(self):
        """True while a click/drag is pending or anything is selected."""
        param = self.param
        if param['click'] is not None or param['start'] is not None:
            return True
        return any(value != -1 for value in param['select'].values())

    @staticmethod
    def clear_working(param):
        """Reset the pending mouse state and every selection in ``param``."""
        param['click'] = None
        param['start'] = None
        param['end'] = None
        for key in param['select']:
            param['select'][key] = -1

    def save_and_quit(self, key=None):
        """Close the annotator and optionally write back visited frames.

        Args:
            key: ``'y'`` to save, ``'n'`` to discard; asked on stdin when
                ``None``.

        Returns:
            ``0`` when the user declined, otherwise ``None``.
        """
        # Re-assigning the frame goes through the setter, which persists the
        # annotations of the frame currently being edited.
        self.frame = self.frame
        self.isOpen = False
        # Only destroy a window that was actually created in ``__init__``.
        if not self.no_window:
            cv2.destroyWindow(self.name)
        # get the input
        if key is None:
            key = get_valid_yn()
        if key == 'n':
            return 0
        for frame in tqdm(self.visited_frames, desc='writing'):
            # The dataset returns a different annotation path depending on
            # ``isTmp``; the first one is copied over the second one.
            self.dataset.isTmp = True
            _, annname = self.dataset[frame]
            self.dataset.isTmp = False
            _, annname_ = self.dataset[frame]
            if annname is not None:
                shutil.copyfile(annname, annname_)

    @property
    def frame(self):
        """Index of the frame currently shown."""
        return self._frame

    @frame.setter
    def frame(self, value):
        self.visited_frames.add(value)
        self._frame = value
        # ``set_frame`` saves the outgoing frame's annotations before loading
        # ``value``, so no separate save is needed here.
        self.set_frame(value)

    def previous(self):
        """Return the annotations of the previous frame, or ``None`` at frame 0."""
        if self.frame == 0:
            print('Reach to the first frame')
            return None
        imgname, annname = self.dataset[self.frame-1]
        annots = load_annot_to_tmp(annname)
        return annots

    @staticmethod
    def set_param(param, imgname, annname, nf, no_img=False):
        """Fill ``param`` with the data of frame ``nf``.

        Loads the annotations via ``load_annot_to_tmp`` and, unless
        ``no_img``, reads the image and derives the display scale from
        ``CV_KEY.WINDOW_HEIGHT`` / ``CV_KEY.WINDOW_WIDTH``.
        """
        annots = load_annot_to_tmp(annname)
        # Reset the pending mouse state.
        for key in ['click', 'start', 'end']:
            param[key] = None
        # Reset the selection.
        for key in param['select']:
            param['select'][key] = -1
        param['imgname'] = imgname
        param['annname'] = annname
        param['frame'] = nf
        param['annots'] = annots
        if not no_img:
            assert os.path.exists(imgname), imgname
            img0 = cv2.imread(imgname)
            param['img0'] = img0
            # Largest scale at which the image fits both window limits.
            param['scale'] = min(CV_KEY.WINDOW_HEIGHT/img0.shape[0], CV_KEY.WINDOW_WIDTH/img0.shape[1])

    def set_frame(self, nf):
        """Save the current annotations (if any) and load frame ``nf``."""
        param = self.param
        if 'annots' in param.keys():
            save_annot(param['annname'], param['annots'])
        self.clear_working(param)
        imgname, annname = self.dataset[nf]
        self.set_param(param, imgname, annname, nf, no_img=self.no_img)

    def run(self, key=None, noshow=False):
        """Process one key press and render the current frame.

        Returns:
            ``0`` when closed or ``noshow`` is set, otherwise the rendered
            image (also shown in the window unless ``no_window``).
        """
        if key is None:
            key = chr(get_key())
        if key in self.register_keys.keys():
            self.register_keys[key](self, param=self.param)
        if not self.isOpen:
            return 0
        if noshow:
            return 0
        img = self.param['img0'].copy()
        for func in self.vis_funcs:
            img = func(img, **self.param)
        if not self.no_window:
            cv2.imshow(self.name, img)
        return img
class AnnotMV:
    """Multi-view annotator: one ``AnnotBase`` per view plus a merged overview."""

    def __init__(self, datasets, key_funcs=None, key_funcs_view=None, callbacks=None, vis_funcs=None, vis_funcs_all=None,
            name='main', step=100, body='body25', start=0, end=100000) -> None:
        """Create one ``AnnotBase`` (with its own window) per dataset.

        Args:
            datasets: mapping view name -> dataset.
            key_funcs: key handlers applied to this object.
            key_funcs_view: key handlers applied to every per-view annotator.
            callbacks: processes forwarded to every ``AnnotBase``.
            vis_funcs: per-view visualisers forwarded to every ``AnnotBase``.
            vis_funcs_all: visualisers for the merged overview, called as
                ``func(img, sub, param=...)``.
            name: name of the overview window.
            step, body, start, end: forwarded to every ``AnnotBase``.
        """
        key_funcs = {} if key_funcs is None else key_funcs
        key_funcs_view = {} if key_funcs_view is None else key_funcs_view
        callbacks = [] if callbacks is None else callbacks
        vis_funcs = [] if vis_funcs is None else vis_funcs
        vis_funcs_all = [] if vis_funcs_all is None else vis_funcs_all
        self.subs = list(datasets.keys())
        self.annotdict = {}
        self.nFrames = end
        for sub, dataset in datasets.items():
            # Each view gets its own list so one view cannot alter another's.
            annot = AnnotBase(dataset, key_funcs={}, callbacks=callbacks, vis_funcs=list(vis_funcs),
                name=sub, step=step, body=body, start=start, end=end)
            self.annotdict[sub] = annot
            self.nFrames = min(self.nFrames, annot.nFrames)
        self.isOpen = True
        self.register_keys_view = {}
        # Default frame navigation unless the caller supplies its own 'w'.
        if 'w' not in key_funcs:
            for key in 'wasd':
                self.register_keys_view[key] = register_keys[key]
        self.register_keys_view.update(key_funcs_view)
        self.register_keys = {
            'Q': register_keys['q'],
            'h': register_keys['H'],
            'A': register_keys['A']
        }
        self.register_keys.update(key_funcs)
        self.vis_funcs_all = vis_funcs_all
        self.name = name
        self.param = {}

    @property
    def frame(self):
        """Frame index of the first view."""
        sub = list(self.annotdict.keys())[0]
        return self.annotdict[sub].frame

    @property
    def working(self):
        """Always ``False``: this object holds no pending interaction."""
        return False

    def save_and_quit(self, key=None):
        """Ask once (unless ``key`` is given) and close every view."""
        if key is None:
            key = get_valid_yn()
        for sub, annot in self.annotdict.items():
            annot.save_and_quit(key)
        self.isOpen = False

    def run(self, key=None, noshow=False):
        """Dispatch one key press to the views and redraw the overview."""
        if key is None:
            key = chr(get_key())
        for sub, annot in self.annotdict.items():
            if key in self.register_keys_view.keys():
                self.register_keys_view[key](annot, param=annot.param)
            else:
                # An empty key matches no handler; the view only redraws.
                annot.run(key='')
        if key in self.register_keys.keys():
            self.register_keys[key](self, param=self.param)
        # The overview is always rendered, with or without ``vis_funcs_all``.
        imgs = []
        for sub in self.subs:
            img = self.annotdict[sub].param['img0'].copy()
            for func in self.vis_funcs_all:
                img = func(img, sub, param=self.annotdict[sub].param)
            imgs.append(img)
        imgs = merge(imgs, square=True)
        imgs = resize_to_screen(imgs, scale=CV_KEY.WINDOW_HEIGHT/imgs.shape[0])
        cv2.imshow(self.name, imgs)
class AnnotMVMerge(BaseWindow):
    """Multi-view annotator that shows all views merged in one window.

    Design idea: this class is only responsible for merging and visualising
    the views as a whole; the concrete editing operations live elsewhere
    (key handlers and mouse processes passed in by the caller).

    NOTE(review): ``BaseWindow.__init__`` is not called; the window and
    state are set up here instead.
    """

    # Body-part name -> (bbox key, keypoints key) in the annotation dicts.
    restore_key = {
        'body25': ('bbox', 'keypoints'),
        'handl': ('bbox_handl2d', 'handl2d'),
        'handr': ('bbox_handr2d', 'handr2d'),
        'face': ('bbox_face2d', 'face2d'),
    }

    def __init__(self, datasets, register_keys, vis_funcs, vis_funcs_view, callbacks, body) -> None:
        """Load frame 0 of every view and open the merged window.

        Args:
            datasets: mapping view name -> dataset returning
                ``(imgname, annname)`` per index.
            register_keys: key -> handler mapping; extended in place with the
                ``w``/``a``/``s``/``d``, ``q``, ``h`` and ``x`` handlers.
            vis_funcs: visualisers for the merged image, ``func(img, **param)``.
            vis_funcs_view: per-view visualisers, called with the view image,
                its annotations, ``name=`` and the view's parameters.
            callbacks: processes forwarded to ``ComposedCallback``.
            body: key into ``restore_key``.
        """
        self.subs = list(datasets.keys())
        self.isOpen = True
        for key in 'wasd':
            register_keys[key] = self.get_move(key)
        register_keys['q'] = self.quit
        register_keys['h'] = print_help
        register_keys['x'] = self.clear
        self.register_keys = register_keys
        self.vis_funcs = vis_funcs
        self.vis_funcs_view = vis_funcs_view
        self.callbacks = callbacks
        self.datasets = datasets
        self.name = 'main'
        frames = {sub:0 for sub in self.subs}
        self.body = body
        bbox_name, kpts_name = self.restore_key[body]
        self.params_view = {sub:{
            'body': body, 'bbox_name': bbox_name, 'kpts_name': kpts_name,
            'select': {bbox_name:-1}} for sub in self.subs}
        imgs, annots = self.load_images_annots(self.datasets, frames)
        img0, ranges = merge(imgs, ret_range=True)
        scale = 10000./img0.shape[0]
        self.nFrames = len(self.datasets[self.subs[0]])
        self.start = 0
        self.end = self.nFrames
        self.step = 50
        self.visited_frames = {sub: set([self.start]) for sub in self.subs}
        self.param = {
            'scale': scale, 'ranges': ranges,
            'click': None, 'start': None, 'end': None,
            'frames': frames,
            'body': body, 'bbox_name': bbox_name, 'kpts_name': kpts_name,
            'select': {'camera': -1, bbox_name:-1, 'corner': -1}}
        self.param['imgs'] = imgs
        self.param['annots'] = annots
        self.no_window = False
        cv2.namedWindow(self.name)
        callback = ComposedCallback(processes=callbacks)
        cv2.setMouseCallback(self.name, callback.call, self.param)

    def save_and_quit(self, key=None):
        """Close the window and optionally write back all visited frames.

        Args:
            key: ``'y'`` to save, ``'n'`` to discard; asked on stdin when
                ``None``.

        Returns:
            ``0`` when the user declined, otherwise ``None``.
        """
        self.isOpen = False
        self.update_param()
        cv2.destroyWindow(self.name)
        # get the input
        if key is None:
            key = get_valid_yn()
        if key == 'n':
            return 0
        for sub in self.subs:
            dataset = self.datasets[sub]
            for frame in tqdm(self.visited_frames[sub], desc='writing'):
                # The dataset returns a different annotation path depending
                # on ``isTmp``; the first is copied over the second.
                dataset.isTmp = True
                _, annname = dataset[frame]
                dataset.isTmp = False
                _, annname_ = dataset[frame]
                if annname is not None:
                    print(annname, annname_)
                    shutil.copyfile(annname, annname_)

    @property
    def frame(self):
        """Frame index of the first view."""
        return list(self.param['frames'].values())[0]

    def update_param(self):
        """Save the loaded annotations, then reload images and annotations.

        The save uses the annotation paths recorded by the previous load,
        while the reload uses the current ``param['frames']``.
        """
        # Save first.
        for nv, sub in enumerate(self.subs):
            self.visited_frames[sub].add(self.param['frames'][sub])
            save_annot(self.params_view[sub]['annname'], self.param['annots'][nv])
        imgs, annots = self.load_images_annots(self.datasets, self.param['frames'])
        self.param['imgs'] = imgs
        self.param['annots'] = annots

    def move(self, delta):
        """Shift every view by ``delta`` frames and reload."""
        for sub in self.subs:
            self.param['frames'][sub] += delta
        self.update_param()

    @staticmethod
    def get_move(wasd):
        """Build the key handler for one of the ``w``/``a``/``s``/``d`` keys.

        The returned handler computes the target frame, clips it to
        ``[start, min(nFrames, end) - 1]`` and calls ``annotator.move`` with
        the resulting offset. Its ``__doc__`` carries the help text.
        """
        get_frame = {
            'a': lambda x, f: f - 1,
            'd': lambda x, f: f + 1,
            'w': lambda x, f: f - x.step,
            's': lambda x, f: f + x.step
        }[wasd]
        text = {
            'a': 'Move to last frame',
            'd': 'Move to next frame',
            'w': 'Move to last step frame',
            's': 'Move to next step frame'
        }

        def clip_frame(x, f):
            return max(x.start, min(x.nFrames-1, min(x.end-1, f)))

        def move(annotator, **kwargs):
            newframe = get_frame(annotator, annotator.frame)
            newframe = clip_frame(annotator, newframe)
            annotator.move(newframe - annotator.frame)
        move.__doc__ = text[wasd]
        return move

    def load_images_annots(self, datasets, frames):
        """Read image and annotations of ``frames[sub]`` for every view.

        Also records the image and annotation paths in ``params_view``.

        Returns:
            ``(imgs, annots)``: two lists in the iteration order of
            ``datasets``.
        """
        imgs, annots = [], []
        for sub, dataset in datasets.items():
            imgname, annname = dataset[frames[sub]]
            img = cv2.imread(imgname)
            annot = load_annot_to_tmp(annname)
            imgs.append(img)
            annots.append(annot)
            self.params_view[sub]['imgname'] = imgname
            self.params_view[sub]['annname'] = annname
        return imgs, annots

    def run(self, key=None, noshow=False):
        """Process one key press and redraw the merged window.

        Returns:
            ``0`` when the window was closed or ``noshow`` is set,
            otherwise ``None``.
        """
        if key is None:
            key = chr(get_key())
        # Update the selection: clear every view, then copy the global
        # selection into the active view (if any).
        actv = self.param['select']['camera']
        for sub in self.subs:
            for sel in self.params_view[sub]['select']:
                self.params_view[sub]['select'][sel] = -1
        if actv != -1:
            self.params_view[self.subs[actv]]['select'].update(self.param['select'])
        if key in self.register_keys.keys():
            func = self.register_keys[key]
            # A key may map to one handler or to a list of handlers.
            if isinstance(func, list):
                for f in func:
                    f(self, param=self.param)
            else:
                func(self, param=self.param)
        if not self.isOpen:
            return 0
        if noshow:
            return 0
        imgs = self.param['imgs']
        imgs = [img.copy() for img in imgs]
        for nv, img in enumerate(imgs):
            for func in self.vis_funcs_view:
                img = func(img, self.param['annots'][nv], name=self.subs[nv], **self.params_view[self.subs[nv]])
            if nv == actv:
                # Red frame around the active view.
                cv2.rectangle(img, (0, 0), (img.shape[1], img.shape[0]), (0, 0, 255), img.shape[1]//100)
            imgs[nv] = img
        img = merge(imgs)
        for func in self.vis_funcs:
            img = func(img, **self.param)
        cv2.imshow(self.name, img)
class AnnotMVMain:
    """Multi-view annotator with a single merged window and view selection.

    Every view is an ``AnnotBase`` created without its own window. Keys go
    to all views, or only to the view selected via ``param['select']['camera']``.
    """

    def __init__(self, datasets, key_funcs=None, key_funcs_view=None, callbacks=None, vis_funcs=None, vis_funcs_all=None,
            name='main', step=100, body='body25', start=0, end=100000,
            scale=0.5) -> None:
        """Create the per-view annotators and the merged window.

        Args:
            datasets: mapping view name -> dataset.
            key_funcs: key handlers applied to this object.
            key_funcs_view: accepted for interface compatibility; not used.
            callbacks: processes forwarded to every ``AnnotBase``.
            vis_funcs: per-view visualisers forwarded to every ``AnnotBase``.
            vis_funcs_all: visualisers applied per view when drawing the
                merged image, called as ``func(img, **view_param)``.
            name: name of the merged window.
            step, body, start, end: forwarded to every ``AnnotBase``.
            scale: display scale of the merged image.
        """
        key_funcs = {} if key_funcs is None else key_funcs
        callbacks = [] if callbacks is None else callbacks
        vis_funcs = [] if vis_funcs is None else vis_funcs
        vis_funcs_all = [] if vis_funcs_all is None else vis_funcs_all
        self.subs = list(datasets.keys())
        self.annotdict = {}
        self.nFrames = end
        for sub, dataset in datasets.items():
            # Each view gets its own list so one view cannot alter another's.
            annot = AnnotBase(dataset, key_funcs={}, callbacks=callbacks, vis_funcs=list(vis_funcs),
                name=sub, step=step, body=body, start=start, end=end, no_window=True)
            self.annotdict[sub] = annot
            self.nFrames = min(self.nFrames, annot.nFrames)
        self.isOpen = True
        self.register_keys_view = {}
        self.register_keys = {
            'Q': register_keys['q'],
            'h': register_keys['H'],
            'A': register_keys['A']
        }
        self.register_keys.update(key_funcs)
        for key, val in self.register_keys.items():
            print(key, val.__doc__)
        self.vis_funcs_all = vis_funcs_all
        self.name = name
        imgs = self.load_images()
        imgs, ranges = merge(imgs, ret_range=True, square=True)
        self.scale = scale
        self.param = {
            'scale': scale, 'ranges': ranges,
            'click': None, 'start': None, 'end': None,
            'select': {'camera': -1}}
        # The merged window only needs the "select a view" mouse process.
        select_callbacks = [callback_select_image]
        cv2.namedWindow(self.name)
        callback = ComposedCallback(processes=select_callbacks)
        cv2.setMouseCallback(self.name, callback.call, self.param)

    @property
    def frame(self):
        """Frame index of the first view."""
        sub = list(self.annotdict.keys())[0]
        return self.annotdict[sub].frame

    @property
    def working(self):
        """Always ``False``: this object holds no pending interaction."""
        return False

    def save_and_quit(self, key=None):
        """Ask once (unless ``key`` is given) and close every view."""
        if key is None:
            key = get_valid_yn()
        for sub, annot in self.annotdict.items():
            annot.save_and_quit(key)
        self.isOpen = False

    def load_images(self):
        """Return a copy of the current image of every view, in ``subs`` order."""
        return [self.annotdict[sub].param['img0'].copy() for sub in self.subs]

    def run(self, key=None, noshow=False):
        """Dispatch one key press and redraw the merged window."""
        if key is None:
            key = chr(get_key())
        active_v = self.param['select']['camera']
        if active_v == -1:
            # run the key for all cameras
            if key in self.register_keys.keys():
                self.register_keys[key](self, param=self.param)
            elif ord(key) != 255:
                # presumably 255 is "no key pressed" — confirm against get_key
                for sub in self.subs:
                    self.annotdict[sub].run(key)
        elif key == 'x':
            # Leave single-view mode.
            self.param['select']['camera'] = -1
            self.param['click'] = None
        else:
            # run the key for the selected cameras
            self.annotdict[self.subs[active_v]].run(key=key)
        if len(self.vis_funcs_all) > 0:
            imgs = []
            for nv, sub in enumerate(self.subs):
                img = self.annotdict[sub].param['img0'].copy()
                for func in self.vis_funcs_all:
                    img = func(img, **self.annotdict[sub].param)
                if self.param['select']['camera'] == nv:
                    # Red frame around the selected view.
                    cv2.rectangle(img, (0, 0), (img.shape[1], img.shape[0]), (0, 0, 255), img.shape[1]//100)
                imgs.append(img)
            imgs = merge(imgs, square=True)
            for func in [resize_to_screen]:
                # scale here
                imgs = func(imgs, scale=self.scale)
            cv2.imshow(self.name, imgs)
def load_parser():
    """Build the command-line parser shared by the annotation scripts.

    Returns:
        An ``argparse.ArgumentParser`` with the positional ``path`` and the
        options ``--out``, ``--sub``, ``--from_file``, ``--image``,
        ``--annot``, ``--body``, ``--step``, ``--vis``, ``--debug``,
        ``--ext``, ``--start`` and ``--end``. Callers may add further
        arguments before parsing.
    """
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('path', type=str)
    parser.add_argument('--out', type=str, default=None)
    parser.add_argument('--sub', type=str, nargs='+', default=[],
        help='the sub folder lists when in video mode')
    parser.add_argument('--from_file', type=str, default=None)
    parser.add_argument('--image', type=str, default='images')
    parser.add_argument('--annot', type=str, default='annots')
    parser.add_argument('--body', type=str, default='body25')
    parser.add_argument('--step', type=int, default=100)
    parser.add_argument('--vis', action='store_true')
    parser.add_argument('--debug', action='store_true')
    parser.add_argument('--ext', type=str, default='.jpg', choices=['.jpg', '.png'])
    # new arguments
    parser.add_argument('--start', type=int, default=0, help='frame start')
    parser.add_argument('--end', type=int, default=100000, help='frame end')
    return parser
def parse_parser(parser):
    """Parse the command line and resolve the list of sub-folders.

    ``args.sub`` is filled from, in order of precedence:

    * ``--from_file x.txt``: one name per line (``#`` lines and blank lines
      are skipped, a YouTube watch-URL prefix is stripped). Every folder
      under ``<path>/<image>`` whose name, or whose part before the first
      ``+``, is listed is selected.
    * ``--from_file x.json``: the ``vid`` field of every entry.
    * nothing given: every directory under ``<path>/<image>`` not starting
      with ``._``, sorted numerically when all names are digits.

    Args:
        parser: object whose ``parse_args()`` returns the argument namespace.

    Returns:
        The namespace with ``sub`` resolved.

    Raises:
        FileNotFoundError: when ``<path>/<image>`` is needed but missing.
    """
    import os
    from os.path import join
    args = parser.parse_args()
    if args.from_file is not None and args.from_file.endswith('.txt'):
        assert os.path.exists(args.from_file), args.from_file
        with open(args.from_file) as f:
            datas = f.readlines()
        subs = [d for d in datas if not d.startswith('#')]
        subs = [d.rstrip().replace('https://www.youtube.com/watch?v=', '') for d in subs]
        # Blank lines would otherwise yield an empty name that "exists"
        # (it resolves to the image root itself).
        subs = [d for d in subs if d]
        listed = set(subs)
        newsubs = sorted(os.listdir(join(args.path, args.image)))
        clips = []
        for newsub in newsubs:
            if newsub in listed:
                # Exact matches are added by the loop below.
                continue
            if newsub.split('+')[0] in listed:
                clips.append(newsub)
        for sub in subs:
            if os.path.exists(join(args.path, args.image, sub)):
                clips.append(sub)
        args.sub = sorted(clips)
    elif args.from_file is not None and args.from_file.endswith('.json'):
        data = read_json(args.from_file)
        args.sub = sorted([v['vid'] for v in data])
    elif len(args.sub) == 0:
        image_root = join(args.path, args.image)
        if not os.path.exists(image_root):
            message = '{} not exists, Please run extract_image first'.format(image_root)
            print(message)
            raise FileNotFoundError(message)
        subs = sorted(os.listdir(image_root))
        subs = [s for s in subs if os.path.isdir(join(image_root, s)) and not s.startswith('._')]
        # Numeric order only when every name is a number; a mixed listing
        # keeps the lexicographic order instead of failing in ``int``.
        if subs and all(s.isdigit() for s in subs):
            subs = sorted(subs, key=int)
        args.sub = subs
    helps = """
Demo code for annotation:
- Input : {}
- => "{}"
- => {}
""".format(args.path, '", "'.join(args.sub), args.annot)
    print(helps)
    return args