Skip to content

Commit

Permalink
messing around with triggering the job
Browse files Browse the repository at this point in the history
  • Loading branch information
beasteers committed Apr 25, 2023
1 parent e9f1f8f commit df235b7
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 69 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ shhhh/
# app data

app_data/
db.sqlite

# C extensions
*.so
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN pip install -r requirements.txt && rm -rf ~/.cache/pip /var/cache/apt/
ENV PYTHONPATH "${PYTHONPATH}:/src"

ADD ./redis_streamer /src/redis_streamer
RUN mkdir -p ./data

# ENTRYPOINT [ "uvicorn" ]
# CMD ["redis_streamer.main:app", "--host", "0.0.0.0", "--reload"]
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ services:
- 8000:8000
volumes:
- ./redis_streamer:/src/redis_streamer
- ./data:/src/redis_streamer/data
environment:
REDIS_URL: redis://redis:6379
# DISABLE_MULTI_DEVICE_PREFIXING: "1"
PYTHONUNBUFFERED: "1"
depends_on:
- redis
restart: unless-stopped
Expand Down
9 changes: 5 additions & 4 deletions redis_streamer/graphql_schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from ..core import ctx
from . import devices
from . import streams
from . import recorder
from ..config import ENABLE_MULTI_DEVICE_PREFIXING

if ENABLE_MULTI_DEVICE_PREFIXING:
_Query = type('_Query', (devices.Devices, streams.Streams), {})
_Mutation = type('_Mutation', (streams.StreamMutation, devices.DeviceMutation), {})
_Query = type('_Query', (devices.Devices, streams.Streams, recorder.Recordings), {})
_Mutation = type('_Mutation', (streams.StreamMutation, devices.DeviceMutation, recorder.RecordingMutation), {})
else:
_Query = type('_Query', (streams.Streams,), {})
_Mutation = type('_Mutation', (streams.StreamMutation,), {})
_Query = type('_Query', (streams.Streams, recorder.Recordings), {})
_Mutation = type('_Mutation', (streams.StreamMutation, recorder.RecordingMutation), {})

@strawberry.type
class Query(_Query):
Expand Down
58 changes: 41 additions & 17 deletions redis_streamer/graphql_schema/recorder.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,70 @@
from __future__ import annotations
import typing
import base64

import asyncio
# import ray
import strawberry
from strawberry.scalars import JSON, Base64
from strawberry_sqlalchemy_mapper import strawberry_dataclass_from_model
import orjson
from redis_streamer import utils, ctx
from . import streams
from strawberry_sqlalchemy_mapper import StrawberrySQLAlchemyMapper
from redis_streamer.config import *
from redis_streamer.models import session, RecordingModel
from redis_streamer.models import session, RecordingModel, get_recording, create_recording, end_recording, rename_recording, delete_recording
from .. import recorder

strawberry_sqlalchemy_mapper = StrawberrySQLAlchemyMapper()

# ---------------------------------------------------------------------------- #
# Queries #
# ---------------------------------------------------------------------------- #

@strawberry.type
@strawberry_dataclass_from_model(RecordingModel)
@strawberry_sqlalchemy_mapper.type(RecordingModel)
class Recording:
pass

@strawberry.type
class RecordingsQuery:
class Recordings:
@strawberry.field
def recordings(self) -> typing.List[Recording]:
return session.query(RecordingModel).all()

@strawberry.field
def recording(self, id: strawberry.ID) -> Recording:
return session.query(RecordingModel).get(id)
def recording(self, name: strawberry.ID) -> Recording:
return session.query(RecordingModel).get(name)

@strawberry.field
async def current_recording(self) -> str|None:
return await writer.current_recording()


# writer = recorder.RecordingWriter.remote()
writer = recorder.RecordingWriter()

@strawberry.type
class RecordingMutation:
@strawberry.mutation
async def start(self, device_id: str, meta: JSON) -> JSON:
return
async def start_recording(self, name: str='') -> JSON:
name = create_recording(name)
await writer.start(name)
return get_recording(name).as_dict()

