diff --git a/redis_streamer/core.py b/redis_streamer/core.py index 9380b7a..6851529 100644 --- a/redis_streamer/core.py +++ b/redis_streamer/core.py @@ -2,6 +2,7 @@ import os import time import asyncio +from concurrent.futures import ProcessPoolExecutor from redis import asyncio as aioredis from redis_streamer import utils @@ -14,6 +15,8 @@ async def init(self): print("Connecting to", url, '...') self.r = await aioredis.from_url(url=url, max_connections=max_connections) print("Connected?", await self.r.ping()) + self.pool = ProcessPoolExecutor() + ctx = Context() META_PREFIX = 'XMETA' diff --git a/redis_streamer/graphql_schema/recorder.py b/redis_streamer/graphql_schema/recorder.py index 8f4557e..73dcf96 100644 --- a/redis_streamer/graphql_schema/recorder.py +++ b/redis_streamer/graphql_schema/recorder.py @@ -1,13 +1,18 @@ from __future__ import annotations import typing import asyncio +from fastapi import Request # import ray import strawberry from strawberry.scalars import JSON, Base64 from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper from redis_streamer.config import * -from redis_streamer.models import session, RecordingModel, get_recording, create_recording, end_recording, rename_recording, delete_recording -from .. import recorder +from redis_streamer.models import ( + session, RecordingModel, + get_recording, get_last_recording, + create_recording, end_recording, rename_recording, + delete_recording, delete_all_recordings) +from ..recorder import RecordingWriter strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper() @@ -32,39 +37,50 @@ def recording(self, name: strawberry.ID) -> Recording: @strawberry.field async def current_recording(self) -> str|None: - return await writer.current_recording() + return await RecordingWriter.current_recording() # writer = recorder.RecordingWriter.remote() -writer = recorder.RecordingWriter() +# Writer = recorder.RecordingWriter @strawberry.type class RecordingMutation: @strawberry.mutation async def start_recording(self, name: str='') -> JSON: name = create_recording(name) - await writer.start(name) + await RecordingWriter.start(name) return get_recording(name).as_dict() @strawberry.mutation async def stop_recording(self) -> JSON: # name = ray.get(writer.get_current_recording_name.remote()) - name = await writer.current_recording() - print(name) + name = await RecordingWriter.current_recording() if not name: raise RuntimeError("No recording running") - print('before stop') - await writer.stop() + print('stop', name) + await RecordingWriter.stop() print('after stop') end_recording(name) return get_recording(name).as_dict() @strawberry.mutation - async def rename_recording(self, name: str, new_name: str) -> JSON: - rename_recording(name, new_name) - return get_recording(new_name).as_dict() + async def rename_recording(self, name: str, previous_name: str='') -> JSON: + if not previous_name: + previous_name = get_last_recording().name + + print('rename', previous_name, '->', name) + rename_recording(previous_name, name) + return get_recording(name).as_dict() @strawberry.mutation async def delete_recording(self, name: str) -> None: + print('delete', name) delete_recording(name) - return \ No newline at end of file + return + + @strawberry.mutation + async def delete_all_recordings(self) -> None: + delete_all_recordings(confirm=True) + return + + \ No newline at end of file diff --git a/redis_streamer/models/recording.py b/redis_streamer/models/recording.py index a1d0905..1390902 100644 --- a/redis_streamer/models/recording.py +++ b/redis_streamer/models/recording.py @@ -1,5 +1,6 @@ import datetime from sqlalchemy import Column, Integer, String, ForeignKey, DateTime +from sqlalchemy.sql import func from . import Base, session class RecordingModel(Base): @@ -8,6 +9,7 @@ class RecordingModel(Base): start_time: DateTime = Column(DateTime, nullable=True) end_time: DateTime = Column(DateTime, nullable=True) device_name: str = Column(String, nullable=True) + alias_name: str = Column(String, nullable=True) def as_dict(self): start = self.start_time @@ -35,12 +37,29 @@ def end_recording(name): def get_recording(name): return session.query(RecordingModel).get(name) +def get_last_recording(): + return session.query(RecordingModel).order_by(RecordingModel.start_time.desc()).first() + +def free_up_name(name): + new_name_q = RecordingModel.name + " " + func.to_char(RecordingModel.start_time, "%Y-%m-%dT%H-%M-%S") + named = session.query([RecordingModel.name, new_name_q]).filter(RecordingModel.name == name).all() + session.query(RecordingModel).filter(RecordingModel.name == name).update({ + 'name': new_name_q, + 'alias_name': RecordingModel.name, + }) + session.commit() + return named + def rename_recording(old_name, new_name): - rec = session.query(RecordingModel).filter(RecordingModel.name == old_name).update({'name': new_name}) - # rec.name = new_name + session.query(RecordingModel).filter(RecordingModel.name == old_name).update({'name': new_name}) session.commit() def delete_recording(name): session.query(RecordingModel).filter(RecordingModel.name == name).delete() session.commit() +def delete_all_recordings(*, confirm=False): + if not confirm: + raise RuntimeError("you must pass confirm=True") + session.query(RecordingModel).delete() + session.commit() diff --git a/redis_streamer/recorder.py b/redis_streamer/recorder.py index 3e7c6a7..45f2469 100644 --- a/redis_streamer/recorder.py +++ b/redis_streamer/recorder.py @@ -3,7 +3,6 @@ import asyncio # import ray from multiprocessing import Event -from concurrent.futures import ProcessPoolExecutor import datetime from .core import ctx, Agent from .models import session, RecordingModel, create_recording, end_recording @@ -16,21 +15,24 @@ class RecordingWriter: def __init__(self) -> None: - self.pool = ProcessPoolExecutor() + pass #self.pool = ProcessPoolExecutor() - async def current_recording(self): - return (await ctx.r.get(RECORDING_NAME) or b'').decode('utf-8') or None + @classmethod + async def current_recording(cls, key=RECORDING_NAME): + return (await ctx.r.get(key) or b'').decode('utf-8') or None - async def start(self, name): - current_name = await self.current_recording() + @classmethod + async def start(cls, name): + current_name = await cls.current_recording() if current_name: raise RuntimeError(f"already recording {current_name}") await ctx.r.set(RECORDING_NAME, name) - self.pool.submit(self._record, name, RECORDING_NAME) + ctx.pool.submit(cls._record, name, RECORDING_NAME) print("submitteds") - async def stop(self): - current_name = await self.current_recording() + @classmethod + async def stop(cls): + current_name = await cls.current_recording() print('stop') if not current_name: raise RuntimeError("not recording") diff --git a/redis_streamer/sample.gql b/redis_streamer/sample.gql new file mode 100644 index 0000000..04b3e4b --- /dev/null +++ b/redis_streamer/sample.gql @@ -0,0 +1,28 @@ +query ls { + recordings { + name + startTime + endTime + } + currentRecording +} + +mutation start { + startRecording +} + +mutation stop { + stopRecording +} + +mutation rename { + renameRecording(name: "aaaa") +} + +mutation delete { + deleteRecording(name: "aaaa") +} + +mutation delete_all { + deleteAllRecordings +} \ No newline at end of file