Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 11, 2024
1 parent 2fa8408 commit 3cb32b6
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions feed/_realtime.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import asyncio
import logging
from typing import List

from core.actors import StrategyActor
from core.commands.feed import StartRealtimeFeed
from core.events.ohlcv import NewMarketDataReceived
from core.interfaces.abstract_timeseries import AbstractTimeSeriesService
from core.interfaces.abstract_ws import AbstractWS
from core.models.bar import Bar
from core.models.symbol import Symbol
from core.models.timeframe import Timeframe

Expand Down Expand Up @@ -52,33 +54,48 @@ def __init__(
symbol: Symbol,
timeframe: Timeframe,
ws: AbstractWS,
ts_service: AbstractTimeSeriesService,
ts: AbstractTimeSeriesService,
):
super().__init__(symbol, timeframe)
self.queue = asyncio.Queue()
self.ws = ws
self.task = None
self.ts_service = ts_service

def on_stop(self):
if self.task:
self.task.cancel()

asyncio.create_task(self.ws.unsubscribe(self.symbol, self.timeframe))
self.ts = ts

async def on_receive(self, msg: StartRealtimeFeed):
self.task = asyncio.create_task(self._run_realtime_feed(msg))

async def _run_realtime_feed(self, msg: StartRealtimeFeed):
symbol, timeframe = msg.symbol, msg.timeframe

producer = asyncio.create_task(self._producer(symbol, timeframe))
consumer = asyncio.create_task(self._consumer(symbol, timeframe))

await asyncio.gather(producer, consumer)

async def _producer(self, symbol: Symbol, timeframe: Timeframe):
async with AsyncRealTimeData(self.ws, symbol, timeframe) as stream:
async for bars in stream:
for bar in bars:
if bar.closed:
logger.info(f"{symbol}_{timeframe}:{bar}")
await self.queue.put(bars)

await self.queue.put(None)

async def _consumer(self, symbol: Symbol, timeframe: Timeframe):
while True:
bars = await self.queue.get()

if bars is None:
break

await self._process_bars(symbol, timeframe, bars)

self.queue.task_done()

async def _process_bars(
self, symbol: Symbol, timeframe: Timeframe, bars: List[Bar]
):
for bar in bars:
if bar.closed:
await self.ts.upsert(symbol, timeframe, bar.ohlcv)

await self.ts_service.upsert(symbol, timeframe, bar.ohlcv)
logger.info(f"{symbol}_{timeframe}:{bar}")

await self.tell(
NewMarketDataReceived(symbol, timeframe, bar.ohlcv, bar.closed)
)
await self.tell(
NewMarketDataReceived(symbol, timeframe, bar.ohlcv, bar.closed)
)

0 comments on commit 3cb32b6

Please sign in to comment.