@strawberry.mutation
async def stop_recording(self) -> JSON:
# name = ray.get(writer.get_current_recording_name.remote())
name = await writer.current_recording()
print(name)
if not name:
raise RuntimeError("No recording running")
print('before stop')
await writer.stop()
print('after stop')
end_recording(name)
return get_recording(name).as_dict()

@strawberry.mutation
async def stop(self, device_id: str, meta: JSON) -> JSON:
return
async def rename_recording(self, name: str, new_name: str) -> JSON:
rename_recording(name, new_name)
return get_recording(new_name).as_dict()

@strawberry.mutation
async def rename(self, device_id: str) -> JSON:
return await disconnect_device(device_id)
async def delete_recording(self, name: str) -> None:
delete_recording(name)
return
7 changes: 5 additions & 2 deletions redis_streamer/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
from sqlalchemy.orm import scoped_session, sessionmaker


engine = create_engine(os.getenv("DB_CONNECTION") or "sqlite:///")
engine = create_engine(os.getenv("DB_CONNECTION") or "sqlite:///data/db.sqlite")
session = scoped_session(sessionmaker(
autocommit=False, autoflush=False, bind=engine))


Base = declarative_base()
from .recording import RecordingModel
from .recording import *


Base.metadata.create_all(engine)
45 changes: 36 additions & 9 deletions redis_streamer/models/recording.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,46 @@
import datetime
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
from . import Base
from . import Base, session

class RecordingModel(Base):
__tablename__ = "recordings"
id: int = Column(Integer, primary_key=True, index=True)
name: str = Column(String, nullable=True)
start_time: DateTime = Column(DateTime, nullable=False)
end_time: DateTime = Column(DateTime, nullable=False)
name: str = Column(String, primary_key=True, index=True)
start_time: DateTime = Column(DateTime, nullable=True)
end_time: DateTime = Column(DateTime, nullable=True)
device_name: str = Column(String, nullable=True)

def as_dict(self):
start = self.start_time
end = self.end_time
return {
"id": self.id,
"name": self.name,
"start_time": self.start_time,
"end_time": self.end_time,
"start_time": start.isoformat() if start else None,
"end_time": end.isoformat() if end else None,
"device_name": self.device_name,
}
}



def create_recording(name=''):
    """Insert a new recording row and return its name.

    When *name* is empty, a name is derived from the start timestamp
    (e.g. ``2023-04-25T12-00-00``), which also serves as the primary key.
    """
    started = datetime.datetime.now()
    if not name:
        name = started.strftime('%Y-%m-%dT%H-%M-%S')
    session.add(RecordingModel(name=name, start_time=started))
    session.commit()
    return name

def end_recording(name):
    """Stamp the named recording's ``end_time`` with the current time."""
    matched = session.query(RecordingModel).filter(RecordingModel.name == name)
    matched.update({'end_time': datetime.datetime.now()})
    session.commit()

def get_recording(name):
    """Look up a recording row by its primary-key name (None if absent)."""
    rec = session.query(RecordingModel).get(name)
    return rec

def rename_recording(old_name, new_name):
rec = session.query(RecordingModel).filter(RecordingModel.name == old_name).update({'name': new_name})
# rec.name = new_name
session.commit()

def delete_recording(name):
    """Delete the recording row matching *name* (no-op when none matches)."""
    matched = session.query(RecordingModel).filter(RecordingModel.name == name)
    matched.delete()
    session.commit()

121 changes: 85 additions & 36 deletions redis_streamer/recorder.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,94 @@
import os
import tqdm
import asyncio
import ray
# import ray
from multiprocessing import Event
from concurrent.futures import ProcessPoolExecutor
import datetime
from .core import ctx, Agent
from .models import session, RecordingModel
from .models import session, RecordingModel, create_recording, end_recording
from .graphql_schema.streams import get_stream_ids

ray.init()


@ray.remote
class Recorder:
def __init__(self):
pass

async def record(self, name, prefix='', last_entry_id="$", batch=1, block=5000):
self.stopped = False

session.add(RecordingModel(name=name, start_time=datetime.datetime.now()))
session.commit()
rec_entry = session.query(RecordingModel).filter(RecordingModel.name == name).order_by(RecordingModel.start_time).last()

