diff --git a/redis_streamer/agents/__init__.py b/redis_streamer/agents/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/redis_streamer/agents/record.py b/redis_streamer/agents/record.py deleted file mode 100644 index 27ba184..0000000 --- a/redis_streamer/agents/record.py +++ /dev/null @@ -1,256 +0,0 @@ -# Class to record all desired redis activity - -import base64 -import os -import re -import time -import json -import tqdm -import codecs -import redis - -from mcap.writer import Writer -from redis.client import Monitor as Monitor_ - -from redis_streamer import Agent, ctx, utils - - -class Monitor(Monitor_): - monitor_re = re.compile(rb"\[(\d+) (.*?)\] (.*)") - command_re = re.compile(rb'"(.*?(? stream_refresh): - keys = {x.decode('utf-8') async for x in ctx.r.scan_iter(match=stream_ids or None, _type='stream')} - for k in keys - set(cursor): - cursor[k] = '$' - t0s = t1s - - # read data from redis and write to file - results, cursor = await agent.read(cursor, latest=False, count=1, block=5000) - for sid, xs in results: - rec.open_writer(record_name, sid) - for t, x in xs: - pbar.set_description(f'{sid} {t}') - rec.write(utils.parse_epoch_time(t), sid, **{k.decode(): base64.b64encode(v).decode() for k, v in x.items()}) - pbar.update() - - - -def watch(): - with redis.Redis(host='localhost', port=6379, db=0) as r: - with Monitor(r.connection_pool) as m: - for data in m.listen(): - cmd_items = data['args'] - print(*[x[:30] for x in cmd_items], max(len(x) for x in cmd_items)) - - -def read(fname): - from mcap.reader import make_reader - with open(fname, "rb") as f: - reader = make_reader(f) - print(reader.get_header()) - # summary = reader.get_summary() - # print(summary) - for schema, channel, message in reader.iter_messages(): - print(f"{channel.topic} ({schema.name}): {message.data}") - - -if __name__ == '__main__': - import fire - fire.Fire({ - "monitor": record_monitor, - "streams": record_streams, - 'watch': watch, - "read": read, - }) \ No newline at end of file diff --git a/redis_streamer/agents/replay.py b/redis_streamer/agents/replay.py deleted file mode 100644 index d63ad2b..0000000 --- a/redis_streamer/agents/replay.py +++ /dev/null @@ -1,45 +0,0 @@ -import time -import json -import tqdm -import base64 -import redis -from mcap.reader import make_reader - - -def read(fname): - with open(fname, "rb") as f: - reader = make_reader(f) - print(reader.get_header()) - for schema, channel, message in reader.iter_messages(): - print(f"{channel.topic} ({schema.name}): {message.data}") - - -def replay(fname, host='localhost', port=6379, db=0, realtime=True, speed_fudge=1): - r = redis.Redis(host=host, port=port, db=db) - - pbar = tqdm.tqdm() - t0 = 0 - with open(fname, "rb") as f: - reader = make_reader(f) - print(reader.get_header()) - for schema, channel, message in reader.iter_messages(): - args = json.loads(message.data)['cmd'] - args = [base64.b64decode(x.encode()) for x in args] - t1 = message.publish_time*10e-9 * 10e-3 # for some reason it's -12 not -9?? - - # real-time - if realtime: - delay = max(t1 - t0, 0) - if delay > 1: - pbar.set_description(f'sleeping for {delay}s {t1} {args[0]} - {t0}') - if t0 and delay: - time.sleep(delay/speed_fudge) # FIXME recalc speed accounting for lost time. /speed_fudge is a quick fix. - t0 = t1 - - pbar.update() - pbar.set_description(f'{t1:.3f} {b" ".join(args[:2])}') - r.execute_command(*args) - -if __name__ == '__main__': - import fire - fire.Fire(replay) \ No newline at end of file diff --git a/redis_streamer/cli.py b/redis_streamer/cli.py index ed88f4c..712e3b1 100644 --- a/redis_streamer/cli.py +++ b/redis_streamer/cli.py @@ -66,21 +66,21 @@ def ls(self, *fields): } }''', {'fields': fields}) - def record(self, name): - return self.cmd('set', 'RECORD:NAME', name) + # def record(self, name): + # return self.cmd('set', 'RECORD:NAME', name) - def stop_record(self): - return self.cmd('del', 'RECORD:NAME') + # def stop_record(self): + # return self.cmd('del', 'RECORD:NAME') - def xrecord(self, name): - return self.cmd('XADD', 'XRECORD:NAME', "MAXLEN", "~", 100, '*', 'd', '"asdf"') + # def xrecord(self, name): + # return self.cmd('XADD', 'XRECORD:NAME', "MAXLEN", "~", 100, '*', 'd', name) - def stop_xrecord(self): - return self.cmd('XADD', 'XRECORD:NAME', "MAXLEN", "~", 100, '*', 'd', '') + # def stop_xrecord(self): + # return self.cmd('XADD', 'XRECORD:NAME', "MAXLEN", "~", 100, '*', 'd', '') def cmd(self, *cmd): print(cmd)#/{shlex.join(map(str, cmd))} - return self.sess.put(self.asurl('cmd'), json={'cmd': cmd}).text + return self.sess.put(self.asurl('redis'), json={'cmd': cmd}).text def last(self, key): return self.cmd('xrevrange', key, '+', '-', 'COUNT', '1') @@ -109,24 +109,25 @@ async def pull_raw(self, sid, **kw): # ---------------------------------------------------------------------------- # @async2sync - async def push_image(self, sid, shape=(100, 100, 3), fps=100, **kw): + async def push_image(self, sid, size=700, shape=None, fps=None, **kw): + # import pyinstrument + # p=pyinstrument.Profiler() + # try: + # with p: t0 = time.time() - + shape = shape or (size, size, 3) + print(f"Pushing image with shape: {shape}") + im = np.random.randint(0, 255, size=shape).astype('uint8') + im = format_image(im) async with self.push_connect_async(sid, **kw) as ws: while True: - # im = np.random.randint(0, 255, size=shape).astype('uint8') - im = np.zeros(shape, dtype=np.uint8) - im = format_image(im) await ws.send_data(im) - # print(im) - # print(len(im)) - # print(im[:30], im[-30:]) - # print(len(im)) - # input() if fps: t1 = time.time() await asyncio.sleep(max(0, 1/fps-(t1 - t0))) t0 = t1 + # finally: + # p.print() @async2sync async def pull_image(self, sid, **kw): @@ -149,9 +150,6 @@ async def pull_video(self, sid, fps=30, **kw): header, entries = await ws.recv_data() entries = unpack_entries(header, entries) for sid, t, data in entries: - # data = data.decode('unicode_escape').encode() - # data = data.replace(b'\\\\', b'\\') - print(sid, t, len(data)) im = load_image(data) if s is None: s = sv.VideoSink(f'{sid}.mp4', video_info=sv.VideoInfo(width=im.shape[1], height=im.shape[0], fps=fps))