# EasyMocap/easymocap/estimator/openpose_wrapper.py
'''
@ Date: 2021-08-21 14:16:38
@ Author: Qing Shuai
@ LastEditors: Qing Shuai
@ LastEditTime: 2022-05-23 23:10:43
@ FilePath: /EasyMocapPublic/easymocap/estimator/openpose_wrapper.py
'''
import os
import shutil
from tqdm import tqdm
from .wrapper_base import bbox_from_keypoints, create_annot_file, check_result
from ..mytools import read_json
from ..annotator.file_utils import save_annot
from os.path import join
import numpy as np
import cv2
from glob import glob
from multiprocessing import Process
def run_openpose(image_root, annot_root, config):
    """Run the OpenPose binary on a directory of images.

    Args:
        image_root: directory containing the input images.
        annot_root: directory where OpenPose writes its ``*_keypoints.json``
            files (and rendered images when ``config['vis']`` is set).
        config: dict with keys ``root`` (OpenPose install dir), ``res``
            (net-resolution scale factor), ``hand``, ``face``, ``vis``.
    """
    image_root = os.path.realpath(image_root)
    annot_root = os.path.realpath(annot_root)
    os.makedirs(annot_root, exist_ok=True)
    pwd = os.getcwd()
    if os.name != 'nt':
        cmd = './build/examples/openpose/openpose.bin --image_dir {} --write_json {} --display 0'.format(
            image_root, annot_root)
    else:
        cmd = 'bin\\OpenPoseDemo.exe --image_dir {} --write_json {} --display 0'.format(
            os.path.abspath(image_root), os.path.abspath(annot_root))
    if config['res'] != 1:
        # OpenPose requires the net resolution to be a multiple of 16
        cmd = cmd + ' --net_resolution -1x{}'.format(int(16*((368*config['res'])//16)))
    if config['hand']:
        cmd = cmd + ' --hand'
    if config['face']:
        cmd = cmd + ' --face'
    if config['vis']:
        cmd = cmd + ' --write_images {}'.format(annot_root)
    else:
        cmd = cmd + ' --render_pose 0'
    # OpenPose must be launched from its install root so its relative model
    # paths resolve; restore the previous working directory afterwards.
    os.chdir(config['root'])
    print(cmd)
    os.system(cmd)
    os.chdir(pwd)
def convert_from_openpose(src, dst, image_root, ext):
    """Convert every OpenPose ``*_keypoints.json`` file in ``src`` into an
    EasyMocap annotation file under ``dst``, pairing each with the image of
    the same basename (extension ``ext``) in ``image_root``."""
    filenames = sorted(os.listdir(src))
    progress = tqdm(filenames, desc='{:10s}'.format(os.path.basename(dst)))
    for filename in progress:
        annots = load_openpose(join(src, filename))
        stem = filename.replace('_keypoints.json', '')
        annotname = join(dst, stem + '.json')
        imgname = join(image_root, filename.replace('_keypoints.json', ext))
        annot = create_annot_file(annotname, imgname)
        annot['annots'] = annots
        save_annot(annotname, annot)
# Background conversion processes spawned by extract_2d; callers are expected
# to join these after all sequences have been scheduled.
global_tasks = []
def extract_2d(image_root, annot_root, tmp_root, config):
    """Run OpenPose on ``image_root`` (raw JSON into ``tmp_root``) and convert
    the results into EasyMocap annots in ``annot_root``.

    The conversion runs in a background Process that is appended to the
    module-level ``global_tasks`` list and returned to the caller.
    """
    # skip entirely when the final annotations already exist
    if check_result(image_root, annot_root):
        return global_tasks
    if not check_result(image_root, tmp_root):
        run_openpose(image_root, tmp_root, config)
    # TODO: add current task to global_tasks
    thread = Process(target=convert_from_openpose,
        args=(tmp_root, annot_root, image_root, config['ext']))  # no data race is expected here
    thread.start()
    global_tasks.append(thread)
    return global_tasks
def load_openpose(opname):
    """Parse one OpenPose ``*_keypoints.json`` file into a list of annot
    dicts, sorted by descending bbox area with personID assigned by rank."""
    mapname = {
        'face_keypoints_2d': 'face2d',
        'hand_left_keypoints_2d': 'handl2d',
        'hand_right_keypoints_2d': 'handr2d'}
    assert os.path.exists(opname), opname
    data = read_json(opname)
    results = []
    pid = 0
    for idx, person in enumerate(data['people']):
        body = np.array(person['pose_keypoints_2d']).reshape(-1, 3)
        annot = {
            'bbox': bbox_from_keypoints(body),
            'personID': pid + idx,
            'keypoints': body.tolist(),
            'isKeyframe': False
        }
        bbox = annot['bbox']
        # drop detections whose bbox confidence is (almost) zero
        if bbox[-1] < 0.01:
            continue
        annot['area'] = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        for src_key, dst_key in mapname.items():
            if len(person[src_key]) == 0:
                continue
            part = np.array(person[src_key]).reshape(-1, 3)
            annot[dst_key] = part.tolist()
            annot['bbox_' + dst_key] = bbox_from_keypoints(part)
        results.append(annot)
    # largest person first; re-assign IDs by that rank
    results.sort(key=lambda x: -x['area'])
    for rank, annot in enumerate(results):
        annot['personID'] = pid + rank
    return results
def get_crop(image, bbox, rot, scale=1.2):
    """Crop the region of ``bbox`` (expanded by ``scale``) out of ``image``.

    Returns ``(crop, (l, t))`` where ``(l, t)`` is the crop's top-left corner
    in the original image. When ``rot == 180`` the crop is rotated 180 degrees.
    """
    left, top, right, bottom, _ = bbox
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    half_w = (right - left) * scale / 2
    half_h = (bottom - top) * scale / 2
    # round to int and clamp to the image bounds
    l = max(0, int(center_x - half_w + 0.5))
    t = max(0, int(center_y - half_h + 0.5))
    r = min(image.shape[1], int(center_x + half_w + 0.5))
    b = min(image.shape[0], int(center_y + half_h + 0.5))
    crop = np.ascontiguousarray(image[t:b, l:r].copy())
    if rot == 180:
        # a 180-degree rotation is a flip around both axes
        crop = cv2.flip(crop, -1)
    return crop, (l, t)
def transoform_foot(crop_shape, start, rot, keypoints, kpts_old=None):
    """Map keypoints detected on a crop back to full-image coordinates and,
    optionally, merge the feet joints into an existing body estimate.

    Args:
        crop_shape: shape of the crop the keypoints were detected on.
        start: (l, t) top-left corner of the crop in the original image.
        rot: rotation that was applied to the crop (only 180 is handled).
        keypoints: (N, 25, 3) detections on the crop; MODIFIED IN PLACE by the
            coordinate transform below.
        kpts_old: existing body keypoints in full-image coordinates (assumed
            (25, 3)-like, BODY_25 order -- TODO confirm); when given, the
            best-matching detection's rows 19: (the feet) replace its feet.

    Returns:
        keypoints[0] when kpts_old is None, otherwise a new array copied from
        kpts_old with rows 19: overwritten.
    """
    l, t = start
    # undo the 180-degree rotation applied before detection
    if rot == 180:
        keypoints[..., 0] = crop_shape[1] - keypoints[..., 0] - 1
        keypoints[..., 1] = crop_shape[0] - keypoints[..., 1] - 1
    # translate from crop coordinates back to full-image coordinates
    keypoints[..., 0] += l
    keypoints[..., 1] += t
    if kpts_old is None:
        kpts_op = keypoints[0]
        return kpts_op
    # pick the detection closest to the existing estimate (body joints 0..14)
    kpts_np = np.array(kpts_old)
    dist = np.linalg.norm(kpts_np[None, :15, :2] - keypoints[:, :15, :2], axis=-1)
    conf = np.minimum(kpts_np[None, :15, 2], keypoints[:, :15, 2])
    # confidence-weighted mean distance, rescaled by the fraction of joints
    # that both estimators are confident about
    dist = (dist * conf).sum(axis=-1)/conf.sum(axis=-1)*conf.shape[1]/(conf>0).sum(axis=-1)
    best = dist.argmin()
    kpts_op = keypoints[best]
    # TODO: validate the keypoints
    # the HRNet estimate is taken as the reference here
    # WARN: disable feet
    # Check whether OpenPose's ankles (rows 11/14) are cross-matched against
    # the reference; if so, swap OpenPose's left/right toe blocks (rows 19-24).
    if (kpts_np[[11, 14], -1] > 0.3).all() and (kpts_op[[11, 14], -1] > 0.3).all():
        dist_ll = np.linalg.norm(kpts_np[11, :2] - kpts_op[11, :2])
        dist_rr = np.linalg.norm(kpts_np[14, :2] - kpts_op[14, :2])
        dist_lr = np.linalg.norm(kpts_np[11, :2] - kpts_op[14, :2])
        dist_rl = np.linalg.norm(kpts_np[14, :2] - kpts_op[11, :2])
        if dist_lr < dist_ll and dist_rl < dist_rr:
            kpts_op[[19, 20, 21, 22, 23, 24]] = kpts_op[[22, 23, 24, 19, 20, 21]]
    # (disabled) alternative left/right swap based on toe positions
    # if (kpts_np[[11, 14], -1] > 0.3).all() and (kpts_op[[19, 22], -1] > 0.3).all():
    #     if np.linalg.norm(kpts_op[19, :2] - kpts_np[11, :2]) \
    #         < np.linalg.norm(kpts_op[19, :2] - kpts_np[14, :2])\
    #         and np.linalg.norm(kpts_op[22, :2] - kpts_np[11, :2]) \
    #         > np.linalg.norm(kpts_op[22, :2] - kpts_np[14, :2]):
    #         kpts_op[[19, 20, 21, 22, 23, 24]] = kpts_op[[22, 23, 24, 19, 20, 21]]
    #         print('[info] swap left and right')
    # overwrite the feet joints with the selected detection's feet
    kpts_np[19:] = kpts_op[19:]
    return kpts_np
def filter_feet(kpts):
    """Zero out toe keypoints (rows 19-21 / 22-24) that are implausibly far
    from their ankle relative to the lower-leg length.

    Mutates ``kpts`` in place and returns it.
    """
    # left foot: toes 19-21 versus ankle 14, lower leg is segment 13-14
    if (kpts[[13, 14, 19], -1] > 0).all():
        toe_dist = ((kpts[[19, 20, 21], -1] > 0) *
                    np.linalg.norm(kpts[[19, 20, 21], :2] - kpts[14, :2], axis=-1)).max()
        leg_len = np.linalg.norm(kpts[13, :2] - kpts[14, :2])
        if leg_len < 1.5 * toe_dist:
            kpts[[19, 20, 21]] = 0.
            print('[LOG] remove left ankle {} < {}'.format(leg_len, toe_dist))
    # right foot: toes 22-24 versus ankle 11, lower leg is segment 10-11
    if (kpts[[10, 11], -1] > 0).all():
        toe_dist = ((kpts[[22, 23, 24], -1] > 0) *
                    np.linalg.norm(kpts[[22, 23, 24], :2] - kpts[11, :2], axis=-1)).max()
        leg_len = np.linalg.norm(kpts[10, :2] - kpts[11, :2])
        if leg_len < 1.5 * toe_dist:
            kpts[[22, 23, 24]] = 0.
            print('[LOG] remove right ankle {} < {}'.format(leg_len, toe_dist))
    return kpts
class FeetEstimatorByCrop:
    """Refine feet (and optionally hand/face) keypoints by cropping each
    detected person, running the OpenPose binary on the crops, and merging
    the crop detections back into the original annotation files."""

    def __init__(self, openpose, tmpdir=None, fullbody=False, hand=False, face=False) -> None:
        # openpose: OpenPose install root, forwarded to run_openpose as config['root']
        self.openpose = openpose
        if tmpdir is None:
            tmpdir = os.path.abspath(join('./', 'tmp'))
        else:
            tmpdir = os.path.abspath(tmpdir)
        self.tmpdir = tmpdir
        self.fullbody = fullbody
        self.hand = hand
        self.face = face
        # start every run from a clean temporary directory
        if os.path.exists(tmpdir):
            shutil.rmtree(tmpdir)
        os.makedirs(join(tmpdir, 'images'), exist_ok=True)
        self.config = {
            'root': self.openpose,
            'res': 1,
            'hand': hand,  # detect hand when in fullbody mode
            'face': face,
            'vis': False,
        }

    def detect_foot(self, image_root, annot_root, ext):
        """Two-pass refinement: (1) write one crop image per sufficiently
        large person, (2) run OpenPose on all crops at once and merge the
        results back into the JSON files in ``annot_root`` (updated in place).
        """
        # TODO: switch to taking the maximum of the heatmap
        THRES = 0.3
        imgnames = sorted(glob(join(image_root, '*'+ext)))
        if len(imgnames) == 0:
            # fall back to png when no images match the given extension
            ext = '.png'
            imgnames = sorted(glob(join(image_root, '*'+ext)))
        infos = {}
        crop_counts = 0
        # pass 1: write crops and remember how to map each crop back
        for imgname in tqdm(imgnames, desc='{:10s}'.format(os.path.basename(annot_root))):
            base = os.path.basename(imgname).replace(ext, '')
            annotname = join(annot_root, base+'.json')
            annots = read_json(annotname)
            image = cv2.imread(imgname)
            # NOTE(review): the trailing 'and False' disables this skip, so
            # frames are always re-processed even when marked detect_feet
            if 'detect_feet' in annots.keys() and annots['detect_feet'] and False:
                continue
            infos[base] = {}
            for i, annot in enumerate(annots['annots']):
                bbox = annot['bbox']
                # skip tiny detections
                width, height = bbox[2] - bbox[0], bbox[3] - bbox[1]
                if width < 100 and height < 100:
                    continue
                rot = 0
                if not self.fullbody:
                    kpts = np.array(annot['keypoints'])
                    # skip people without a confident ankle keypoint (rows 11/14)
                    if kpts[11][-1] < THRES and kpts[14][-1] < THRES:
                        continue
                    # check the torso orientation via joints 1 and 8
                    if kpts[1][-1] < THRES or kpts[8][-1] < THRES:
                        continue
                    dir_1_8 = np.array([kpts[1][0]-kpts[8][0], kpts[1][1]-kpts[8][1]])
                    dir_1_8 = dir_1_8 / np.linalg.norm(dir_1_8)
                    # person appears upside-down: rotate the crop before detection
                    if dir_1_8[1] > 0.8:
                        rot = 180
                crop, start = get_crop(image, bbox, rot)
                cropname = join(self.tmpdir, 'images', f'{base}_{i}.jpg')
                # remember everything needed to undo the crop transform later
                infos[base][i] = {
                    'crop_shape': crop.shape,
                    'start': start,
                    'rot': rot,
                    'name': f'{base}_{i}.json'
                }
                cv2.imwrite(cropname, crop)
                crop_counts += 1
        tmp_image_root = join(self.tmpdir, 'images')
        tmp_annot_root = join(self.tmpdir, 'annots')
        tmp_tmp_root = join(self.tmpdir, 'openpose')
        print(len(os.listdir(tmp_image_root)), crop_counts)
        run_openpose(tmp_image_root, tmp_tmp_root, self.config)
        convert_from_openpose(tmp_tmp_root, tmp_annot_root, tmp_image_root, '.jpg')  # no data race is expected here
        # pass 2: merge the crop detections back into the original annots
        for imgname in tqdm(imgnames, desc='{:10s}'.format(os.path.basename(annot_root))):
            base = os.path.basename(imgname).replace(ext, '')
            annotname = join(annot_root, base+'.json')
            annots_ori = read_json(annotname)
            if base not in infos.keys():
                continue
            for i, annots in enumerate(annots_ori['annots']):
                if i not in infos[base].keys():
                    continue
                info = infos[base][i]
                cropname = join(tmp_annot_root, info['name'])
                if not os.path.exists(cropname):
                    print('[WARN] {} not exists!'.format(cropname))
                    continue
                annots_sub = read_json(cropname)['annots']
                if len(annots_sub) < 1:
                    continue
                keypoints = np.stack([np.array(d['keypoints']) for d in annots_sub])
                if self.hand or self.face:
                    # hands/face are taken from the first (largest) detection only
                    for key in ['handl2d', 'handr2d', 'face2d']:
                        if key in annots_sub[0].keys():
                            khand = np.array(annots_sub[0][key])[None]
                            annots[key] = transoform_foot(info['crop_shape'], info['start'], info['rot'], khand, None)
                            annots['bbox_'+key] = bbox_from_keypoints(annots[key])
                if self.fullbody:
                    # replace the whole body with the crop detection
                    kpts_np = transoform_foot(info['crop_shape'], info['start'], info['rot'], keypoints, None)
                else:
                    # keep the body, merge only the feet joints
                    kpts = annots['keypoints']
                    kpts_np = transoform_foot(info['crop_shape'], info['start'], info['rot'], keypoints, kpts)
                if False:  # WARN: disable filter feet
                    kpts_np = filter_feet(kpts_np)
                annots['keypoints'] = kpts_np
            annots_ori['detect_feet'] = True
            save_annot(annotname, annots_ori)
class FeetEstimator:
    """Refine feet keypoints via the OpenPose Python API (pyopenpose): crop
    each person, re-run pose estimation on the crop, and merge the detected
    feet back into the existing keypoints."""

    def __init__(self, openpose='/media/qing/Project/openpose') -> None:
        # Lazily import pyopenpose from the given OpenPose build directory.
        import sys
        sys.path.append('{}/build_py/python'.format(openpose))
        from openpose import pyopenpose as op
        opWrapper = op.WrapperPython()
        params = dict()
        params["model_folder"] = "{}/models".format(openpose)
        opWrapper.configure(params)
        opWrapper.start()
        self.wrapper = opWrapper
        self.rect = op.Rectangle
        self.datum = op.Datum
        # newer OpenPose versions require wrapping datums in a VectorDatum;
        # older ones accept a plain list
        try:
            self.vec = op.VectorDatum
        except AttributeError:
            self.vec = lambda x: x

    def detect_foot(self, image_root, annot_root, ext):
        """Re-detect feet for every annotated person in ``image_root`` and
        update the JSON files in ``annot_root`` in place."""
        # TODO: switch to taking the maximum of the heatmap
        THRES = 0.3
        imgnames = sorted(glob(join(image_root, '*'+ext)))
        for imgname in tqdm(imgnames, desc='{:10s}'.format(os.path.basename(annot_root))):
            base = os.path.basename(imgname).replace(ext, '')
            annotname = join(annot_root, base+'.json')
            annots = read_json(annotname)
            image = cv2.imread(imgname)
            for annot in annots['annots']:
                bbox = annot['bbox']
                kpts = np.array(annot['keypoints'])
                # check the torso orientation via joints 1 and 8
                if kpts[1][-1] < THRES or kpts[8][-1] < THRES:
                    continue
                dir_1_8 = np.array([kpts[1][0]-kpts[8][0], kpts[1][1]-kpts[8][1]])
                dir_1_8 = dir_1_8 / np.linalg.norm(dir_1_8)
                # person appears upside-down: rotate the crop before detection
                rot = 180 if dir_1_8[1] > 0.8 else 0
                kpts = self._detect_with_bbox(image, kpts, bbox, rot=rot)
                # BUGFIX: filter_feet was previously called without its
                # required argument, raising TypeError at runtime
                kpts = filter_feet(kpts)
                annot['keypoints'] = kpts
            save_annot(annotname, annots)

    def _detect_with_bbox(self, image, kpts, bbox, rot=0):
        """Run OpenPose on the crop of ``bbox`` and merge the detected feet
        into ``kpts``; returns ``kpts`` unchanged when no person is found."""
        crop, start = get_crop(image, bbox, rot)
        datum = self.datum()
        datum.cvInputData = crop
        self.wrapper.emplaceAndPop(self.vec([datum]))
        # keypoints: (N, 25, 3); a non-3D result means no person was detected
        keypoints = datum.poseKeypoints
        if len(keypoints.shape) < 3:
            print(keypoints)
            print('Not detect person!')
            return kpts
        kpts_np = transoform_foot(crop.shape, start, rot, keypoints, kpts)
        return kpts_np