'''
    @ Date: 2021-08-21 14:16:38
    @ Author: Qing Shuai
    @ LastEditors: Qing Shuai
    @ LastEditTime: 2022-05-23 23:10:43
    @ FilePath: /EasyMocapPublic/easymocap/estimator/openpose_wrapper.py
'''
import os
import shutil
from glob import glob
from multiprocessing import Process
from os.path import join

import cv2
import numpy as np
from tqdm import tqdm

from .wrapper_base import bbox_from_keypoints, create_annot_file, check_result
from ..mytools import read_json
from ..annotator.file_utils import save_annot
def run_openpose(image_root, annot_root, config):
    """Run the OpenPose binary over a directory of images.

    Writes per-image ``*_keypoints.json`` files into ``annot_root``.

    Args:
        image_root: directory containing the input images.
        annot_root: output directory for OpenPose's JSON (and optional
            rendered images).
        config: dict with keys ``root`` (OpenPose install dir), ``res``
            (net-resolution multiplier), and boolean flags ``hand``,
            ``face``, ``vis``.
    """
    os.makedirs(annot_root, exist_ok=True)
    previous_dir = os.getcwd()
    # OpenPose ships different launchers on Windows vs. Linux/macOS.
    if os.name == 'nt':
        cmd = 'bin\\OpenPoseDemo.exe --image_dir {} --write_json {} --display 0'.format(
            os.path.abspath(image_root), os.path.abspath(annot_root))
    else:
        cmd = './build/examples/openpose/openpose.bin --image_dir {} --write_json {} --display 0'.format(
            image_root, annot_root)
    if config['res'] != 1:
        # The net resolution must be a multiple of 16; scale the default 368px height.
        cmd += ' --net_resolution -1x{}'.format(int(16*((368*config['res'])//16)))
    if config['hand']:
        cmd += ' --hand'
    if config['face']:
        cmd += ' --face'
    if config['vis']:
        cmd += ' --write_images {}'.format(annot_root)
    else:
        cmd += ' --render_pose 0'
    # The binary must be launched from the OpenPose root so that its
    # relative model paths resolve.
    os.chdir(config['root'])
    print(cmd)
    os.system(cmd)
    os.chdir(previous_dir)
def convert_from_openpose(src, dst, image_root, ext):
    """Convert OpenPose ``*_keypoints.json`` output into EasyMocap annot files.

    Args:
        src: directory of raw OpenPose JSON files.
        dst: directory to receive the converted annotation files.
        image_root: directory holding the corresponding images.
        ext: image filename extension, e.g. '.jpg'.
    """
    filenames = sorted(os.listdir(src))
    progress = tqdm(filenames, desc='{:10s}'.format(os.path.basename(dst)))
    for filename in progress:
        people = load_openpose(join(src, filename))
        stem = filename.replace('_keypoints.json', '')
        annotname = join(dst, stem + '.json')
        imgname = join(image_root, filename.replace('_keypoints.json', ext))
        annot = create_annot_file(annotname, imgname)
        annot['annots'] = people
        save_annot(annotname, annot)
# Background conversion processes spawned by extract_2d; callers are
# expected to join() these before relying on the converted annotations.
global_tasks = []
def extract_2d(image_root, annot_root, tmp_root, config):
    """Run OpenPose on ``image_root`` and convert the output asynchronously.

    Skips the detection when results already exist, then launches the JSON
    conversion in a background process that is recorded in ``global_tasks``.

    Returns:
        The module-level list of background conversion processes.
    """
    # Nothing to do if the converted annotations are already complete.
    if check_result(image_root, annot_root):
        return global_tasks
    # Run OpenPose only when its raw output is missing.
    if not check_result(image_root, tmp_root):
        run_openpose(image_root, tmp_root, config)
    # TODO: add current task to global_tasks
    # The conversion touches disjoint files, so no data race is expected.
    worker = Process(
        target=convert_from_openpose,
        args=(tmp_root, annot_root, image_root, config['ext']))
    worker.start()
    global_tasks.append(worker)
    return global_tasks
def load_openpose(opname):
    """Parse one OpenPose keypoint JSON file into a list of annot dicts.

    People whose bbox confidence is essentially zero are dropped; the
    survivors are sorted by decreasing bbox area and then re-labelled with
    consecutive person IDs.

    Args:
        opname: path to a ``*_keypoints.json`` file written by OpenPose.

    Returns:
        List of annotation dicts with 'bbox', 'personID', 'keypoints',
        'area', 'isKeyframe' and optional hand/face entries.
    """
    extra_fields = {
        'face_keypoints_2d': 'face2d',
        'hand_left_keypoints_2d': 'handl2d',
        'hand_right_keypoints_2d': 'handr2d',
    }
    assert os.path.exists(opname), opname
    data = read_json(opname)
    results = []
    pid = 0
    for idx, person in enumerate(data['people']):
        keypoints = np.array(person['pose_keypoints_2d']).reshape(-1, 3)
        bbox = bbox_from_keypoints(keypoints)
        # Drop detections with a near-zero bbox confidence.
        if bbox[-1] < 0.01:
            continue
        annot = {
            'bbox': bbox,
            'personID': pid + idx,  # provisional; reassigned after sorting
            'keypoints': keypoints.tolist(),
            'isKeyframe': False,
        }
        annot['area'] = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        for src_key, dst_key in extra_fields.items():
            if len(person[src_key]) == 0:
                continue
            kpts = np.array(person[src_key]).reshape(-1, 3)
            annot[dst_key] = kpts.tolist()
            annot['bbox_' + dst_key] = bbox_from_keypoints(kpts)
        results.append(annot)
    # Largest people first; person IDs follow this ordering.
    results.sort(key=lambda item: -item['area'])
    for rank, annot in enumerate(results):
        annot['personID'] = pid + rank
    return results
def get_crop(image, bbox, rot, scale=1.2):
    """Crop ``image`` around ``bbox`` enlarged by ``scale``; optionally rotate.

    Args:
        image: HxWxC image array.
        bbox: (left, top, right, bottom, conf).
        rot: 180 to rotate the crop by 180 degrees, anything else for none.
        scale: enlargement factor applied around the bbox center.

    Returns:
        (crop, (l, t)): the contiguous cropped patch and the top-left corner
        of the crop in original-image coordinates.
    """
    left, top, right, bottom, _ = bbox
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    half_w = (right - left) * scale / 2
    half_h = (bottom - top) * scale / 2
    # Round to whole pixels and clamp to the image bounds.
    l = max(0, int(center_x - half_w + 0.5))
    t = max(0, int(center_y - half_h + 0.5))
    r = min(image.shape[1], int(center_x + half_w + 0.5))
    b = min(image.shape[0], int(center_y + half_h + 0.5))
    crop = np.ascontiguousarray(image[t:b, l:r].copy())
    if rot == 180:
        # Flip around both axes, i.e. a 180-degree rotation.
        crop = cv2.flip(crop, -1)
    return crop, (l, t)
def transoform_foot(crop_shape, start, rot, keypoints, kpts_old=None):
    """Map crop-space keypoints back to image space and graft the feet.

    Args:
        crop_shape: shape (h, w, ...) of the cropped patch.
        start: (l, t) top-left corner of the crop in the original image.
        rot: 180 if the crop was rotated before detection, else 0.
        keypoints: (N, 25, 3) candidate detections in crop coordinates
            (transformed in place).
        kpts_old: optional reference skeleton (25, 3 list/array); when given,
            the candidate closest to it over the first 15 body joints is
            selected and only its foot joints (rows 19+) are merged into a
            copy of the reference.

    Returns:
        (25, 3) keypoint array in original-image coordinates.
    """
    l, t = start
    if rot == 180:
        # Undo the 180-degree rotation that was applied before detection.
        keypoints[..., 0] = crop_shape[1] - keypoints[..., 0] - 1
        keypoints[..., 1] = crop_shape[0] - keypoints[..., 1] - 1
    # Shift from crop coordinates into full-image coordinates.
    keypoints[..., 0] += l
    keypoints[..., 1] += t
    if kpts_old is None:
        # No reference skeleton: simply return the first candidate.
        return keypoints[0]
    # Select the candidate whose body joints best match the reference.
    reference = np.array(kpts_old)
    dist = np.linalg.norm(reference[None, :15, :2] - keypoints[:, :15, :2], axis=-1)
    conf = np.minimum(reference[None, :15, 2], keypoints[:, :15, 2])
    # Confidence-weighted mean distance, normalized by the fraction of
    # jointly-confident joints.
    dist = (dist * conf).sum(axis=-1) / conf.sum(axis=-1) * conf.shape[1] / (conf > 0).sum(axis=-1)
    kpts_op = keypoints[dist.argmin()]
    # TODO: validate the keypoints further.
    # The reference (HRNet) estimate is trusted for left/right: if the
    # OpenPose ankles appear swapped relative to it, swap the six foot
    # joints back. (WARN: feet filtering itself stays disabled elsewhere.)
    if (reference[[11, 14], -1] > 0.3).all() and (kpts_op[[11, 14], -1] > 0.3).all():
        dist_ll = np.linalg.norm(reference[11, :2] - kpts_op[11, :2])
        dist_rr = np.linalg.norm(reference[14, :2] - kpts_op[14, :2])
        dist_lr = np.linalg.norm(reference[11, :2] - kpts_op[14, :2])
        dist_rl = np.linalg.norm(reference[14, :2] - kpts_op[11, :2])
        if dist_lr < dist_ll and dist_rl < dist_rr:
            kpts_op[[19, 20, 21, 22, 23, 24]] = kpts_op[[22, 23, 24, 19, 20, 21]]
    # Keep the reference body; take only the feet from OpenPose.
    reference[19:] = kpts_op[19:]
    return reference
def filter_feet(kpts):
    """Zero out toe/heel joints that sit implausibly far from their ankle.

    BODY25 indices used: (13, 14) left knee/ankle, 19-21 left foot,
    (10, 11) right knee/ankle, 22-24 right foot. A foot is removed when the
    shin length is shorter than 1.5x the ankle-to-foot reach.

    Args:
        kpts: (25, 3) keypoint array, modified in place.

    Returns:
        The same array, with rejected foot joints set to zero.
    """
    # Left foot: knee, ankle and big toe must all be detected.
    if (kpts[[13, 14, 19], -1] > 0).all():
        foot_reach = ((kpts[[19, 20, 21], -1] > 0) * np.linalg.norm(kpts[[19, 20, 21], :2] - kpts[14, :2], axis=-1)).max()
        shin = np.linalg.norm(kpts[13, :2] - kpts[14, :2])
        if shin < 1.5 * foot_reach:
            kpts[[19, 20, 21]] = 0.
            print('[LOG] remove left ankle {} < {}'.format(shin, foot_reach))
    # Right foot: only knee and ankle are required here.
    if (kpts[[10, 11], -1] > 0).all():
        foot_reach = ((kpts[[22, 23, 24], -1] > 0) * np.linalg.norm(kpts[[22, 23, 24], :2] - kpts[11, :2], axis=-1)).max()
        shin = np.linalg.norm(kpts[10, :2] - kpts[11, :2])
        if shin < 1.5 * foot_reach:
            kpts[[22, 23, 24]] = 0.
            print('[LOG] remove right ankle {} < {}'.format(shin, foot_reach))
    return kpts
class FeetEstimatorByCrop:
    """Refine foot (and optionally hand/face) keypoints by re-running
    OpenPose on per-person image crops.

    Workflow: crop each annotated person into a temp directory, run the
    OpenPose binary once over all crops, convert the output, then map the
    detected joints back into original-image coordinates and merge them
    into the annotation files.
    """
    def __init__(self, openpose, tmpdir=None, fullbody=False, hand=False, face=False) -> None:
        # Root directory of the OpenPose installation.
        self.openpose = openpose
        if tmpdir is None:
            tmpdir = os.path.abspath(join('./', 'tmp'))
        else:
            tmpdir = os.path.abspath(tmpdir)
        self.tmpdir = tmpdir
        # fullbody: replace the whole skeleton with the crop detection
        # instead of only grafting the feet onto the existing skeleton.
        self.fullbody = fullbody
        self.hand = hand
        self.face = face
        # Start from a clean temp directory on every construction.
        if os.path.exists(tmpdir):
            shutil.rmtree(tmpdir)
        os.makedirs(join(tmpdir, 'images'), exist_ok=True)
        # Configuration consumed by run_openpose().
        self.config = {
            'root': self.openpose,
            'res': 1,
            'hand': hand, # detect hand when in fullbody mode
            'face': face,
            'vis': False,
        }

    def detect_foot(self, image_root, annot_root, ext):
        """Crop every annotated person, run OpenPose over the crops, and
        merge the detected feet (and optionally hands/face) back into the
        annotation files under ``annot_root`` (modified in place).

        Args:
            image_root: directory holding the images.
            annot_root: directory of annotation JSON files to update.
            ext: image filename extension, e.g. '.jpg'.
        """
        # TODO: switch to taking the maximum of the heatmap instead.
        THRES = 0.3
        imgnames = sorted(glob(join(image_root, '*'+ext)))
        if len(imgnames) == 0:
            # Fall back to png images.
            ext = '.png'
            imgnames = sorted(glob(join(image_root, '*'+ext)))
        # infos[base][person_index] -> crop metadata needed to map results back.
        infos = {}
        crop_counts = 0
        # First pass: write one crop image per qualifying person.
        for imgname in tqdm(imgnames, desc='{:10s}'.format(os.path.basename(annot_root))):
            base = os.path.basename(imgname).replace(ext, '')
            annotname = join(annot_root, base+'.json')
            annots = read_json(annotname)
            image = cv2.imread(imgname)
            # NOTE(review): the trailing `and False` deliberately disables
            # this skip, so already-processed frames are redone.
            if 'detect_feet' in annots.keys() and annots['detect_feet'] and False:
                continue
            infos[base] = {}
            for i, annot in enumerate(annots['annots']):
                bbox = annot['bbox']
                # Skip people whose bbox is too small to be worth cropping.
                width, height = bbox[2] - bbox[0], bbox[3] - bbox[1]
                if width < 100 and height < 100:
                    continue
                rot = 0
                if not self.fullbody:
                    kpts = np.array(annot['keypoints'])
                    # Skip if neither ankle (11, 14) is confidently detected.
                    if kpts[11][-1] < THRES and kpts[14][-1] < THRES:
                        continue
                    # Need joints 1 and 8 to estimate the torso direction.
                    if kpts[1][-1] < THRES or kpts[8][-1] < THRES:
                        continue
                    dir_1_8 = np.array([kpts[1][0]-kpts[8][0], kpts[1][1]-kpts[8][1]])
                    dir_1_8 = dir_1_8 / np.linalg.norm(dir_1_8)
                    # Torso pointing down: person is upside down, rotate 180.
                    if dir_1_8[1] > 0.8:
                        rot = 180
                crop, start = get_crop(image, bbox, rot)
                cropname = join(self.tmpdir, 'images', f'{base}_{i}.jpg')
                infos[base][i] = {
                    'crop_shape': crop.shape,
                    'start': start,
                    'rot': rot,
                    'name': f'{base}_{i}.json'
                }
                cv2.imwrite(cropname, crop)
                crop_counts += 1
        tmp_image_root = join(self.tmpdir, 'images')
        tmp_annot_root = join(self.tmpdir, 'annots')
        tmp_tmp_root = join(self.tmpdir, 'openpose')
        print(len(os.listdir(tmp_image_root)), crop_counts)
        # Run OpenPose once over all crops, then convert synchronously
        # (no data race is expected here).
        run_openpose(tmp_image_root, tmp_tmp_root, self.config)
        convert_from_openpose(tmp_tmp_root, tmp_annot_root, tmp_image_root, '.jpg')
        # Second pass: merge the crop detections back into the annotations.
        for imgname in tqdm(imgnames, desc='{:10s}'.format(os.path.basename(annot_root))):
            base = os.path.basename(imgname).replace(ext, '')
            annotname = join(annot_root, base+'.json')
            annots_ori = read_json(annotname)
            if base not in infos.keys():
                continue
            for i, annots in enumerate(annots_ori['annots']):
                if i not in infos[base].keys():
                    continue
                info = infos[base][i]
                cropname = join(tmp_annot_root, info['name'])
                if not os.path.exists(cropname):
                    print('[WARN] {} not exists!'.format(cropname))
                    continue
                annots_sub = read_json(cropname)['annots']
                if len(annots_sub) < 1:
                    continue
                keypoints = np.stack([np.array(d['keypoints']) for d in annots_sub])
                if self.hand or self.face:
                    # Hands/face are taken from the first detection only.
                    for key in ['handl2d', 'handr2d', 'face2d']:
                        if key in annots_sub[0].keys():
                            khand = np.array(annots_sub[0][key])[None]
                            annots[key] = transoform_foot(info['crop_shape'], info['start'], info['rot'], khand, None)
                            annots['bbox_'+key] = bbox_from_keypoints(annots[key])
                if self.fullbody:
                    # Replace the whole skeleton with the crop detection.
                    kpts_np = transoform_foot(info['crop_shape'], info['start'], info['rot'], keypoints, None)
                else:
                    # Keep the original body; only graft the feet onto it.
                    kpts = annots['keypoints']
                    kpts_np = transoform_foot(info['crop_shape'], info['start'], info['rot'], keypoints, kpts)
                if False: # WARN: disable filter feet
                    kpts_np = filter_feet(kpts_np)
                annots['keypoints'] = kpts_np
            annots_ori['detect_feet'] = True
            save_annot(annotname, annots_ori)
class FeetEstimator:
    """Refine foot keypoints using the OpenPose Python API (pyopenpose).

    Each annotated person is cropped from the image, run through OpenPose,
    and the detected foot joints are merged back into the annotation.
    """
    def __init__(self, openpose='/media/qing/Project/openpose') -> None:
        """Load and start the OpenPose Python wrapper.

        Args:
            openpose: root directory of the OpenPose installation, used to
                locate both the python bindings and the model folder.
        """
        import sys
        sys.path.append('{}/build_py/python'.format(openpose))
        from openpose import pyopenpose as op
        opWrapper = op.WrapperPython()
        params = dict()
        params["model_folder"] = "{}/models".format(openpose)
        opWrapper.configure(params)
        opWrapper.start()
        self.wrapper = opWrapper
        self.rect = op.Rectangle
        self.datum = op.Datum
        # Newer OpenPose bindings require datums wrapped in a VectorDatum;
        # older bindings accept a plain list. Narrowed from a bare except.
        try:
            self.vec = op.VectorDatum
        except AttributeError:
            self.vec = lambda x: x

    def detect_foot(self, image_root, annot_root, ext):
        """Detect feet for every annotation under ``annot_root`` (in place).

        Args:
            image_root: directory holding the images.
            annot_root: directory of annotation JSON files to update.
            ext: image filename extension, e.g. '.jpg'.
        """
        # TODO: switch to taking the maximum of the heatmap instead.
        THRES = 0.3
        imgnames = sorted(glob(join(image_root, '*'+ext)))
        for imgname in tqdm(imgnames, desc='{:10s}'.format(os.path.basename(annot_root))):
            base = os.path.basename(imgname).replace(ext, '')
            annotname = join(annot_root, base+'.json')
            annots = read_json(annotname)
            image = cv2.imread(imgname)
            for annot in annots['annots']:
                bbox = annot['bbox']
                kpts = np.array(annot['keypoints'])
                # Need joints 1 and 8 to estimate the torso direction.
                if kpts[1][-1] < THRES or kpts[8][-1] < THRES:
                    continue
                dir_1_8 = np.array([kpts[1][0]-kpts[8][0], kpts[1][1]-kpts[8][1]])
                dir_1_8 = dir_1_8 / np.linalg.norm(dir_1_8)
                # Torso pointing down: person is upside down, rotate 180.
                rot = 180 if dir_1_8[1] > 0.8 else 0
                kpts = self._detect_with_bbox(image, kpts, bbox, rot=rot)
                # BUGFIX: filter_feet was previously called without its
                # argument, which raised a TypeError at runtime.
                kpts = filter_feet(kpts)
                annot['keypoints'] = kpts
            save_annot(annotname, annots)

    def _detect_with_bbox(self, image, kpts, bbox, rot=0):
        """Run OpenPose on a crop of ``image`` and merge the detected feet
        into ``kpts``.

        Returns ``kpts`` unchanged when nothing is detected in the crop.
        """
        crop, start = get_crop(image, bbox, rot)
        datum = self.datum()
        datum.cvInputData = crop
        self.wrapper.emplaceAndPop(self.vec([datum]))
        # keypoints: (N, 25, 3)
        keypoints = datum.poseKeypoints
        # poseKeypoints is None (or degenerate) when no person is detected;
        # the previous code crashed on None before reaching .shape.
        if keypoints is None or len(keypoints.shape) < 3:
            print(keypoints)
            print('Not detect person!')
            return kpts
        kpts_np = transoform_foot(crop.shape, start, rot, keypoints, kpts)
        return kpts_np