From 3cb32b624f40584a318c16bac88195156aa0f9e7 Mon Sep 17 00:00:00 2001 From: m5l14i11 Date: Wed, 11 Sep 2024 23:17:51 +0300 Subject: [PATCH] upd --- feed/_realtime.py | 55 +++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/feed/_realtime.py b/feed/_realtime.py index 5ad4ce4a..c0dd36c3 100644 --- a/feed/_realtime.py +++ b/feed/_realtime.py @@ -1,11 +1,13 @@ import asyncio import logging +from typing import List from core.actors import StrategyActor from core.commands.feed import StartRealtimeFeed from core.events.ohlcv import NewMarketDataReceived from core.interfaces.abstract_timeseries import AbstractTimeSeriesService from core.interfaces.abstract_ws import AbstractWS +from core.models.bar import Bar from core.models.symbol import Symbol from core.models.timeframe import Timeframe @@ -52,33 +54,48 @@ def __init__( symbol: Symbol, timeframe: Timeframe, ws: AbstractWS, - ts_service: AbstractTimeSeriesService, + ts: AbstractTimeSeriesService, ): super().__init__(symbol, timeframe) + self.queue = asyncio.Queue() self.ws = ws - self.task = None - self.ts_service = ts_service - - def on_stop(self): - if self.task: - self.task.cancel() - - asyncio.create_task(self.ws.unsubscribe(self.symbol, self.timeframe)) + self.ts = ts async def on_receive(self, msg: StartRealtimeFeed): - self.task = asyncio.create_task(self._run_realtime_feed(msg)) - - async def _run_realtime_feed(self, msg: StartRealtimeFeed): symbol, timeframe = msg.symbol, msg.timeframe + producer = asyncio.create_task(self._producer(symbol, timeframe)) + consumer = asyncio.create_task(self._consumer(symbol, timeframe)) + + await asyncio.gather(producer, consumer) + + async def _producer(self, symbol: Symbol, timeframe: Timeframe): async with AsyncRealTimeData(self.ws, symbol, timeframe) as stream: async for bars in stream: - for bar in bars: - if bar.closed: - logger.info(f"{symbol}_{timeframe}:{bar}") + await self.queue.put(bars) + + await self.queue.put(None) + + async def _consumer(self, symbol: Symbol, timeframe: Timeframe): + while True: + bars = await self.queue.get() + + if bars is None: + break + + await self._process_bars(symbol, timeframe, bars) + + self.queue.task_done() + + async def _process_bars( + self, symbol: Symbol, timeframe: Timeframe, bars: List[Bar] + ): + for bar in bars: + if bar.closed: + await self.ts.upsert(symbol, timeframe, bar.ohlcv) - await self.ts_service.upsert(symbol, timeframe, bar.ohlcv) + logger.info(f"{symbol}_{timeframe}:{bar}") - await self.tell( - NewMarketDataReceived(symbol, timeframe, bar.ohlcv, bar.closed) - ) + await self.tell( + NewMarketDataReceived(symbol, timeframe, bar.ohlcv, bar.closed) + )