Skip to content

Commit

Permalink
Refine Betfair clients task handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 24, 2025
1 parent cdc1b83 commit 61f70c5
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 94 deletions.
5 changes: 3 additions & 2 deletions examples/live/betfair/betfair.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from nautilus_trader.adapters.betfair.config import BetfairDataClientConfig
from nautilus_trader.adapters.betfair.config import BetfairExecClientConfig
from nautilus_trader.adapters.betfair.constants import BETFAIR
from nautilus_trader.adapters.betfair.factories import BetfairLiveDataClientFactory
from nautilus_trader.adapters.betfair.factories import BetfairLiveExecClientFactory
from nautilus_trader.adapters.betfair.factories import get_cached_betfair_client
Expand Down Expand Up @@ -109,8 +110,8 @@ async def main(
node.trader.add_strategies(strategies)

# Register your client factories with the node (can take user-defined factories)
node.add_data_client_factory("BETFAIR", BetfairLiveDataClientFactory)
node.add_exec_client_factory("BETFAIR", BetfairLiveExecClientFactory)
node.add_data_client_factory(BETFAIR, BetfairLiveDataClientFactory)
node.add_exec_client_factory(BETFAIR, BetfairLiveExecClientFactory)
node.build()

try:
Expand Down
88 changes: 57 additions & 31 deletions nautilus_trader/adapters/betfair/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@

from nautilus_trader.adapters.betfair.client import BetfairHttpClient
from nautilus_trader.adapters.betfair.constants import BETFAIR_VENUE
from nautilus_trader.adapters.betfair.data_types import BetfairStartingPrice
from nautilus_trader.adapters.betfair.data_types import BetfairTicker
from nautilus_trader.adapters.betfair.data_types import BSPOrderBookDelta
from nautilus_trader.adapters.betfair.data_types import SubscriptionStatus
from nautilus_trader.adapters.betfair.parsing.common import merge_instrument_fields
from nautilus_trader.adapters.betfair.parsing.core import BetfairParser
Expand Down Expand Up @@ -55,7 +52,7 @@ class BetfairDataClient(LiveMarketDataClient):
loop : asyncio.AbstractEventLoop
The event loop for the client.
client : BetfairClient
The betfair HttpClient
The Betfair HttpClient
msgbus : MessageBus
The message bus for the client.
cache : Cache
Expand All @@ -64,11 +61,13 @@ class BetfairDataClient(LiveMarketDataClient):
The clock for the client.
instrument_provider : BetfairInstrumentProvider, optional
The instrument provider.
account_currency : Currency
The currency for the Betfair account.
keep_alive_period : int, default 36_000 (10 hours)
The keep alive period (seconds) for the socket client.
"""

custom_data_types = (BetfairTicker, BSPOrderBookDelta, BetfairStartingPrice)

def __init__(
self,
loop: asyncio.AbstractEventLoop,
Expand All @@ -89,18 +88,29 @@ def __init__(
clock=clock,
instrument_provider=instrument_provider,
)

self._instrument_provider: BetfairInstrumentProvider = instrument_provider

# Configuration
self.keep_alive_period = keep_alive_period
self._log.info(f"{keep_alive_period=}", LogColor.BLUE)

# Clients
self._client: BetfairHttpClient = client
self._stream = BetfairMarketStreamClient(
http_client=self._client,
message_handler=self.on_market_update,
)
self.parser = BetfairParser(currency=account_currency.code)
self.subscription_status = SubscriptionStatus.UNSUBSCRIBED
self.keep_alive_period = keep_alive_period
self._reconnect_in_progress = False

self._parser = BetfairParser(currency=account_currency.code)
self.subscription_status = SubscriptionStatus.UNSUBSCRIBED

# Async tasks
self._keep_alive_task: asyncio.Task | None = None

# TODO: Move heartbeat down to Rust socket
self._heartbeat_task: asyncio.Task | None = None

# Subscriptions
self._subscribed_instrument_ids: set[InstrumentId] = set()
self._subscribed_market_ids: set[InstrumentId] = set()
Expand All @@ -110,11 +120,7 @@ def instrument_provider(self) -> BetfairInstrumentProvider:
return self._instrument_provider

async def _connect(self) -> None:
self._log.info("Connecting to BetfairHttpClient...")
await self._client.connect()
self._log.info("BetfairClient login successful", LogColor.GREEN)

# Connect market data socket
await self._stream.connect()

# Pass any preloaded instruments into the engine
Expand All @@ -130,9 +136,12 @@ async def _connect(self) -> None:
)

# Schedule a heartbeat in 10s to give us a little more time to load instruments
self._log.debug("scheduling heartbeat")
self.create_task(self._post_connect_heartbeat())
self.create_task(self._keep_alive())
self._log.debug("Scheduling heartbeat")
if not self._heartbeat_task:
self._heartbeat_task = self.create_task(self._post_connect_heartbeat())

if not self._keep_alive_task:
self._keep_alive_task = self.create_task(self._keep_alive())

# Check for any global filters in instrument provider to subscribe
if self.instrument_provider._config.event_type_ids:
Expand All @@ -144,34 +153,51 @@ async def _connect(self) -> None:
self.subscription_status = SubscriptionStatus.SUBSCRIBED

async def _post_connect_heartbeat(self) -> None:
for _ in range(3):
try:
await self._stream.send(msgspec.json.encode({"op": "heartbeat"}))
await asyncio.sleep(5)
except BrokenPipeError:
self._log.warning("Heartbeat failed, reconnecting")
await self._reconnect()
try:
for _ in range(3):
try:
await self._stream.send(msgspec.json.encode({"op": "heartbeat"}))
await asyncio.sleep(5)
except BrokenPipeError:
self._log.warning("Heartbeat failed, reconnecting")
await self._reconnect()
except asyncio.CancelledError:
self._log.debug("Canceled task 'post_connect_heartbeat'")
return

async def _keep_alive(self) -> None:
self._log.info(f"Starting keep-alive every {self.keep_alive_period}s")
while True:
await asyncio.sleep(self.keep_alive_period)
self._log.info("Sending keep-alive")
await self._client.keep_alive()
try:
await asyncio.sleep(self.keep_alive_period)
self._log.info("Sending keep-alive")
await self._client.keep_alive()
except asyncio.CancelledError:
self._log.debug("Canceled task 'keep_alive'")
return

async def _disconnect(self) -> None:
# Close socket
# Cancel tasks
if self._heartbeat_task:
self._log.debug("Canceling task 'heartbeat'")
self._heartbeat_task.cancel()
self._heartbeat_task = None

if self._keep_alive_task:
self._log.debug("Canceling task 'keep_alive'")
self._keep_alive_task.cancel()
self._keep_alive_task = None

self._log.info("Closing streaming socket")
await self._stream.disconnect()

# Ensure client closed
self._log.info("Closing BetfairClient")
await self._client.disconnect()

async def _reconnect(self) -> None:
self._log.info("Attempting reconnect")
if self._stream.is_connected:
self._log.info("Stream connected, disconnecting")
self._log.info("Stream connected: disconnecting")
await self._stream.disconnect()
await self._stream.connect()
self._reconnect_in_progress = False
Expand Down Expand Up @@ -334,7 +360,7 @@ def on_market_update(self, raw: bytes) -> None:

def _on_market_update(self, mcm: MCM) -> None:
self._check_stream_unhealthy(update=mcm)
updates = self.parser.parse(mcm=mcm)
updates = self._parser.parse(mcm=mcm)
for data in updates:
self._log.debug(f"{data=}")
PyCondition.type(data, Data, "data")
Expand Down
54 changes: 27 additions & 27 deletions nautilus_trader/adapters/betfair/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,24 @@ def from_batch(batch: pa.RecordBatch) -> list[BSPOrderBookDelta]:
return data

@staticmethod
def to_batch(self: BSPOrderBookDelta) -> pa.RecordBatch:
def to_batch(obj: BSPOrderBookDelta) -> pa.RecordBatch:
metadata = {
b"instrument_id": self.instrument_id.value.encode(),
b"price_precision": str(self.order.price.precision).encode(),
b"size_precision": str(self.order.size.precision).encode(),
b"instrument_id": obj.instrument_id.value.encode(),
b"price_precision": str(obj.order.price.precision).encode(),
b"size_precision": str(obj.order.size.precision).encode(),
}
schema = BSPOrderBookDelta.schema().with_metadata(metadata)
return pa.RecordBatch.from_pylist(
[
{
"action": self.action,
"side": self.order.side,
"price": self.order.price.raw,
"size": self.order.size.raw,
"order_id": self.order.order_id,
"flags": self.flags,
"ts_event": self.ts_event,
"ts_init": self.ts_init,
"action": obj.action,
"side": obj.order.side,
"price": obj.order.price.raw,
"size": obj.order.size.raw,
"order_id": obj.order.order_id,
"flags": obj.flags,
"ts_event": obj.ts_event,
"ts_init": obj.ts_init,
},
],
schema=schema,
Expand Down Expand Up @@ -208,16 +208,16 @@ def from_dict(cls, values: dict):
)

@staticmethod
def to_dict(self: BetfairTicker):
def to_dict(obj: BetfairTicker):
return {
"type": type(self).__name__,
"instrument_id": self.instrument_id.value,
"ts_event": self._ts_event,
"ts_init": self._ts_init,
"last_traded_price": self.last_traded_price,
"traded_volume": self.traded_volume,
"starting_price_near": self.starting_price_near,
"starting_price_far": self.starting_price_far,
"type": type(obj).__name__,
"instrument_id": obj.instrument_id.value,
"ts_event": obj._ts_event,
"ts_init": obj._ts_init,
"last_traded_price": obj.last_traded_price,
"traded_volume": obj.traded_volume,
"starting_price_near": obj.starting_price_near,
"starting_price_far": obj.starting_price_far,
}

def __repr__(self):
Expand Down Expand Up @@ -291,13 +291,13 @@ def from_dict(cls, values: dict):
)

@staticmethod
def to_dict(self):
def to_dict(obj):
return {
"type": type(self).__name__,
"instrument_id": self.instrument_id.value,
"ts_event": self.ts_event,
"ts_init": self.ts_init,
"bsp": self.bsp,
"type": type(obj).__name__,
"instrument_id": obj.instrument_id.value,
"ts_event": obj.ts_event,
"ts_init": obj.ts_init,
"bsp": obj.bsp,
}


Expand Down
Loading

0 comments on commit 61f70c5

Please sign in to comment.