Skip to content

Commit

Permalink
Refine Betfair client reconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 29, 2025
1 parent 55bae96 commit bda5198
Showing 1 changed file with 21 additions and 26 deletions.
47 changes: 21 additions & 26 deletions nautilus_trader/adapters/betfair/sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,11 @@ async def connect(self):
self._client = await SocketClient.connect(
config,
None,
# self.post_connection, # this method itself needs the `self._client` reference
self.post_reconnection,
None,
# TODO - waiting for async handling
# self.post_connection,
# self.post_reconnection,
# self.post_disconnection,
self.post_disconnection,
)
self._log.debug("Running post connect")
await self.post_connection()
await self._post_connection()

self.is_connected = True
self._log.info("Connected")
Expand All @@ -108,9 +104,6 @@ async def reconnect(self):

await self._client.reconnect()

self._log.debug("Running post connection")
await self.post_connection()

self.is_connected = True
self._log.info("Reconnected")

Expand All @@ -124,13 +117,13 @@ async def disconnect(self):

await self._client.close()

self._log.debug("Running post disconnect")
await self.post_disconnection()

self.is_connected = False
self._log.info("Disconnected")

async def post_connection(self) -> None:
async def _post_connection(self) -> None:
pass

def post_connection(self) -> None:
"""
Actions to be performed post connection.
"""
Expand All @@ -140,13 +133,12 @@ def post_reconnection(self) -> None:
Actions to be performed post connection.
"""

async def post_disconnection(self) -> None:
def post_disconnection(self) -> None:
"""
Actions to be performed post disconnection.
"""

async def send(self, message: bytes) -> None:
self._log.debug(f"[SEND] {message.decode()}")
if self._client is None:
raise RuntimeError("Cannot send message: no client")

Expand Down Expand Up @@ -201,8 +193,13 @@ def __init__(
"partitionMatchedByStrategyRef": partition_matched_by_strategy_ref,
}

async def post_connection(self):
await super().post_connection()
def post_connection(self):
self._loop.create_task(self._post_connection())

def post_reconnection(self):
self._loop.create_task(self._post_connection())

async def _post_connection(self):
subscribe_msg = {
"op": "orderSubscription",
"id": self.unique_id,
Expand All @@ -213,10 +210,6 @@ async def post_connection(self):
await self.send(msgspec.json.encode(self.auth_message()))
await self.send(msgspec.json.encode(subscribe_msg))

def post_reconnection(self):
super().post_reconnection()
self._loop.create_task(self.post_connection())


class BetfairMarketStreamClient(BetfairStreamClient):
"""
Expand Down Expand Up @@ -319,10 +312,12 @@ async def send_subscription_message(
}
await self.send(msgspec.json.encode(message))

async def post_connection(self) -> None:
await super().post_connection()
await self.send(msgspec.json.encode(self.auth_message()))
def post_connection(self) -> None:
self._loop.create_task(self._post_connection())

def post_reconnection(self) -> None:
super().post_reconnection()
self._loop.create_task(self.post_connection())
self._loop.create_task(self._post_connection())

async def _post_connection(self) -> None:
await self.send(msgspec.json.encode(self.auth_message()))

0 comments on commit bda5198

Please sign in to comment.