diff --git a/copilot/_actor.py b/copilot/_actor.py index 90f7667a..8cdae302 100644 --- a/copilot/_actor.py +++ b/copilot/_actor.py @@ -1,9 +1,9 @@ import asyncio import logging +import re from typing import Union import numpy as np -from core.models.strategy_type import StrategyType from scipy.spatial.distance import cdist from sklearn.cluster import KMeans from sklearn.decomposition import PCA, KernelPCA @@ -25,11 +25,14 @@ from core.models.risk_type import SessionRiskType, SignalRiskType from core.models.side import PositionSide, SignalSide from core.models.signal_risk import SignalRisk +from core.models.strategy_type import StrategyType from core.queries.copilot import EvaluateSession, EvaluateSignal from ._prompt import ( signal_contrarian_risk_prompt, + signal_risk_pattern, signal_trend_risk_prompt, + system_prompt, ) CopilotEvent = Union[EvaluateSignal, EvaluateSession] @@ -127,8 +130,6 @@ def _init_centroids(self, X, random_state): class CopilotActor(BaseActor, EventHandlerMixin): - _EVENTS = [EvaluateSignal, EvaluateSession] - def __init__(self, llm: AbstractLLMService): super().__init__() EventHandlerMixin.__init__(self) @@ -171,7 +172,11 @@ async def _evaluate_signal(self, msg: EvaluateSignal) -> SignalRisk: ) bar = sorted(prev_bar + [curr_bar], key=lambda x: x.timestamp) - strategy_type = StrategyType.CONTRARIAN if "SUP" not in str(signal.strategy) else StrategyType.TREND_FOLLOW + strategy_type = ( + StrategyType.CONTRARIAN + if "SUP" not in str(signal.strategy) + else StrategyType.TREND_FOLLOW + ) template = ( signal_contrarian_risk_prompt diff --git a/core/actors/_base_actor.py b/core/actors/_base_actor.py index 641170ab..ea8793a0 100644 --- a/core/actors/_base_actor.py +++ b/core/actors/_base_actor.py @@ -1,4 +1,6 @@ +import inspect import uuid +from typing import Union, get_args, get_origin from core.commands.base import Command from core.interfaces.abstract_actor import AbstractActor, Ask, Message @@ -7,13 +9,12 @@ class BaseActor(AbstractActor): - _EVENTS = [] - def __init__(self): super().__init__() self._running = False self._mailbox = EventDispatcher() self._id = str(uuid.uuid4()) + self._EVENTS = self._discover_events() @property def id(self): @@ -67,3 +68,21 @@ def _register_events(self): def _unregister_events(self): for event in self._EVENTS: self._mailbox.unregister(event, self.on_receive) + + def _discover_events(self): + sig = inspect.signature(self.on_receive) + params = list(sig.parameters.values()) + + if len(params) < 1: + return [] + + event_type = params[0].annotation + + events = [] + + if get_origin(event_type) is Union: + events = get_args(event_type) + else: + events = [event_type] + + return list(set(events)) diff --git a/core/actors/_strategy_actor.py b/core/actors/_strategy_actor.py index 9847f9e4..08afbb9d 100644 --- a/core/actors/_strategy_actor.py +++ b/core/actors/_strategy_actor.py @@ -6,8 +6,6 @@ class StrategyActor(BaseActor): - _EVENTS = [] - def __init__(self, symbol: Symbol, timeframe: Timeframe): super().__init__() self._symbol = symbol diff --git a/core/events/position.py b/core/events/position.py index c2813f8f..d5372d77 100644 --- a/core/events/position.py +++ b/core/events/position.py @@ -54,7 +54,7 @@ class BrokerPositionOpened(PositionEvent): @dataclass(frozen=True) -class BrokerPositionAdjusted(PositionEvent): +class BrokerPositionReduced(PositionEvent): pass diff --git a/core/models/strategy_type.py b/core/models/strategy_type.py index d0f1947c..0103434d 100644 --- a/core/models/strategy_type.py +++ b/core/models/strategy_type.py @@ -1,6 +1,4 @@ - - -from enum import Enum, auto +from enum import Enum class StrategyType(Enum): @@ -8,4 +6,4 @@ class StrategyType(Enum): CONTRARIAN = "Contrarian" def __str__(self): - return self.value \ No newline at end of file + return self.value diff --git a/core/queries/broker.py b/core/queries/broker.py index 687e5749..b5f9f159 100644 --- a/core/queries/broker.py +++ b/core/queries/broker.py @@ -2,6 +2,7 @@ from typing import List from core.events.base import EventMeta +from core.models.position import Position from core.models.symbol import Symbol from .base import Query, QueryGroup @@ -22,3 +23,12 @@ class GetSymbol(Query[Symbol]): default_factory=lambda: EventMeta(priority=3, group=QueryGroup.broker), init=False, ) + + +@dataclass(frozen=True) +class HasPosition(Query[bool]): + position: Position + meta: EventMeta = field( + default_factory=lambda: EventMeta(priority=3, group=QueryGroup.broker), + init=False, + ) diff --git a/executor/_market_actor.py b/executor/_market_actor.py index 808a7104..b8d90560 100644 --- a/executor/_market_actor.py +++ b/executor/_market_actor.py @@ -12,6 +12,7 @@ from core.mixins import EventHandlerMixin from core.models.symbol import Symbol from core.models.timeframe import Timeframe +from core.queries.broker import HasPosition from core.queries.position import GetClosePosition, GetOpenPosition logger = logging.getLogger(__name__) @@ -21,8 +22,6 @@ class MarketOrderActor(StrategyActor, EventHandlerMixin): - _EVENTS = [PositionInitialized, PositionCloseRequested] - def __init__(self, symbol: Symbol, timeframe: Timeframe): super().__init__(symbol, timeframe) EventHandlerMixin.__init__(self) @@ -58,7 +57,8 @@ async def _close_position(self, event: PositionCloseRequested): logger.debug(f"To Close Position: {current_position}") - await self.ask(ClosePosition(current_position)) + while await self.ask(HasPosition(current_position)): + await self.ask(ClosePosition(current_position)) order = await self.ask(GetClosePosition(current_position)) diff --git a/executor/_paper_actor.py b/executor/_paper_actor.py index 4db9a55e..813e2007 100644 --- a/executor/_paper_actor.py +++ b/executor/_paper_actor.py @@ -29,11 +29,6 @@ class PriceDirection(Enum): class PaperOrderActor(StrategyActor, EventHandlerMixin): - _EVENTS = [ - PositionInitialized, - PositionCloseRequested, - ] - def __init__(self, symbol: Symbol, timeframe: Timeframe): super().__init__(symbol, timeframe) EventHandlerMixin.__init__(self) diff --git a/feed/_historical.py b/feed/_historical.py index 25d1cd39..4010a8f3 100644 --- a/feed/_historical.py +++ b/feed/_historical.py @@ -71,8 +71,6 @@ def _next_item_or_end(self): class HistoricalActor(StrategyActor): - _EVENTS = [StartHistoricalFeed] - def __init__( self, symbol: Symbol, diff --git a/feed/_realtime.py b/feed/_realtime.py index 0f7274e0..3d3e28bf 100644 --- a/feed/_realtime.py +++ b/feed/_realtime.py @@ -46,8 +46,6 @@ async def __anext__(self): class RealtimeActor(StrategyActor): - _EVENTS = [StartRealtimeFeed] - def __init__( self, symbol: Symbol, diff --git a/market/_actor.py b/market/_actor.py index 1b3960e2..68e57639 100644 --- a/market/_actor.py +++ b/market/_actor.py @@ -11,8 +11,6 @@ class MarketActor(BaseActor, EventHandlerMixin): - _EVENTS = [NextBar, PrevBar, TA, BackNBars] - def __init__(self, ts: AbstractTimeSeriesService): super().__init__() EventHandlerMixin.__init__(self) diff --git a/position/_actor.py b/position/_actor.py index c42be477..48c9e953 100644 --- a/position/_actor.py +++ b/position/_actor.py @@ -45,15 +45,6 @@ class PositionActor(StrategyActor): - _EVENTS = [ - GoLongSignalReceived, - GoShortSignalReceived, - BrokerPositionOpened, - BrokerPositionClosed, - RiskThresholdBreached, - BacktestEnded, - ] - def __init__( self, symbol: Symbol, @@ -69,7 +60,7 @@ def __init__( self.state = PositionStorage() self.config = config_service.get("position") - async def on_receive(self, event): + async def on_receive(self, event: PositionEvent): symbol, _ = self._get_event_key(event) if hasattr(event, "position"): @@ -103,7 +94,10 @@ async def create_and_store_position(event: SignalEvent): EvaluateSignal(event.signal, back_bars, ta) ) - if signal_risk_level.type in {SignalRiskType.VERY_HIGH}: + if signal_risk_level.type in { + SignalRiskType.UNKNOWN, + SignalRiskType.VERY_HIGH, + }: return False position = await self.position_factory.create( diff --git a/position/_sm.py b/position/_sm.py index e5d793ce..972dc1e9 100644 --- a/position/_sm.py +++ b/position/_sm.py @@ -5,7 +5,6 @@ from core.events.backtest import BacktestEnded from core.events.position import ( - BrokerPositionAdjusted, BrokerPositionClosed, BrokerPositionOpened, ) @@ -29,7 +28,6 @@ class PositionState(Enum): PositionEvent = Union[ BrokerPositionOpened, - BrokerPositionAdjusted, BrokerPositionClosed, GoLongSignalReceived, GoShortSignalReceived, @@ -57,10 +55,6 @@ class PositionState(Enum): PositionState.CLOSE, "handle_backtest", ), - (PositionState.OPENED, BrokerPositionAdjusted): ( - PositionState.OPENED, - "handle_position_adjusted", - ), (PositionState.OPENED, RiskThresholdBreached): ( PositionState.CLOSE, "handle_exit_received", @@ -92,10 +86,6 @@ class PositionState(Enum): PositionState.CLOSE, "handle_backtest", ), - (PositionState.OPENED, BrokerPositionAdjusted): ( - PositionState.OPENED, - "handle_position_adjusted", - ), (PositionState.OPENED, RiskThresholdBreached): ( PositionState.CLOSE, "handle_exit_received", diff --git a/risk/_actor.py b/risk/_actor.py index 308b9681..6d66e7fb 100644 --- a/risk/_actor.py +++ b/risk/_actor.py @@ -63,16 +63,6 @@ def _ema(values, alpha=0.1): class RiskActor(StrategyActor, EventHandlerMixin): - _EVENTS = [ - NewMarketDataReceived, - PositionOpened, - PositionClosed, - ExitLongSignalReceived, - ExitShortSignalReceived, - GoLongSignalReceived, - GoShortSignalReceived, - ] - def __init__( self, symbol: Symbol, timeframe: Timeframe, config_service: AbstractConfig ): diff --git a/sor/_router.py b/sor/_router.py index 583aef9f..fdfd89c6 100644 --- a/sor/_router.py +++ b/sor/_router.py @@ -14,7 +14,7 @@ from core.models.order import Order, OrderStatus from core.models.side import PositionSide from core.queries.account import GetBalance -from core.queries.broker import GetSymbol, GetSymbols +from core.queries.broker import GetSymbol, GetSymbols, HasPosition from core.queries.position import GetClosePosition, GetOpenPosition from ._twap import TWAP @@ -84,6 +84,19 @@ def get_symbol(self, query: GetSymbol): def get_account_balance(self, query: GetBalance): return self.exchange.fetch_account_balance(query.currency) + @query_handler(HasPosition) + def has_position(self, query: HasPosition): + position = query.position + symbol = position.signal.symbol + + existing_position = self.exchange.fetch_position(symbol, position.side) + + if existing_position: + logging.info(f"Position for {symbol} on {position.side} already exists") + return True + + return False + @command_handler(UpdateSettings) def update_symbol_settings(self, command: UpdateSettings): self.exchange.update_symbol_settings( @@ -93,21 +106,18 @@ def update_symbol_settings(self, command: UpdateSettings): @command_handler(OpenPosition) async def open_position(self, command: OpenPosition): position = command.position - - logger.info(f"Try to open position: {position}") - symbol = position.signal.symbol - stop_loss = position.stop_loss - pending_order = position.entry_order() - entry_price = pending_order.price - size = pending_order.size + logger.info(f"Try to open position: {position}") if self.exchange.fetch_position(symbol, position.side): logging.info("Position already exists") return - distance_to_stop_loss = abs(entry_price - stop_loss) + pending_order = position.entry_order() + + entry_price = pending_order.price + size = 2 * pending_order.size num_orders = min( max(1, int(size / symbol.min_position_size)), self.config["max_order_slice"] @@ -116,11 +126,14 @@ async def open_position(self, command: OpenPosition): order_counter = 0 num_order_breach = 0 order_timestamps = {} + exp_time = self.config["order_expiration_time"] async for bid, ask in self.algo_price.next_value(symbol, self.exchange): price = ask if position.side == PositionSide.LONG else bid + stop_loss = position.stop_loss current_distance_to_stop_loss = abs(stop_loss - price) + distance_to_stop_loss = abs(entry_price - stop_loss) threshold_breach = ( self.config["stop_loss_threshold"] * distance_to_stop_loss @@ -156,7 +169,7 @@ async def open_position(self, command: OpenPosition): expired_orders = [ order_id for order_id, timestamp in order_timestamps.items() - if curr_time - timestamp > self.config["order_expiration_time"] + if curr_time - timestamp > exp_time ] for order_id in expired_orders: @@ -187,14 +200,15 @@ async def open_position(self, command: OpenPosition): @command_handler(ClosePosition) async def close_position(self, command: ClosePosition): position = command.position + symbol = position.signal.symbol + position_side = position.side - if not self.exchange.fetch_position(symbol, position.side): + if not self.exchange.fetch_position(symbol, position_side): logging.info("Position is not existed") return exit_order = position.exit_order() - position_side = position.side num_orders = min( max(1, int(exit_order.size / symbol.min_position_size)), @@ -204,6 +218,7 @@ async def close_position(self, command: ClosePosition): order_counter = 0 order_timestamps = {} max_spread = float("-inf") + exp_time = self.config["order_expiration_time"] async for bid, ask in self.algo_price.next_value(symbol, self.exchange): if not self.exchange.fetch_position(symbol, position_side): @@ -226,7 +241,7 @@ async def close_position(self, command: ClosePosition): expired_orders = [ order_id for order_id, timestamp in order_timestamps.items() - if curr_time - timestamp > self.config["order_expiration_time"] + if curr_time - timestamp > exp_time ] for order_id in expired_orders: @@ -252,14 +267,5 @@ async def close_position(self, command: ClosePosition): if order_id: order_timestamps[order_id] = time.time() - if spread < max_spread and not len(order_timestamps.keys()): - if num_orders > 2: - self.exchange.close_half_position(symbol, position_side) - else: - self.exchange.close_full_position(symbol, position_side) - for order_id in list(order_timestamps.keys()): self.exchange.cancel_order(order_id, symbol) - - if self.exchange.fetch_position(symbol, position_side): - self.exchange.close_full_position(symbol, position_side) diff --git a/strategy/_actor.py b/strategy/_actor.py index 674b9b33..a69198c0 100644 --- a/strategy/_actor.py +++ b/strategy/_actor.py @@ -16,8 +16,6 @@ class SignalActor(StrategyActor): - _EVENTS = [NewMarketDataReceived] - def __init__( self, symbol: Symbol,