Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
m5l14i11 committed Sep 11, 2024
1 parent 3cb32b6 commit 8d229d6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 31 deletions.
6 changes: 2 additions & 4 deletions feed/_historical.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,10 @@ async def on_receive(self, msg: StartHistoricalFeed):
await asyncio.gather(producer, consumer)

async def _producer(self, msg: StartHistoricalFeed):
symbol, timeframe = msg.symbol, msg.timeframe

async with AsyncHistoricalData(
self.exchange,
symbol,
timeframe,
self.symbol,
self.timeframe,
msg.in_sample,
msg.out_sample,
self.batch_size,
Expand Down
35 changes: 20 additions & 15 deletions feed/_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,42 +60,47 @@ def __init__(
self.queue = asyncio.Queue()
self.ws = ws
self.ts = ts
self.producer = None
self.consumer = None

async def on_receive(self, msg: StartRealtimeFeed):
symbol, timeframe = msg.symbol, msg.timeframe
def on_stop(self):
if self.producer:
self.producer.cancel()

producer = asyncio.create_task(self._producer(symbol, timeframe))
consumer = asyncio.create_task(self._consumer(symbol, timeframe))
if self.consumer:
self.consumer.cancel()

await asyncio.gather(producer, consumer)
async def on_receive(self, _msg: StartRealtimeFeed):
self.producer = asyncio.create_task(self._producer())
self.consumer = asyncio.create_task(self._consumer())

async def _producer(self, symbol: Symbol, timeframe: Timeframe):
async with AsyncRealTimeData(self.ws, symbol, timeframe) as stream:
async def _producer(self):
async with AsyncRealTimeData(self.ws, self.symbol, self.timeframe) as stream:
async for bars in stream:
await self.queue.put(bars)

await self.queue.put(None)

async def _consumer(self, symbol: Symbol, timeframe: Timeframe):
async def _consumer(self):
while True:
bars = await self.queue.get()

if bars is None:
break

await self._process_bars(symbol, timeframe, bars)
await self._process_bars(bars)

self.queue.task_done()

async def _process_bars(
self, symbol: Symbol, timeframe: Timeframe, bars: List[Bar]
):
async def _process_bars(self, bars: List[Bar]):
for bar in bars:
if bar.closed:
await self.ts.upsert(symbol, timeframe, bar.ohlcv)
await self.ts.upsert(self.symbol, self.timeframe, bar.ohlcv)

logger.info(f"{symbol}_{timeframe}:{bar}")
logger.info(f"{self.symbol}_{self.timeframe}:{bar}")

await self.tell(
NewMarketDataReceived(symbol, timeframe, bar.ohlcv, bar.closed)
NewMarketDataReceived(
self.symbol, self.timeframe, bar.ohlcv, bar.closed
)
)
24 changes: 12 additions & 12 deletions system/trading.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(
exchange_type: ExchangeType,
):
super().__init__()
self.active_strategy = []
self.next_strategy = defaultdict(list)
self.active_strategy = set()
self.next_strategy = defaultdict(set)
self.event_queue = asyncio.Queue()
self.state = SystemState.IDLE
self.config = config_service.get("system")
Expand All @@ -70,9 +70,7 @@ async def _deploy_strategy(self, event: DeployStrategy):
)

for symbol, timeframe, strategy in event.strategy:
self.next_strategy[(symbol, timeframe)].append(
(symbol, timeframe, strategy)
)
self.next_strategy[(symbol, timeframe)].add((symbol, timeframe, strategy))

await self.event_queue.put(Event.CHANGE)

Expand Down Expand Up @@ -113,7 +111,7 @@ async def _handle_state(self):
await state_handlers[self.state]()

async def _run_pretrading(self):
signal_actors = defaultdict(list)
signal_actors = defaultdict(set)

for _, strategies in self.next_strategy.items():
for symbol, timeframe, strategy in strategies:
Expand All @@ -139,7 +137,7 @@ async def _run_pretrading(self):

await asyncio.sleep(1.0)

signal_actors[(symbol, timeframe)].append(signal_actor)
signal_actors[(symbol, timeframe)].add(signal_actor)

for (symbol, timeframe), _ in self.next_strategy.items():
await self.execute(
Expand Down Expand Up @@ -168,14 +166,16 @@ async def _run_pretrading(self):
),
)

self.active_strategy.append(actors)
self.active_strategy.add(actors)

await self.event_queue.put(Event.TRADING)

async def _run_trading(self):
logger.info("Start trading")

for actors in self.active_strategy:
await self.execute(StartRealtimeFeed(actors[0].symbol, actors[0].timeframe))

logger.info(f"Started feed: {actors[0].symbol}_{actors[0].timeframe}")
await asyncio.gather(
*[
self.execute(StartRealtimeFeed(actor[0].symbol, actor[0].timeframe))
for actor in self.active_strategy
]
)

0 comments on commit 8d229d6

Please sign in to comment.