fix outputs initiation

This commit is contained in:
Laizo 2024-11-22 10:27:13 +01:00
parent 00a1320eed
commit f21fea13d3

View File

@ -38,7 +38,6 @@ import json
import logging
import cv2
import time
import threading
import queue
import multiprocessing
@ -157,9 +156,19 @@ def rtm_estimator(config_dict):
shared_buffers[unique_name] = shm
available_buffers.put(unique_name)
# Create dictionaries to hold outputs for each source
source_outputs = {}
# Start reading processes for each source
reading_processes = []
for source in sources:
# Initialize outputs for each source
outputs = setup_capture_directories(
source['path'], pose_dir, 'to_images' in config_dict['project'].get('save_video', [])
)
# Correctly store the outputs in the dictionary
source_outputs[source['id']] = outputs
process = SourceProcess(
source,
config_dict,
@ -171,6 +180,7 @@ def rtm_estimator(config_dict):
process.start()
reading_processes.append(process)
# Compute the maximum number of worker processes
cpu_count = multiprocessing.cpu_count()
num_sources = len(sources)
@ -187,7 +197,8 @@ def rtm_estimator(config_dict):
shared_buffers,
shared_counts,
pose_tracker_settings,
display_queue
display_queue,
source_outputs
)
worker.start()
worker_processes.append(worker)
@ -196,7 +207,7 @@ def rtm_estimator(config_dict):
display_thread = None
if show_realtime_results:
input_size = config_dict['pose'].get('input_size', (640, 480))
display_thread = CombinedDisplayThread(sources, input_size, display_queue)
display_thread = DisplayProcess(sources, input_size, display_queue)
display_thread.start()
# Initialize progress bars
@ -237,6 +248,7 @@ def rtm_estimator(config_dict):
pb.close()
logging.shutdown()
def process_single_frame(config_dict, frame, source_id, frame_idx, output_dirs, pose_tracker, multi_person, save_video, save_images, show_realtime_results, output_format, out_vid):
'''
Processes a single frame from a source.
@ -285,7 +297,7 @@ def process_single_frame(config_dict, frame, source_id, frame_idx, output_dirs,
return source_id, img_show
class CombinedDisplayThread(threading.Thread):
class DisplayProcess(multiprocessing.Process):
'''
Thread for displaying combined images to avoid thread-safety issues with OpenCV.
'''
@ -342,7 +354,7 @@ class CombinedDisplayThread(threading.Thread):
class WorkerProcess(multiprocessing.Process):
def __init__(self, config_dict, frame_queue, available_buffers, shared_buffers, shared_counts, pose_tracker_settings, display_queue):
def __init__(self, config_dict, frame_queue, available_buffers, shared_buffers, shared_counts, pose_tracker_settings, display_queue, source_outputs):
super().__init__()
self.config_dict = config_dict
self.frame_queue = frame_queue
@ -352,6 +364,7 @@ class WorkerProcess(multiprocessing.Process):
self.pose_tracker_settings = pose_tracker_settings
self.display_queue = display_queue
self.stopped = False
self.source_outputs = source_outputs
def run(self):
try:
@ -362,16 +375,11 @@ class WorkerProcess(multiprocessing.Process):
multi_person = self.config_dict['project'].get('multi_person', False)
save_video = 'to_video' in self.config_dict['project'].get('save_video', [])
save_images = 'to_images' in self.config_dict['project'].get('save_video', [])
show_realtime_results = self.config_dict['project'].get('show_realtime_results', False)
show_realtime_results = self.config_dict['pose'].get('show_realtime_results', False)
output_format = self.config_dict['project'].get('output_format', 'openpose')
out_vid = None
if save_video:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = self.config_dict['pose'].get('fps', 30)
input_size = self.config_dict['pose'].get('input_size', (640, 480))
H, W = input_size[1], input_size[0]
output_video_path = self.outputs[4]
out_vid = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H))
# Initialize a dictionary to store out_vids for each source inside the process
local_out_vids = {}
while True:
item = self.frame_queue.get()
@ -382,13 +390,29 @@ class WorkerProcess(multiprocessing.Process):
shm = self.shared_buffers[buffer_name]
frame = np.ndarray(frame_shape, dtype=np.dtype(frame_dtype_str), buffer=shm.buf)
# Get the outputs for this source
outputs = self.source_outputs[source_id]
# Create out_vid for this source if needed and not already created
if save_video and source_id not in local_out_vids:
# Create VideoWriter for this source
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = self.config_dict['pose'].get('fps', 30)
input_size = self.config_dict['pose'].get('input_size', (640, 480))
H, W = input_size[1], input_size[0]
output_video_path = outputs[4]
out_vid = cv2.VideoWriter(output_video_path, fourcc, fps, (W, H))
local_out_vids[source_id] = out_vid
else:
out_vid = local_out_vids.get(source_id)
# Process the frame
result = process_single_frame(
self.config_dict,
frame,
source_id,
frame_idx,
self.outputs,
outputs,
pose_tracker,
multi_person,
save_video,
@ -409,6 +433,10 @@ class WorkerProcess(multiprocessing.Process):
if self.display_queue:
self.display_queue.put({source_id: result[1]})
# Release VideoWriters
for out_vid in local_out_vids.values():
out_vid.release()
except Exception as e:
logging.error(f"Error in WorkerProcess: {e}")
self.stopped = True
@ -416,6 +444,7 @@ class WorkerProcess(multiprocessing.Process):
class SourceProcess(multiprocessing.Process):
def __init__(self, source, config_dict, frame_queue, available_buffers, shared_buffers, shared_counts):
super().__init__()
self.source = source
self.config_dict = config_dict
self.frame_queue = frame_queue
@ -432,9 +461,6 @@ class SourceProcess(multiprocessing.Process):
self.available_buffers = available_buffers
self.shared_buffers = shared_buffers
self.shared_counts = shared_counts
# Initialize other variables as needed
def parse_frame_ranges(self, frame_ranges):
if self.source['type'] != 'webcam':