Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix redis issue for savers #22

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion argussight/core/video_processes/savers/stream_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def create_commands_dict(cls) -> Dict[str, Any]:
return {"save": cls.save_queue}

def save_queue(self, save_format: str, personnal_folder: str) -> None:
self.save_iterable(self._queue, save_format, personnal_folder)
queue = self._queue.copy()
self.executor.submit(self.save_iterable, queue, save_format, personnal_folder)

def add_to_iterable(self, frame: Dict) -> None:
self._queue.append(frame)
50 changes: 35 additions & 15 deletions argussight/core/video_processes/savers/video_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,27 @@
import os
import glob
from PIL import Image
import shutil


def remove_start_end(main: str, start: str, end: str) -> str:
if main.startswith(start):
main = main[len(start) :]
if start in main:
main = main.rsplit(start, 1)[1]
if main.endswith(end):
main = main[: -len(end)]
return main


def delete_all_files(folder_path: str) -> None:
files = glob.glob(os.path.join(folder_path, "*"))

for file in files:
if os.path.exists(folder_path):
try:
os.remove(file)
# Remove the entire directory tree
shutil.rmtree(folder_path)
print(f"Deleted {folder_path} and all its contents.")
except Exception as e:
print(f"Failed to delete {file}: {e}")
print(f"Failed to delete {folder_path}: {e}")
else:
print(f"The folder {folder_path} does not exist.")


class Recorder(VideoSaver):
Expand All @@ -33,10 +36,13 @@ def __init__(
) -> None:
super().__init__(collector_config, main_save_folder)
self._recording = False
self._temp_folder = temp_folder
self._temp_counter = 0

# Make sure that there are no files in the temp folder from old recording failures
delete_all_files(self._temp_folder)
delete_all_files(temp_folder)
os.makedirs(temp_folder)

self._temp_folder = os.path.join(temp_folder, f"{self._temp_counter}")

@classmethod
def create_commands_dict(cls) -> Dict[str, Any]:
Expand All @@ -53,8 +59,7 @@ def add_to_iterable(self, frame: Dict) -> None:
def get_frame_from_element(
self, element: Any
) -> Tuple[Tuple[int, int], bytes, str]:
img_fpath = os.path.join(self._temp_folder, element)
frame = Image.open(img_fpath, "r")
frame = Image.open(element, "r")
raw_data = frame.convert("RGB").tobytes()

return frame.size, raw_data, remove_start_end(element, "img", ".jpg")
Expand All @@ -69,11 +74,26 @@ def stop_record(self, save_format, personnal_folder) -> None:
raise ProcessError("There is no recording to stop")

image_names = [
os.path.basename(image)
os.path.join(self._temp_folder, os.path.basename(image))
for image in glob.glob(os.path.join(self._temp_folder, "*jpg"))
]
print(image_names)
self.save_iterable(image_names, save_format, personnal_folder)

delete_all_files(self._temp_folder)
# create a process to make a video from the recorded frames
self.executor.submit(
self._stop_record,
image_names,
save_format,
personnal_folder,
self._temp_folder,
)

# go to next recording folder
self._temp_folder = self._temp_folder.rsplit("/")[0]
self._temp_counter += 1
self._temp_folder = os.path.join(self._temp_folder, f"{self._temp_counter}")

self._recording = False

def _stop_record(self, images_names, save_format, personnal_folder, temp_folder):
self.save_iterable(images_names, save_format, personnal_folder)
delete_all_files(temp_folder)
16 changes: 15 additions & 1 deletion argussight/core/video_processes/savers/video_saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import numpy as np
import cv2
import os
from multiprocessing import Queue
import concurrent.futures


class SaveFormat(Enum):
Expand All @@ -25,6 +27,11 @@ def __init__(self, collector_config, main_save_folder: str) -> None:
True # change this value for stopping to save frames to iterable
)

# saving videos and frames might take some time, the ThreadPool can be used
# to excute these processes in a seperate thread
# Normal threading doesn't work due to redis pubsub listener blocking
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

def save_frame(self, frame: dict, folder_path: str):
img = Image.frombytes("RGB", frame["size"], frame["frame"], "raw")
img.save(
Expand Down Expand Up @@ -113,5 +120,12 @@ def read_frame(self, frame) -> None:
frame["time_stamp"] = datetime.strptime(
frame["time"], self._date_format
).strftime(self._date_format)
frame["data"] = base64.b64decode(frame["data"])
frame["frame"] = base64.b64decode(frame["data"])
self.add_to_iterable(frame)

# override run to correctly shutdown executor
def run(self, command_queue: Queue, response_queue: Queue) -> None:
try:
super().run(command_queue, response_queue)
finally:
self.executor.shutdown(wait=True)