diff --git a/argussight/core/video_processes/savers/stream_buffer.py b/argussight/core/video_processes/savers/stream_buffer.py index af2cf32..b9ddb48 100644 --- a/argussight/core/video_processes/savers/stream_buffer.py +++ b/argussight/core/video_processes/savers/stream_buffer.py @@ -18,7 +18,8 @@ def create_commands_dict(cls) -> Dict[str, Any]: return {"save": cls.save_queue} def save_queue(self, save_format: str, personnal_folder: str) -> None: - self.save_iterable(self._queue, save_format, personnal_folder) + queue = self._queue.copy() + self.executor.submit(self.save_iterable, queue, save_format, personnal_folder) def add_to_iterable(self, frame: Dict) -> None: self._queue.append(frame) diff --git a/argussight/core/video_processes/savers/video_recorder.py b/argussight/core/video_processes/savers/video_recorder.py index 41ec28a..4de6ff9 100644 --- a/argussight/core/video_processes/savers/video_recorder.py +++ b/argussight/core/video_processes/savers/video_recorder.py @@ -4,24 +4,27 @@ import os import glob from PIL import Image +import shutil def remove_start_end(main: str, start: str, end: str) -> str: - if main.startswith(start): - main = main[len(start) :] + if start in main: + main = main.rsplit(start, 1)[1] if main.endswith(end): main = main[: -len(end)] return main def delete_all_files(folder_path: str) -> None: - files = glob.glob(os.path.join(folder_path, "*")) - - for file in files: + if os.path.exists(folder_path): try: - os.remove(file) + # Remove the entire directory tree + shutil.rmtree(folder_path) + print(f"Deleted {folder_path} and all its contents.") except Exception as e: - print(f"Failed to delete {file}: {e}") + print(f"Failed to delete {folder_path}: {e}") + else: + print(f"The folder {folder_path} does not exist.") class Recorder(VideoSaver): @@ -33,10 +36,13 @@ def __init__( ) -> None: super().__init__(collector_config, main_save_folder) self._recording = False - self._temp_folder = temp_folder + self._temp_counter = 0 # Make sure that there are no files in the temp folder from old recording failures - delete_all_files(self._temp_folder) + delete_all_files(temp_folder) + os.makedirs(temp_folder) + + self._temp_folder = os.path.join(temp_folder, f"{self._temp_counter}") @classmethod def create_commands_dict(cls) -> Dict[str, Any]: @@ -53,8 +59,7 @@ def add_to_iterable(self, frame: Dict) -> None: def get_frame_from_element( self, element: Any ) -> Tuple[Tuple[int, int], bytes, str]: - img_fpath = os.path.join(self._temp_folder, element) - frame = Image.open(img_fpath, "r") + frame = Image.open(element, "r") raw_data = frame.convert("RGB").tobytes() return frame.size, raw_data, remove_start_end(element, "img", ".jpg") @@ -69,11 +74,26 @@ def stop_record(self, save_format, personnal_folder) -> None: raise ProcessError("There is no recording to stop") image_names = [ - os.path.basename(image) + os.path.join(self._temp_folder, os.path.basename(image)) for image in glob.glob(os.path.join(self._temp_folder, "*jpg")) ] - print(image_names) - self.save_iterable(image_names, save_format, personnal_folder) - delete_all_files(self._temp_folder) + # create a process to make a video from the recorded frames + self.executor.submit( + self._stop_record, + image_names, + save_format, + personnal_folder, + self._temp_folder, + ) + + # go to next recording folder + self._temp_folder = self._temp_folder.rsplit("/")[0] + self._temp_counter += 1 + self._temp_folder = os.path.join(self._temp_folder, f"{self._temp_counter}") + self._recording = False + + def _stop_record(self, images_names, save_format, personnal_folder, temp_folder): + self.save_iterable(images_names, save_format, personnal_folder) + delete_all_files(temp_folder) diff --git a/argussight/core/video_processes/savers/video_saver.py b/argussight/core/video_processes/savers/video_saver.py index 0366393..178f675 100644 --- a/argussight/core/video_processes/savers/video_saver.py +++ b/argussight/core/video_processes/savers/video_saver.py @@ -8,6 +8,8 @@ import numpy as np import cv2 import os +from multiprocessing import Queue +import concurrent.futures class SaveFormat(Enum): @@ -25,6 +27,11 @@ def __init__(self, collector_config, main_save_folder: str) -> None: True # change this value for stopping to save frames to iterable ) + # saving videos and frames might take some time, the ThreadPool can be used + # to excute these processes in a seperate thread + # Normal threading doesn't work due to redis pubsub listener blocking + self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) + def save_frame(self, frame: dict, folder_path: str): img = Image.frombytes("RGB", frame["size"], frame["frame"], "raw") img.save( @@ -113,5 +120,12 @@ def read_frame(self, frame) -> None: frame["time_stamp"] = datetime.strptime( frame["time"], self._date_format ).strftime(self._date_format) - frame["data"] = base64.b64decode(frame["data"]) + frame["frame"] = base64.b64decode(frame["data"]) self.add_to_iterable(frame) + + # override run to correctly shutdown executor + def run(self, command_queue: Queue, response_queue: Queue) -> None: + try: + super().run(command_queue, response_queue) + finally: + self.executor.shutdown(wait=True)