agent = Agent()
stream_ids = await get_stream_ids()
cursor = agent.init_cursor({s: last_entry_id for s in stream_ids})
try:
while not self.stopped:
# read data from redis
results, cursor = await agent.read(cursor, latest=False, count=batch or 1, block=block)
for sid, xs in results:
for ts, data in xs:
await writer[sid].write(data, ts)
finally:
rec_entry.end_time = datetime.datetime.now()
session.commit()

def stop(self):
self.stopped = True

def replay(self):
pass
# ray.init()

# Redis key namespace for the recorder's shared state.
PREFIX = ':recording:current'
# Redis key holding the name of the in-progress recording (absent when idle).
RECORDING_NAME = f'{PREFIX}:name'

class RecordingWriter:
    """Coordinates a background recording worker via a shared Redis key.

    The active recording's name is stored under ``RECORDING_NAME`` in Redis.
    A subprocess (see :meth:`_record`) keeps running for as long as that key
    still holds the name it was started with, so deleting the key (via
    :meth:`stop`) is the stop signal.

    NOTE(review): the subprocess reuses ``ctx.r`` created in the parent —
    confirm the Redis client is fork-safe / re-created after fork.
    """

    def __init__(self) -> None:
        # Pool used to run the blocking record loop outside the server process.
        self.pool = ProcessPoolExecutor()

    async def current_recording(self):
        """Return the active recording's name, or None when idle."""
        return (await ctx.r.get(RECORDING_NAME) or b'').decode('utf-8') or None

    async def start(self, name):
        """Mark *name* as the active recording and launch the worker.

        Raises:
            RuntimeError: if a recording is already in progress.
        """
        current_name = await self.current_recording()
        if current_name:
            raise RuntimeError(f"already recording {current_name}")
        await ctx.r.set(RECORDING_NAME, name)
        self.pool.submit(self._record, name, RECORDING_NAME)

    async def stop(self):
        """Clear the active-recording key, signalling the worker to exit.

        Raises:
            RuntimeError: if nothing is currently being recorded.
        """
        current_name = await self.current_recording()
        if not current_name:
            raise RuntimeError("not recording")
        await ctx.r.delete(RECORDING_NAME)

    @classmethod
    def _record(cls, name: str, key: str):
        # Subprocess entry point: a fresh process has no running event loop,
        # so drive the async record loop with asyncio.run().
        return asyncio.run(cls._record_async(name, key))

    @classmethod
    async def _record_async(cls, name: str, key: str):
        # Placeholder record loop: spin until the Redis key no longer holds
        # our name (stop() deleted it, or another recording replaced it).
        # Redis returns bytes, so compare against the encoded name.
        name = name.encode()
        while await ctx.r.get(key) == name:
            await asyncio.sleep(1)



# @ray.remote
# class RecordingWriter:
# def __init__(self):
# self.current_recording = None
# self.last_recording = None

# def get_current_recording_name(self):
# return self.current_recording

# async def record(self, name, prefix='', last_entry_id="$", batch=10):
# self.current_recording = name
# self.stopped = False
# while not self.stopped:
# print(f'recording {self.current_recording} :)')
# await asyncio.sleep(1)
# print("done")

# # agent = Agent()
# # cursor = agent.init_cursor({s: last_entry_id for s in await get_stream_ids(prefix=prefix)})
# # try:
# # while not self.stopped:
# # # read data from redis
# # results, cursor = await agent.read(cursor, count=batch or 1)
# # for sid, xs in results:
# # for ts, data in xs:
# # await writer.write(sid, data, ts)
# # finally:
# # pass

# def stop(self):
# self.stopped = True
# self.last_recording = self.current_recording
# self.current_recording = False

# def replay(self):
# pass


class Writers:
Expand Down Expand Up @@ -70,6 +118,7 @@ async def __aexit__(self, *a):
self.is_entered = False


MB = 1024*1024

class RawWriter:
raw=True
Expand Down
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ websockets
redis
orjson
gunicorn
uvicorn
uvicorn
strawberry_sqlalchemy_mapper
ray
tqdm

0 comments on commit df235b7

Please sign in to comment.