From 8d229d66ac0db7fe666ece28383dff91b5beab29 Mon Sep 17 00:00:00 2001 From: m5l14i11 Date: Thu, 12 Sep 2024 00:33:35 +0300 Subject: [PATCH] upd --- feed/_historical.py | 6 ++---- feed/_realtime.py | 35 ++++++++++++++++++++--------------- system/trading.py | 24 ++++++++++++------------ 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/feed/_historical.py b/feed/_historical.py index dad26348..c5bc4d27 100644 --- a/feed/_historical.py +++ b/feed/_historical.py @@ -92,12 +92,10 @@ async def on_receive(self, msg: StartHistoricalFeed): await asyncio.gather(producer, consumer) async def _producer(self, msg: StartHistoricalFeed): - symbol, timeframe = msg.symbol, msg.timeframe - async with AsyncHistoricalData( self.exchange, - symbol, - timeframe, + self.symbol, + self.timeframe, msg.in_sample, msg.out_sample, self.batch_size, diff --git a/feed/_realtime.py b/feed/_realtime.py index c0dd36c3..4cf8acbf 100644 --- a/feed/_realtime.py +++ b/feed/_realtime.py @@ -60,42 +60,47 @@ def __init__( self.queue = asyncio.Queue() self.ws = ws self.ts = ts + self.producer = None + self.consumer = None - async def on_receive(self, msg: StartRealtimeFeed): - symbol, timeframe = msg.symbol, msg.timeframe + def on_stop(self): + if self.producer: + self.producer.cancel() - producer = asyncio.create_task(self._producer(symbol, timeframe)) - consumer = asyncio.create_task(self._consumer(symbol, timeframe)) + if self.consumer: + self.consumer.cancel() - await asyncio.gather(producer, consumer) + async def on_receive(self, _msg: StartRealtimeFeed): + self.producer = asyncio.create_task(self._producer()) + self.consumer = asyncio.create_task(self._consumer()) - async def _producer(self, symbol: Symbol, timeframe: Timeframe): - async with AsyncRealTimeData(self.ws, symbol, timeframe) as stream: + async def _producer(self): + async with AsyncRealTimeData(self.ws, self.symbol, self.timeframe) as stream: async for bars in stream: await self.queue.put(bars) await self.queue.put(None) - async def _consumer(self, symbol: Symbol, timeframe: Timeframe): + async def _consumer(self): while True: bars = await self.queue.get() if bars is None: break - await self._process_bars(symbol, timeframe, bars) + await self._process_bars(bars) self.queue.task_done() - async def _process_bars( - self, symbol: Symbol, timeframe: Timeframe, bars: List[Bar] - ): + async def _process_bars(self, bars: List[Bar]): for bar in bars: if bar.closed: - await self.ts.upsert(symbol, timeframe, bar.ohlcv) + await self.ts.upsert(self.symbol, self.timeframe, bar.ohlcv) - logger.info(f"{symbol}_{timeframe}:{bar}") + logger.info(f"{self.symbol}_{self.timeframe}:{bar}") await self.tell( - NewMarketDataReceived(symbol, timeframe, bar.ohlcv, bar.closed) + NewMarketDataReceived( + self.symbol, self.timeframe, bar.ohlcv, bar.closed + ) ) diff --git a/system/trading.py b/system/trading.py index a026e0fd..abf064ff 100644 --- a/system/trading.py +++ b/system/trading.py @@ -49,8 +49,8 @@ def __init__( exchange_type: ExchangeType, ): super().__init__() - self.active_strategy = [] - self.next_strategy = defaultdict(list) + self.active_strategy = set() + self.next_strategy = defaultdict(set) self.event_queue = asyncio.Queue() self.state = SystemState.IDLE self.config = config_service.get("system") @@ -70,9 +70,7 @@ async def _deploy_strategy(self, event: DeployStrategy): ) for symbol, timeframe, strategy in event.strategy: - self.next_strategy[(symbol, timeframe)].append( - (symbol, timeframe, strategy) - ) + self.next_strategy[(symbol, timeframe)].add((symbol, timeframe, strategy)) await self.event_queue.put(Event.CHANGE) @@ -113,7 +111,7 @@ async def _handle_state(self): await state_handlers[self.state]() async def _run_pretrading(self): - signal_actors = defaultdict(list) + signal_actors = defaultdict(set) for _, strategies in self.next_strategy.items(): for symbol, timeframe, strategy in strategies: @@ -139,7 +137,7 @@ async def _run_pretrading(self): await asyncio.sleep(1.0) - signal_actors[(symbol, timeframe)].append(signal_actor) + signal_actors[(symbol, timeframe)].add(signal_actor) for (symbol, timeframe), _ in self.next_strategy.items(): await self.execute( @@ -168,14 +166,16 @@ async def _run_pretrading(self): ), ) - self.active_strategy.append(actors) + self.active_strategy.add(actors) await self.event_queue.put(Event.TRADING) async def _run_trading(self): logger.info("Start trading") - for actors in self.active_strategy: - await self.execute(StartRealtimeFeed(actors[0].symbol, actors[0].timeframe)) - - logger.info(f"Started feed: {actors[0].symbol}_{actors[0].timeframe}") + await asyncio.gather( + *[ + self.execute(StartRealtimeFeed(actor[0].symbol, actor[0].timeframe)) + for actor in self.active_strategy + ] + )