EasyMocap/easymocap/neuralbody/dataset/mvbase.py

847 lines
37 KiB
Python
Raw Permalink Normal View History

2022-10-25 20:06:04 +08:00
import os
from os.path import join
import json
import cv2
import copy
from ...mytools.reader import read_smpl, read_json
from ...mytools.camera_utils import Undistort, read_cameras
from ...mytools.debug_utils import myerror, mywarn, oncewarn
from ...mytools.timer import Timer
from .utils_sample import get_bounds, get_rays, sample_rays_rate, sample_rays
import numpy as np
from .utils_reader import img_to_numpy, numpy_to_img, read_json_with_cache, parse_semantic
class BaseBase:
def __init__(self, split):
self.split = split
self.infos = []
self.file_cache = {}
self.back_mask_cache = {}
self.cache_ray_o_d = {}
self.timer = False
def __len__(self):
return len(self.infos)
def __getitem__(self, index):
raise NotImplementedError
def scale_and_undistort(self, img, info, undis=True):
img = img_to_numpy(img)
if self.image_args.scale != 1:
H, W = int(img.shape[0] * self.image_args['scale']), int(img.shape[1] * self.image_args['scale'])
img = cv2.resize(img, (W, H), interpolation=cv2.INTER_NEAREST)
K, D = info['camera']['K'], info['camera']['dist']
sub = info['sub']
if self.image_args.undis and np.linalg.norm(D) > 0. and undis:
img = Undistort.image(img, K, D, sub=sub)
return img
def parse_object_args(self, object_keys, object_args, ignore_keys):
for _class in ['human', 'ball']:
if 'all'+_class in object_args.keys():
object_args = object_args.copy()
_allargs = object_args.pop('all'+_class)
pids = _allargs.pop('pids')
for pid in pids:
_args = copy.deepcopy(_allargs)
_args['args'].pid = pid
object_args[_class+'_{}'.format(pid)] = _args
self.ignore_keys = ignore_keys
self.object_args = object_args
self.object_keys = object_keys
def object_factory(self, root, obj_type, obj_args, info, params, file_cache):
# 通用的一些参数例如SMPL
pid = obj_args.get('pid', -1) # pid only available in human or ball
sub = info['sub']
if 'frame' in params:
frame = params['frame']
else:
frame = info['frame']
feat = {}
reader = obj_args.get('reader', {}) # reader only available in human
use_param_foreachview = obj_args.get('use_param_foreachview', False)
for key, args in reader.items():
if key == 'smpl':
if use_param_foreachview and sub.startswith('novel'):
sub_ = sorted(os.listdir(join(root, args.root)))[0]
smplname = join(root, args.root, sub_, '{:06d}.json'.format(frame))
elif use_param_foreachview and not sub.startswith('novel'):
smplname = join(root, args.root, sub, '{:06d}.json'.format(frame))
else:
smplname = join(root, args.root, '{:06d}.json'.format(frame))
data = read_json_with_cache(smplname, file_cache)
if isinstance(data, dict):
data = data['annots']
data = [d for d in data if d['id'] == pid]
assert len(data) > 0, "{} don't have {}".format(smplname, pid)
data = data[0]
for key in ['poses', 'shapes', 'Rh', 'Th']:
feat[key] = np.array(data[key], dtype=np.float32)
feat['R'] = cv2.Rodrigues(feat['Rh'])[0]
elif key == 'vertices':
if use_param_foreachview and sub.startswith('novel'):
sub_ = sorted(os.listdir(join(root, args.root)))[0]
smplname = join(root, args.root, sub_, '{:06d}.json'.format(frame))
elif use_param_foreachview and not sub.startswith('novel'):
smplname = join(root, args.root, sub, '{:06d}.json'.format(frame))
else:
smplname = join(root, args.root, '{:06d}.json'.format(frame))
data = read_json_with_cache(smplname, file_cache)
if isinstance(data, dict):
data = data['annots']
data = [d for d in data if d['id'] == pid][0]
if 'keypoints3d' in data.keys() and 'vertices' not in data.keys():
# use keypoints3d instead
k3d = np.array(data['keypoints3d'], dtype=np.float32)
conf = k3d[:, 3]
k3d = k3d[:, :3]
data['vertices'] = k3d[conf>0.1]
for key in ['vertices']:
feat[key] = np.array(data[key], dtype=np.float32)
feat['bounds'] = get_bounds(feat['vertices'], delta=args.padding)
elif key in ['depth']:
depthname = join(root, args.root, sub, '{:06d}.png'.format(frame))
if not os.path.exists(depthname):
depth = None
else:
depth = cv2.imread(depthname, cv2.IMREAD_ANYDEPTH)
depth = depth.astype(np.float32)/1000
feat['depth'] = depth
elif key in ['mask', 'instance', 'label', 'semantic']:
if args.root == 'none': continue
mskname = join(root, args.root, sub, '{:06d}_{}.png'.format(frame, pid))
if not os.path.exists(mskname):
mskname0 = join(root, args.root, sub, '{:06d}.png'.format(frame))
if pid == 0 and os.path.exists(mskname0):
mskname = mskname0
oncewarn('Using null pid to read mask')
else:
if not 'novel_' in mskname:
print('!!!{} not exists'.format(mskname))
feat[key] = None
continue
assert os.path.exists(mskname), mskname
if key == 'semantic':
msk = cv2.imread(mskname)
else:
msk = cv2.imread(mskname, 0)
msk = self.scale_and_undistort(msk, info, undis=args.undis)
if key == 'mask':
feat[key] = msk>0
elif key == 'label':
feat[key] = msk
elif key == 'semantic':
feat[key] = parse_semantic(msk)
feat['mask'] = feat[key] > 0
else:
raise NotImplementedError
if obj_type in ['nearfar', 'nearfardepth']:
from .utils_sample import NearFarSampler
obj = NearFarSampler(split=self.split, near=obj_args.near, far=obj_args.far, depth=feat.get('depth', None))
elif obj_type == 'bbox':
from .utils_sample import AABBSampler
obj = AABBSampler(split=self.split, bounds=obj_args.bounds)
elif obj_type == 'twobbox':
from .utils_sample import TwoAABBSampler
obj = TwoAABBSampler(split=self.split, bbox_inter=obj_args.bbox_inter, bbox_outer=obj_args.bbox_outer)
elif obj_type == 'compose':
objlist = []
for key, obj_args_ in obj_args.items():
obj_ = self.object_factory(root, obj_args_.model, obj_args_, info, params, file_cache)
objlist.append(obj_)
from .utils_sample import ComposeSampler
obj = ComposeSampler(split=self.split, objlist=objlist)
elif obj_type == 'bodybbox':
from .utils_sample import AABBwMask, AABBSampler
if 'vertices' in feat.keys():
vertices = feat['vertices']
if 'label' in feat.keys() and feat['label'] is not None:
obj = AABBwMask.from_vertices(
label=feat['label'].astype(np.float32),
mask=feat['label'], # use label to represents mask
rate_body=obj_args.rate_body,
dilate=True,
split=self.split, vertices=vertices, delta=obj_args.reader.vertices.padding)
else:
obj = AABBSampler.from_vertices(
split=self.split, vertices=vertices, delta=obj_args.reader.vertices.padding)
else:
center = feat['Th']
obj = AABBSampler(self.split, center=center, scale=obj_args.scale)
elif obj_type == 'maskbbox':
from .utils_sample import AABBwMask
center = feat['Th']
obj = AABBwMask(split=self.split, center=center, scale=obj_args.scale, mask=feat['mask'])
elif obj_type == 'neuralbody' or obj_type == 'neuralbody-smplmask':
from .utils_sample import AABBwMask
# ATTN
dilate = obj_type == 'neuralbody'
if 'rotate_axis' in params.keys():
axis = {
'y': np.array([0., 1., 0.], dtype=np.float32)
}[params['rotate_axis']]
Rrel = cv2.Rodrigues(params['rotate_angle']*axis)[0]
feat['vertices'] = (feat['vertices'] - feat['Th']) @ Rrel.T + feat['Th']
feat['R'] = Rrel @ feat['R']
feat['Rh'] = cv2.Rodrigues(feat['R'])[0].reshape(1, 3)
feat['bounds'] = get_bounds(feat['vertices'], delta=obj_args.reader.vertices.padding)
if 'rotation' in params.keys() or 'translation' in params.keys():
# support rotation and translation
rotation = params.get('rotation', [0., 0., 0.])
translation = params.get('translation', [0., 0., 0.])
Rrel = cv2.Rodrigues(np.array(rotation))[0].astype(np.float32)
Trel = np.array(translation).reshape(1, 3).astype(np.float32)
# update the original Rh, Th
feat['R'] = Rrel @ feat['R']
feat['Rh'] = cv2.Rodrigues(feat['R'])[0].reshape(1, 3)
feat['Th'] = (Rrel @ feat['Th'].T + Trel.T).T
feat['vertices'] = feat['vertices'] @ Rrel.T + Trel
feat['bounds'] = get_bounds(feat['vertices'], delta=obj_args.reader.vertices.padding)
# calculate canonical vertices and bounds
feat['vertices_canonical'] = (feat['vertices'] - feat['Th']) @ feat['R'].T.T
feat['bounds_canonical'] = get_bounds(feat['vertices_canonical'], delta=obj_args.reader.vertices.padding)
obj = AABBwMask(split=self.split, bounds=feat['bounds'],
mask=feat.get('mask', None),
label=feat.get('label', None),
dilate=dilate,
rate_body=obj_args.rate_body)
for key in ['R', 'Rh', 'Th', 'vertices', 'poses', 'shapes',
'vertices_canonical', 'bounds_canonical']:
obj.feature[key] = feat[key]
# extra keys
for key in ['semantic']:
if key in feat.keys():
obj.feature_input[key] = feat[key]
elif obj_type == 'neuralbody-womask':
from .utils_sample import AABBSampler
obj = AABBSampler(split=self.split, bounds=feat['bounds'])
for key in ['R', 'Rh', 'Th', 'vertices', 'poses', 'shapes']:
obj.feature[key] = feat[key]
elif obj_type == 'trajbbox':
from .utils_sample import AABBSampler
annname = join(self.root, obj_args.root, '{:06d}.json'.format(frame))
annots = read_json_with_cache(annname, file_cache)
annots = [a for a in annots if a['id'] == pid]
assert len(annots) == 1, annname
annots = annots[0]
center = np.array(annots['keypoints3d'], dtype=np.float32)[:, :3]
obj = AABBSampler(self.split, center=center, scale=obj_args.scale)
obj.feature['center'] = center
elif obj_type == 'cylinder':
from .utils_sample import CylinderSampler
obj = CylinderSampler(split=self.split, **obj_args)
elif obj_type == 'plane':
from .utils_sample import PlaneSampler
obj = PlaneSampler(split=self.split, **obj_args)
else:
myerror('[Error] Unknown object type: {}'.format(obj_type))
raise NotImplementedError
for key, val in params.items():
obj.feature[key] = val
return obj
def get_objects(self, root, info, object_keys, object_args):
objects = {}
current_keys = object_keys.copy()
file_cache = {}
if len(current_keys) == 0:
current_keys = list(object_args.keys())
for oid, wrapkey in enumerate(current_keys):
key, params = wrapkey, {}
if key in self.ignore_keys: continue
if '@' in wrapkey:
key_ = wrapkey.split('_@')[0]
params = json.loads(wrapkey.split('_@')[1].replace("'", '"'))
val = object_args[key_]
else:
val = object_args[key]
with Timer(wrapkey, not self.timer):
model = self.object_factory(root, val['model'], val['args'], info, params, file_cache)
objects[key] = model
return objects
def sample_ray(self, img, back_mask, info, objects, debug=False):
H, W = img.shape[:2]
K, R, T = info['camera']['K'].copy(), info['camera']['R'], info['camera']['T']
sub = info['sub']
if sub not in self.cache_ray_o_d:
# cache this ray as it takes 0.2s
ray_o, ray_d = get_rays(H, W, K, R, T)
self.cache_ray_o_d[sub] = (ray_o, ray_d)
ray_o, ray_d = self.cache_ray_o_d[sub]
# sample rays according to the rate
bounds, rates = {}, {}
for key, obj in objects.items():
with Timer('mask '+key, not self.timer):
ret = obj.mask(K, R, T, H, W, ray_o=ray_o, ray_d=ray_d)
if isinstance(ret, np.ndarray):
bounds[key] = ret
if self.split == 'train':
rates[key] = self.object_args[key]['rate']
elif isinstance(ret, dict):
for key_, val_ in ret.items():
bounds[key+key_] = val_['mask']
if self.split == 'train':
rates[key+key_] = self.object_args[key]['rate'] * val_['rate']
else:
raise NotImplementedError
if self.split == 'train':
# if self.sample_args['method'] == 'rate' and self.split == 'train':
with Timer('sample ray', not self.timer):
coord = sample_rays_rate(bounds, rates, back_mask, **self.sample_args)
else:
bounds = np.dstack(list(bounds.values()))
bound_sum = np.sum(bounds, axis=-1)
coord = sample_rays(bound_sum, back_mask, self.split, **self.sample_args)
if debug:
import matplotlib.pyplot as plt
plt.imshow(img)
plt.scatter(coord[:, 1], coord[:, 0])
plt.figure()
bounds = np.dstack(bounds.values()).copy()
for i in range(bounds.shape[-1]):
bounds[..., i] *= 2**i
bound_sum = np.sum(bounds, axis=-1)
plt.imshow(bound_sum)
plt.show()
import ipdb;ipdb.set_trace()
ray_o = ray_o[coord[:, 0], coord[:, 1]]
ray_d = ray_d[coord[:, 0], coord[:, 1]]
rgb = img[coord[:, 0], coord[:, 1]]
return ray_o, ray_d, rgb, coord
def sample_near_far(self, rgb, ray_o, ray_d, coord, objects):
ret = {
'rgb': rgb, 'coord': coord,
'ray_o': ray_o, 'ray_d': ray_d, 'viewdirs': ray_d/np.linalg.norm(ray_d, axis=-1, keepdims=True),
}
# TODO:这里是每个物体分别进行采样,没有统一调度
# sample the ray in the fore
for key, model in objects.items():
with Timer('sample '+key, not self.timer):
near, far, mask = model(ray_o, ray_d, coord)
# 这里把单个物体的相关信息直接返回了用于兼容单人的情况单人就不用考虑如何取mask了
for k in ['rgb', 'coord', 'ray_o', 'ray_d', 'viewdirs']:
ret[key+'_'+k] = ret[k][mask]
ret.update({
key+'_near': near,
key+'_far': far,
key+'_mask': mask,
key+'_bounds': model.bounds})
# update other features
for k, v in model.feature.items():
if 'coord' in k:
ret[key+'_'+k] = v[coord[:, 0], coord[:, 1]]
else:
ret[key+'_'+k] = v
return ret
def create_cameras(self, camera_args):
from .utils_sample import create_center_radius
RT = create_center_radius(
center=camera_args.center,
radius=camera_args.radius, up=camera_args.up,
angle_x=camera_args.angle_x,
ranges=camera_args.ranges)
focal = camera_args.focal
K = np.array([
focal, 0, camera_args.H/2, 0, focal, camera_args.W/2, 0, 0, 1
]).reshape(3, 3)
cameras = [{'K': K, 'R': RT[i, :3, :3], 'T': RT[i, :3, 3:]} for i in range(RT.shape[0])]
return cameras
class BaseDataset(BaseBase):
# This class is mainly for multiview or single view training dataset
# contains the operation for image
def __init__(self, root, subs, ranges, split,
image_args,
object_keys, object_args, ignore_keys,
sample_args) -> None:
super().__init__(split=split)
self.root = root
self.cameras = read_cameras(root)
self.subs = self.check_subs(root, subs, image_args.root)
self.ranges = ranges
self.infos = self.get_allnames(root, self.subs, ranges, image_args)
self.split = split
self.image_args = image_args
self.sample_args = sample_args
self.parse_object_args(object_keys, object_args, ignore_keys)
self.debug = False
self.check = False
self.check_data()
def check_data(self):
from tqdm import tqdm
visited = set()
for i in tqdm(range(len(self)), desc='check all the data'):
info = self.infos[i]
sub = info['sub']
if sub in visited:continue
visited.add(sub)
data = self[i]
def get_allnames(self, root, subs, ranges, image_args):
infos = []
index = 0
unsync = {}
if image_args.get('unsync', 'none') != 'none':
unsync = read_json(join(root, image_args['unsync']))
for nv, sub in enumerate(subs):
camera = self.cameras[sub].copy()
K = camera['K'].copy()
K[:2] *= image_args.scale
camera['K'] = K
for nnf, nf in enumerate(range(*ranges)):
imgname = join(root, image_args.root, sub, '{:06d}{}'.format(nf, image_args.ext))
info = {
'imgname': imgname,
'sub': sub,
'frame': nf,
'nf': nnf,
'nv': nv,
'index': index,
'camera': camera
}
if sub in unsync.keys():
info['time'] = nf + unsync[sub]
infos.append(info)
return infos
@staticmethod
def check_subs(root, subs, image_root):
if len(subs) == 0:
subs = sorted(os.listdir(join(root, image_root)))
if subs[0].isdigit():
subs.sort(key=lambda x:int(x))
return subs
def read_image(self, imgname, image_args, info, isgray=False, skip_mask=False, mask_global='_0.png'):
if isgray:
img = cv2.imread(imgname, 0)
else:
img = cv2.imread(imgname)
img = img_to_numpy(img)
if image_args.mask_bkgd and not skip_mask:
# TODO: polish mask name
mskname = imgname.replace(image_args.root, image_args.mask).replace(image_args.ext, mask_global)
if not os.path.exists(mskname):
mskname = mskname.replace(mask_global, '.png')
assert os.path.exists(mskname), mskname
msk = cv2.imread(mskname, 0)
msk_rand = np.random.rand(msk.shape[0], msk.shape[1], 3)
msk_rand = (msk_rand * 255).astype(np.uint8)
img[msk==0] = -1.
# img[msk==0] = msk_rand[msk==0]
# 这里选择了先进行畸变矫正再进行scale这样相机参数就不需要修改
# 由于预先进行了map所以畸变矫正不会很慢
if self.image_args.scale != 1:
H, W = int(img.shape[0] * self.image_args['scale']), int(img.shape[1] * self.image_args['scale'])
img = cv2.resize(img, (W, H), interpolation=cv2.INTER_NEAREST)
K, D = info['camera']['K'], info['camera']['dist']
sub = info['sub']
if self.image_args.undis and np.linalg.norm(D) > 0.:
img = Undistort.image(img, K, D, sub=sub)
if info.get('flip_lr', False):
img = cv2.flip(img, 1)
return img
def read_bkgd(self, imgname, image_args, info):
backname = join(self.root, 'background', info['sub'], '000000.jpg')
if not os.path.exists(backname) or not image_args.get('read_background'):
return None
back = self.read_image(backname, image_args, info, skip_mask=True)
return back
def read_backmask(self, imgname, image_args, info, blankshape):
sub = info['sub']
if self.split != 'demo' and sub in self.back_mask_cache.keys():
back_mask = self.back_mask_cache[sub].copy()
elif self.split != 'demo':
mskname = join(self.root, 'mask-background', sub, '000000.png')
# if not exists, should read and undistort
back_mask = np.ones(blankshape, dtype=np.float32)
K, D = info['camera']['K'], info['camera']['dist']
if self.image_args.undis and np.linalg.norm(D) > 0.:
back_mask = Undistort.image(back_mask, K, D, sub=sub)
if os.path.exists(mskname) and self.split == 'train' and self.image_args.ignore_back:
oncewarn('using mask of background')
_back_mask = self.read_image(mskname, image_args, info, isgray=True)
# 畸变矫正后边缘上填充的值是0这里取反之后边缘上的填充值又变成了1
_back_mask[_back_mask>0] = 1.
back_mask = (1. - _back_mask) * back_mask
self.back_mask_cache[sub] = back_mask
back_mask = self.back_mask_cache[sub].copy()
else:
# in demo mode sample all pixels
back_mask = np.ones(blankshape, dtype=np.float32)
# get the framewise mask
mskname = imgname.replace('images', 'mask-ignore').replace('.jpg', '.png')
if os.path.exists(mskname):
# oncewarn('using mask of frame {}'.format(mskname))
mask_frame = self.read_image(mskname, image_args, info, isgray=True)
mask_frame[mask_frame>0] = 1.
mask_frame = 1. - mask_frame
back_mask = back_mask * mask_frame
return back_mask
def __getitem__(self, index):
info = self.infos[index]
imgname = info['imgname']
with Timer('read image', not self.timer):
img = self.read_image(imgname, self.image_args, info)
with Timer('read background', not self.timer):
back = self.read_bkgd(imgname, self.image_args, info)
with Timer('read back', not self.timer):
back_mask = self.read_backmask(imgname, self.image_args, info, blankshape=img.shape[:2])
object_keys = info.get('object_keys', self.object_keys)
objects = self.get_objects(self.root, info, object_keys, self.object_args)
# sample the ray from image
ray_o, ray_d, rgb, coord = self.sample_ray(img, back_mask, info, objects, debug=self.debug)
ret = self.sample_near_far(rgb, ray_o, ray_d, coord, objects)
# append the background
if back is not None:
ret['background'] = back[coord[:, 0], coord[:, 1]]
meta = {
'split': self.split,
'H': img.shape[0], 'W': img.shape[1],
'index': index,
'nframe': info['nf'], 'nview': info['nv'],
'time': info.get('time', info['nf']),
'sub': info['sub'],
'keys': list(objects.keys()),
}
if self.check:
meta['img'] = img
meta['object_keys'] = meta['keys']
ret['meta'] = meta
return ret
class BaseDatasetDemo(BaseDataset):
def __init__(self, camera_args, demo_args=None, **kwargs):
self.camera_args = camera_args
super().__init__(**kwargs)
scale = self.image_args.scale
self.blank = np.zeros((int(camera_args.H*scale), int(camera_args.W*scale), 3), dtype=np.uint8)
self.infos = self.create_demo_cameras(self.image_args.scale, camera_args, demo_args)
# self.camera_args = camera_args
# K, RTs, D = self.create_demo_camera(camera_method, camera_args, self.cameras)
@staticmethod
def _demo_play_frames(ranges, K, R, T):
infos = []
index = 0
frames = [i for i in range(*ranges)]
for nv in range(K.shape[0]):
nnf = nv % (len(frames))
nf = frames[nnf]
info = {
'imgname': 'none',
'sub': 'novel_'+str(nv),
'frame': nf,
'nf': nnf,
'nv': nv,
'index': index,
'camera': {
'K': K[nv],
'dist': np.zeros((1, 5)),
'R': R[nv],
'T': T[nv]
}
}
infos.append(info)
index += 1
return infos
@staticmethod
def _demo_keyframe(ranges, K, R, T, frame, nFrames):
infos = []
index = 0
frames = [i for i in range(*ranges)]
for nnf, nf in enumerate(frames):
info = {
'imgname': 'none',
'sub': 'novel_'+str(0),
'frame': nf,
'nf': nnf,
'nv': 0,
'index': index,
'camera': {
'K': K[0],
'dist': np.zeros((1, 5)),
'R': R[0],
'T': T[0]
}
}
if nf == frame:
for i in range(nFrames):
angle = i/nFrames * 2 * np.pi
object_keys = ["human_0_@{{'rotate_angle': {}, 'rotate_axis': 'y'}}".format(angle)]
_info = info.copy()
_info['object_keys'] = object_keys
infos.append(_info)
info['index'] += 1
index = info['index'] + 1
continue
infos.append(info)
index += 1
return infos
def _demo_script(self, ranges, K, R, T, stages):
infos = []
index = 0
frames = [i for i in range(*ranges)]
for name, stage in stages.items():
_infos = []
_frames = list(range(*stage.frame))
_views = list(range(*stage.view))
if len(_frames) == 1 and len(_views) != 1:
_frames = _frames * len(_views)
elif len(_views) == 1 and len(_frames) != 1:
_views = _views * len(_frames)
elif len(_views) == 1 and len(_frames) == 1 and 'steps' in stage.keys():
_views = _views * stage.steps
_frames = _frames * stage.steps
elif len(_views) != 1 and len(_frames) != 1 and len(_views) != len(_frames):
raise NotImplementedError
_index = [i for i in range(len(_frames))]
for _i in _index:
nv, nf = _views[_i], _frames[_i]
nv = nv % (K.shape[0])
info = {
'imgname': 'none',
'sub': 'novel_'+str(nv),
'frame': nf,
'nf': frames.index(nf),
'nv': nv,
'index': _i + index,
'camera': {
'K': K[nv],
'dist': np.zeros((1, 5)),
'R': R[nv],
'T': T[nv]
}
}
# create object
float_i = _i*1./(len(_index) - 1)
object_keys = stage.object_keys.copy()
if len(object_keys) == 0:
object_keys = list(self.object_args.keys()).copy()
if 'effect' in stage.keys():
if stage.effect in ['disappear', 'appear']:
for _obj in stage.effect_args.key:
object_keys.remove(_obj)
if stage.effect == 'disappear':
occ = (1 - float_i)**3
elif stage.effect == 'appear':
occ = float_i**3
object_keys.append(_obj+"_@{{'scale_occ': {}, 'min_acc': 0.5}}".format(occ))
if stage.effect in ['zoom']:
scale = float_i * stage.effect_args.scale[1] + (1-float_i) * stage.effect_args.scale[0]
cx = float_i * stage.effect_args.cx[1] + (1-float_i) * stage.effect_args.cx[0]
cy = float_i * stage.effect_args.cy[1] + (1-float_i) * stage.effect_args.cy[0]
_K = info['camera']['K'].copy()
_K[:2, :2] *= scale
_K[0, 2] *= cx
_K[1, 2] *= cy
info['camera']['K'] = _K
info['camera']['K'] = _K
info['sub'] = info['sub'] + '_scale_{}'.format(scale)
if stage.effect_args.get('use_previous_K', False):
info['camera']['K'] = infos[-1]['camera']['K']
info['object_keys'] = object_keys
_infos.append(info)
index += len(_index)
infos.extend(_infos)
return infos
def create_demo_cameras(self, scale, camera_args, demo_args=None):
if camera_args.method == 'none':
from .utils_sample import create_center_radius
RTs = create_center_radius(**camera_args)
K = np.array([
camera_args.focal, 0, camera_args.W/2,
0, camera_args.focal, camera_args.H/2,
0, 0, 1], dtype=np.float32).reshape(3, 3)[None].repeat(RTs.shape[0], 0)
R = RTs[:, :3, :3]
T= RTs[:, :3, 3:]
elif camera_args.method == 'mean':
from .utils_sample import create_cameras_mean
K, R, T = create_cameras_mean(list(self.cameras.values()), camera_args)
K[:, 0, 2] = camera_args.W / 2
K[:, 1, 2] = camera_args.H / 2
elif camera_args.method == 'static':
assert len(self.subs) == 1, "Only support monocular videos"
camera = self.cameras[self.subs[0]]
K = camera['K'][None]
R = camera['R'][None]
T = camera['T'][None]
elif camera_args.method == 'line':
for key, camera in self.cameras.items():
R = camera['R']
T = camera['T']
center_old = - R.T @ T
print(key, center_old.T[0])
camera = self.cameras[str(camera_args.ref_sub)]
K = camera['K'][None]
R = camera['R'][None]
T = camera['T'][None]
t = np.linspace(0., 1., camera_args.allstep).reshape(-1, 1)
t = t - 0.33
t[t<0.] = 0.
t = t/t.max()
start = np.array(camera_args.center_start).reshape(1, 3)
end = np.array(camera_args.center_end).reshape(1, 3)
center = end * t + start * (1-t)
K = K.repeat(camera_args.allstep, 0)
R = R.repeat(camera_args.allstep, 0)
T = - np.einsum('fab,fb->fa', R, center)
T = T.reshape(-1, 3, 1)
K[:, :2] *= scale
if demo_args is None:
infos = self._demo_play_frames(self.ranges, K, R, T)
return infos
# create scripts
if demo_args.mode == 'scripts':
infos = self._demo_script(self.ranges, K, R, T, demo_args.stages)
elif demo_args.mode == 'keyframe+rotate':
infos = self._demo_keyframe(self.ranges, K, R, T, demo_args.frame, demo_args.nFrames)
else:
raise NotImplementedError
return infos
def get_allnames(self, root, subs, ranges, image_args):
return []
def read_image(self, imgname, image_args, info):
return self.blank
class BaseNovelPose(BaseBase):
def __init__(self, root, object_keys, ignore_keys, object_args, sample_args, camera_args):
super().__init__(split='demo')
self.root = root
self.parse_object_args(object_keys, object_args, ignore_keys)
self.params = self.load_all_smpl(root)
self.cameras = self.create_cameras(camera_args)
self.infos = self.combine_frame_and_camera(self.params, self.cameras)
self.H = camera_args.H
self.W = camera_args.W
self.sample_args = sample_args
self.check = False
def load_all_smpl(self, root):
from glob import glob
from tqdm import tqdm
smplnames = sorted(glob(join(root, 'smpl', '*.json')))
params = []
for nf, smplname in enumerate(tqdm(smplnames)):
frame = int(os.path.basename(smplname).split('.')[0])
nv, sub = 0, 'novelpose0'
# nf, frame = 0, 0
imgname = 'smplname'
params.append({
'imgname': imgname,
'sub': sub,
'frame': frame,
'nf': nf,
'nv': nv,
'index': nf,
})
return params
def combine_frame_and_camera(self, params, cameras):
infos = []
for i in range(len(params)):
info = params[i].copy()
info['camera'] = cameras[0]
infos.append(info)
return infos
def read_blank_image(self, *args):
img = np.zeros([self.H, self.W, 3], dtype=np.float32)
return img
def __len__(self):
return len(self.infos)
def __getitem__(self, index):
info = self.infos[index]
objects = self.get_objects(self.root, info, self.object_keys, self.object_args)
info = self.infos[index]
imgname = info['imgname']
img = self.read_blank_image(imgname, info)
back_mask = np.ones_like(img[:, :, 0])
# sample the ray from image
ray_o, ray_d, rgb, coord = self.sample_ray(img, back_mask, info, objects)
ret = self.sample_near_far(rgb, ray_o, ray_d, coord, objects)
info['nf'] = 0
meta = {
'H': img.shape[0], 'W': img.shape[1],
'index': index,
'nframe': info['nf'], 'nview': info['nv'],
'sub': info['sub'],
'keys': list(objects.keys()),
}
if self.check:
meta['img'] = img
meta['object_keys'] = meta['keys']
ret['meta'] = meta
return ret
class BaseCanonical(BaseNovelPose):
def __init__(self, nFrames, **kwargs):
super().__init__(**kwargs)
self.nFrames = nFrames
def load_all_smpl(self, root):
smpl = read_smpl(root)[0]
for key in ['poses', 'Rh', 'Th']:
smpl[key] = np.zeros_like(smpl[key])
return smpl
def combine_frame_and_camera(self, params, cameras):
infos = []
for i in range(len(cameras)):
info = params.copy()
info['camera'] = cameras[i]
info.update({
'imgname': f'view_{i}_frame_0',
'sub': 'novel_' + str(i),
'frame': 0,
'nf': 0,
'nv': i,
'index': i,
})
infos.append(info)
return infos
def get_objects(self, root, info, object_keys, object_args):
objects = {'human_0': AABBSampler(split='test', bounds=[[-1, -1.3, -0.3], [1, 0.7, 0.3]])}
objects['human_0'].feature['shapes'] = self.params['shapes']
return objects
if __name__ == '__main__':
from ...config import Config, load_object
from copy import deepcopy
# config = Config.load('config/neuralbody/dataset/multiview_custom.yml')
# config = Config.load('config/neuralbody/dataset/neuralbody_lightstage.yml')
config = Config.load('config/neuralbody/dataset/neuralbody_soccer.yml')
data_share = config.pop('data_share_args')
data_share.root = os.environ['data']
for split in ['train', 'val', 'demo']:
data = deepcopy(data_share)
data.merge_from_other_cfg(config['data_{}_args'.format(split)])
config['data_{}_args'.format(split)] = data
print(config)
dataset = load_object(config.data_train_module, config.data_train_args)
dataset.debug = True
for data in dataset:
print(data.keys())