Skip to content

Commit

Permalink
got basic queries working right
Browse files Browse the repository at this point in the history
  • Loading branch information
beasteers committed Apr 26, 2023
1 parent df235b7 commit 06b594c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 24 deletions.
3 changes: 3 additions & 0 deletions redis_streamer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
from redis import asyncio as aioredis

from redis_streamer import utils
Expand All @@ -14,6 +15,8 @@ async def init(self):
print("Connecting to", url, '...')
self.r = await aioredis.from_url(url=url, max_connections=max_connections)
print("Connected?", await self.r.ping())
self.pool = ProcessPoolExecutor()

ctx = Context()

META_PREFIX = 'XMETA'
Expand Down
42 changes: 29 additions & 13 deletions redis_streamer/graphql_schema/recorder.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
from __future__ import annotations
import typing
import asyncio
from fastapi import Request
# import ray
import strawberry
from strawberry.scalars import JSON, Base64
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper
from redis_streamer.config import *
from redis_streamer.models import session, RecordingModel, get_recording, create_recording, end_recording, rename_recording, delete_recording
from .. import recorder
from redis_streamer.models import (
session, RecordingModel,
get_recording, get_last_recording,
create_recording, end_recording, rename_recording,
delete_recording, delete_all_recordings)
from ..recorder import RecordingWriter

strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper()

Expand All @@ -32,39 +37,50 @@ def recording(self, name: strawberry.ID) -> Recording:

@strawberry.field
async def current_recording(self) -> str|None:
return await writer.current_recording()
return await RecordingWriter.current_recording()


# writer = recorder.RecordingWriter.remote()
writer = recorder.RecordingWriter()
# Writer = recorder.RecordingWriter

@strawberry.type
class RecordingMutation:
@strawberry.mutation
async def start_recording(self, name: str='') -> JSON:
name = create_recording(name)
await writer.start(name)
await RecordingWriter.start(name)
return get_recording(name).as_dict()

@strawberry.mutation
async def stop_recording(self) -> JSON:
# name = ray.get(writer.get_current_recording_name.remote())
name = await writer.current_recording()
print(name)
name = await RecordingWriter.current_recording()
if not name:
raise RuntimeError("No recording running")
print('before stop')
await writer.stop()
print('stop', name)
await RecordingWriter.stop()
print('after stop')
end_recording(name)
return get_recording(name).as_dict()

@strawberry.mutation
async def rename_recording(self, name: str, new_name: str) -> JSON:
rename_recording(name, new_name)
return get_recording(new_name).as_dict()
async def rename_recording(self, name: str, previous_name: str='') -> JSON:
if not previous_name:
previous_name = get_last_recording().name

print('rename', previous_name, '->', name)
rename_recording(previous_name, name)
return get_recording(name).as_dict()

@strawberry.mutation
async def delete_recording(self, name: str) -> None:
print('delete', name)
delete_recording(name)
return
return

@strawberry.mutation
async def delete_all_recordings(self) -> None:
delete_all_recordings(confirm=True)
return


23 changes: 21 additions & 2 deletions redis_streamer/models/recording.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
from sqlalchemy.sql import func
from . import Base, session

class RecordingModel(Base):
Expand All @@ -8,6 +9,7 @@ class RecordingModel(Base):
start_time: DateTime = Column(DateTime, nullable=True)
end_time: DateTime = Column(DateTime, nullable=True)
device_name: str = Column(String, nullable=True)
alias_name: str = Column(String, nullable=True)

def as_dict(self):
start = self.start_time
Expand Down Expand Up @@ -35,12 +37,29 @@ def end_recording(name):
def get_recording(name):
return session.query(RecordingModel).get(name)

def get_last_recording():
return session.query(RecordingModel).order_by(RecordingModel.start_time.desc()).first()

def free_up_name(name):
new_name_q = RecordingModel.name + " " + func.to_char(RecordingModel.start_time, "%Y-%m-%dT%H-%M-%S")
named = session.query([RecordingModel.name, new_name_q]).filter(RecordingModel.name == name).all()
session.query(RecordingModel).filter(RecordingModel.name == name).update({
'name': new_name_q,
'alias_name': RecordingModel.name,
})
session.commit()
return named

def rename_recording(old_name, new_name):
rec = session.query(RecordingModel).filter(RecordingModel.name == old_name).update({'name': new_name})
# rec.name = new_name
session.query(RecordingModel).filter(RecordingModel.name == old_name).update({'name': new_name})
session.commit()

def delete_recording(name):
session.query(RecordingModel).filter(RecordingModel.name == name).delete()
session.commit()

def delete_all_recordings(*, confirm=False):
if not confirm:
raise RuntimeError("you must pass confirm=True")
session.query(RecordingModel).delete()
session.commit()
20 changes: 11 additions & 9 deletions redis_streamer/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
# import ray
from multiprocessing import Event
from concurrent.futures import ProcessPoolExecutor
import datetime
from .core import ctx, Agent
from .models import session, RecordingModel, create_recording, end_recording
Expand All @@ -16,21 +15,24 @@

class RecordingWriter:
def __init__(self) -> None:
self.pool = ProcessPoolExecutor()
pass #self.pool = ProcessPoolExecutor()

async def current_recording(self):
return (await ctx.r.get(RECORDING_NAME) or b'').decode('utf-8') or None
@classmethod
async def current_recording(cls, key=RECORDING_NAME):
return (await ctx.r.get(key) or b'').decode('utf-8') or None

async def start(self, name):
current_name = await self.current_recording()
@classmethod
async def start(cls, name):
current_name = await cls.current_recording()
if current_name:
raise RuntimeError(f"already recording {current_name}")
await ctx.r.set(RECORDING_NAME, name)
self.pool.submit(self._record, name, RECORDING_NAME)
ctx.pool.submit(cls._record, name, RECORDING_NAME)
print("submitteds")

async def stop(self):
current_name = await self.current_recording()
@classmethod
async def stop(cls):
current_name = await cls.current_recording()
print('stop')
if not current_name:
raise RuntimeError("not recording")
Expand Down
28 changes: 28 additions & 0 deletions redis_streamer/sample.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
query ls {
recordings {
name
startTime
endTime
}
currentRecording
}

mutation start {
startRecording
}

mutation stop {
stopRecording
}

mutation rename {
renameRecording(name: "aaaa")
}

mutation delete {
deleteRecording(name: "aaaa")
}

mutation delete_all {
deleteAllRecordings
}

0 comments on commit 06b594c

Please sign in to comment.