Skip to content

Commit

Permalink
Improve Betfair clients configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 26, 2025
1 parent 6e03c27 commit 2d48745
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 78 deletions.
2 changes: 2 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ This release will be the final version that uses Poetry for package and dependen
- Added `venue_position_id` parameter for `OrderStatusReport`
- Added bars update support for `Portfolio` PnLs (#2239), thanks @faysou
- Added optional `params` for `Strategy` order management methods (symmetry with `Actor` data methods) (#2251), thanks @faysou
- Improved Betfair clients configuration

### Breaking Changes
- Renamed `event_logging` config option to `log_events`
- Renamed `BetfairExecClientConfig.request_account_state_period` to `request_account_state_secs`
- Moved SQL schema directory to `schemas/sql` (reinstall the Nautilus CLI with `make install-cli`)
- Changed `BettingInstrument` default `min_notional` to `None`

Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/betfair.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ config = TradingNodeConfig(
..., # Omitted
data_clients={
"BETFAIR": {
"account_currency": "AUD",
# username=None, # 'BETFAIR_USERNAME' env var
# password=None, # 'BETFAIR_PASSWORD' env var
# app_key=None, # 'BETFAIR_APP_KEY' env var
Expand All @@ -90,11 +91,11 @@ config = TradingNodeConfig(
},
exec_clients={
"BETFAIR": {
"account_currency": "AUD",
# username=None, # 'BETFAIR_USERNAME' env var
# password=None, # 'BETFAIR_PASSWORD' env var
# app_key=None, # 'BETFAIR_APP_KEY' env var
# cert_dir=None, # 'BETFAIR_CERT_DIR' env var
"base_currency": "AUD",
},
}
)
Expand Down
27 changes: 22 additions & 5 deletions nautilus_trader/adapters/betfair/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# -------------------------------------------------------------------------------------------------

from nautilus_trader.adapters.betfair.providers import BetfairInstrumentProviderConfig
from nautilus_trader.common.config import PositiveInt
from nautilus_trader.config import LiveDataClientConfig
from nautilus_trader.config import LiveExecClientConfig

Expand All @@ -24,14 +25,22 @@ class BetfairDataClientConfig(LiveDataClientConfig, kw_only=True, frozen=True):
Parameters
----------
account_currency : str
The currency for the Betfair account.
username : str, optional
The Betfair account username.
password : str, optional
The Betfair account password.
app_key : str, optional
The betfair application key.
The Betfair application key.
cert_dir : str, optional
The local directory that contains the betfair certificates.
The local directory that contains the Betfair certificates.
instrument_config : BetfairInstrumentProviderConfig, None
The Betfair instrument provider config.
subscription_delay_secs : PositiveInt, default 3
The delay (seconds) to delay sending the *initial* subscription message.
keep_alive_secs : PositiveInt, default 36_000 (10 hours)
The keep alive interval (seconds) for the HTTP client.
"""

Expand All @@ -41,6 +50,8 @@ class BetfairDataClientConfig(LiveDataClientConfig, kw_only=True, frozen=True):
app_key: str | None = None
cert_dir: str | None = None
instrument_config: BetfairInstrumentProviderConfig | None = None
subscription_delay_secs: PositiveInt | None = 3
keep_alive_secs: PositiveInt = 36_000 # 10 hours


class BetfairExecClientConfig(LiveExecClientConfig, kw_only=True, frozen=True):
Expand All @@ -49,14 +60,20 @@ class BetfairExecClientConfig(LiveExecClientConfig, kw_only=True, frozen=True):
Parameters
----------
account_currency : str
The currency for the Betfair account.
username : str, optional
The Betfair account username.
password : str, optional
The Betfair account password.
app_key : str, optional
The betfair application key.
The Betfair application key.
cert_dir : str, optional
The local directory that contains the betfair certificates.
The local directory that contains the Betfair certificates.
instrument_config : BetfairInstrumentProviderConfig, None
The Betfair instrument provider config.
request_account_state_secs : PositiveInt, default 300 (5 minutes)
The request interval (seconds) for account state checks.
"""

Expand All @@ -66,4 +83,4 @@ class BetfairExecClientConfig(LiveExecClientConfig, kw_only=True, frozen=True):
app_key: str | None = None
cert_dir: str | None = None
instrument_config: BetfairInstrumentProviderConfig | None = None
request_account_state_period: int | None = None
request_account_state_secs: PositiveInt = 300
83 changes: 28 additions & 55 deletions nautilus_trader/adapters/betfair/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import asyncio
from typing import Any

import msgspec
from betfair_parser.spec.streaming import MCM
from betfair_parser.spec.streaming import Connection
from betfair_parser.spec.streaming import Status
from betfair_parser.spec.streaming import stream_decode

from nautilus_trader.adapters.betfair.client import BetfairHttpClient
from nautilus_trader.adapters.betfair.config import BetfairDataClientConfig
from nautilus_trader.adapters.betfair.constants import BETFAIR_VENUE
from nautilus_trader.adapters.betfair.data_types import SubscriptionStatus
from nautilus_trader.adapters.betfair.parsing.common import merge_instrument_fields
Expand All @@ -40,7 +40,6 @@
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.instruments.betting import BettingInstrument
from nautilus_trader.model.objects import Currency


class BetfairDataClient(LiveMarketDataClient):
Expand All @@ -61,10 +60,8 @@ class BetfairDataClient(LiveMarketDataClient):
The clock for the client.
instrument_provider : BetfairInstrumentProvider, optional
The instrument provider.
account_currency : Currency
The currency for the Betfair account.
keep_alive_period : int, default 36_000 (10 hours)
The keep alive period (seconds) for the socket client.
config : BetfairDataClientConfig
The configuration for the client.
"""

Expand All @@ -76,8 +73,7 @@ def __init__(
cache: Cache,
clock: LiveClock,
instrument_provider: BetfairInstrumentProvider,
account_currency: Currency,
keep_alive_period: int = 3600 * 10, # 10 hours
config: BetfairDataClientConfig,
) -> None:
super().__init__(
loop=loop,
Expand All @@ -91,8 +87,10 @@ def __init__(
self._instrument_provider: BetfairInstrumentProvider = instrument_provider

# Configuration
self.keep_alive_period = keep_alive_period
self._log.info(f"{keep_alive_period=}", LogColor.BLUE)
self.config = config
self._log.info(f"{config.account_currency=}", LogColor.BLUE)
self._log.info(f"{config.subscription_delay_secs=}", LogColor.BLUE)
self._log.info(f"{config.keep_alive_secs=}", LogColor.BLUE)

# Clients
self._client: BetfairHttpClient = client
Expand All @@ -102,16 +100,13 @@ def __init__(
)
self._reconnect_in_progress = False

self._parser = BetfairParser(currency=account_currency.code)
self.subscription_status = SubscriptionStatus.UNSUBSCRIBED
self._parser = BetfairParser(currency=config.account_currency)

# Async tasks
self._keep_alive_task: asyncio.Task | None = None

# TODO: Move heartbeat down to Rust socket
self._heartbeat_task: asyncio.Task | None = None

# Subscriptions
self._subscription_status = SubscriptionStatus.UNSUBSCRIBED
self._subscribed_instrument_ids: set[InstrumentId] = set()
self._subscribed_market_ids: set[InstrumentId] = set()

Expand All @@ -135,11 +130,6 @@ async def _connect(self) -> None:
f"DataEngine has {len(self._cache.instruments(BETFAIR_VENUE))} Betfair instruments",
)

# Schedule a heartbeat in 10s to give us a little more time to load instruments
self._log.debug("Scheduling heartbeat")
if not self._heartbeat_task:
self._heartbeat_task = self.create_task(self._post_connect_heartbeat())

if not self._keep_alive_task:
self._keep_alive_task = self.create_task(self._keep_alive())

Expand All @@ -150,26 +140,13 @@ async def _connect(self) -> None:
country_codes=self.instrument_provider._config.country_codes,
market_types=self.instrument_provider._config.market_types,
)
self.subscription_status = SubscriptionStatus.SUBSCRIBED

async def _post_connect_heartbeat(self) -> None:
try:
for _ in range(3):
try:
await self._stream.send(msgspec.json.encode({"op": "heartbeat"}))
await asyncio.sleep(5)
except BrokenPipeError:
self._log.warning("Heartbeat failed, reconnecting")
await self._reconnect()
except asyncio.CancelledError:
self._log.debug("Canceled task 'post_connect_heartbeat'")
return
self._subscription_status = SubscriptionStatus.SUBSCRIBED

async def _keep_alive(self) -> None:
self._log.info(f"Starting keep-alive every {self.keep_alive_period}s")
self._log.info(f"Starting keep-alive every {self.config.keep_alive_secs}s")
while True:
try:
await asyncio.sleep(self.keep_alive_period)
await asyncio.sleep(self.config.keep_alive_secs)
self._log.info("Sending keep-alive")
await self._client.keep_alive()
except asyncio.CancelledError:
Expand All @@ -178,11 +155,6 @@ async def _keep_alive(self) -> None:

async def _disconnect(self) -> None:
# Cancel tasks
if self._heartbeat_task:
self._log.debug("Canceling task 'heartbeat'")
self._heartbeat_task.cancel()
self._heartbeat_task = None

if self._keep_alive_task:
self._log.debug("Canceling task 'keep_alive'")
self._keep_alive_task.cancel()
Expand Down Expand Up @@ -216,6 +188,13 @@ def _dispose(self) -> None:

# -- SUBSCRIPTIONS ----------------------------------------------------------------------------

async def _delayed_subscribe(self, delay: int = 0) -> None:
self._log.debug(f"Scheduling subscribe for delay={delay}")
await asyncio.sleep(delay)
self._log.info(f"Sending subscribe for market_ids {self._subscribed_market_ids}")
await self._stream.send_subscription_message(market_ids=list(self._subscribed_market_ids))
self._log.info(f"Added market_ids {self._subscribed_market_ids} for <OrderBook> data")

async def _subscribe_order_book_deltas(
self,
instrument_id: InstrumentId,
Expand All @@ -234,7 +213,7 @@ async def _subscribe_order_book_deltas(
)
return

if self.subscription_status == SubscriptionStatus.SUBSCRIBED:
if self._subscription_status == SubscriptionStatus.SUBSCRIBED:
self._log.debug("Already subscribed")
return

Expand All @@ -243,25 +222,19 @@ async def _subscribe_order_book_deltas(
# their subscriptions (every change triggers a full snapshot).
self._subscribed_market_ids.add(instrument.market_id)
self._subscribed_instrument_ids.add(instrument.id)
if self.subscription_status == SubscriptionStatus.UNSUBSCRIBED:
self.create_task(self.delayed_subscribe(delay=3))
self.subscription_status = SubscriptionStatus.PENDING_STARTUP
elif self.subscription_status == SubscriptionStatus.PENDING_STARTUP:
if self._subscription_status == SubscriptionStatus.UNSUBSCRIBED:
delay = self.config.subscription_delay_secs or 0
self.create_task(self._delayed_subscribe(delay=delay))
self._subscription_status = SubscriptionStatus.PENDING_STARTUP
elif self._subscription_status == SubscriptionStatus.PENDING_STARTUP:
pass
elif self.subscription_status == SubscriptionStatus.RUNNING:
self.create_task(self.delayed_subscribe(delay=0))
elif self._subscription_status == SubscriptionStatus.RUNNING:
self.create_task(self._delayed_subscribe(delay=0))

self._log.info(
f"Added market_id {instrument.market_id} for {instrument_id.symbol} <OrderBook> data",
)

async def delayed_subscribe(self, delay=0) -> None:
self._log.debug(f"Scheduling subscribe for delay={delay}")
await asyncio.sleep(delay)
self._log.info(f"Sending subscribe for market_ids {self._subscribed_market_ids}")
await self._stream.send_subscription_message(market_ids=list(self._subscribed_market_ids))
self._log.info(f"Added market_ids {self._subscribed_market_ids} for <OrderBook> data")

async def _subscribe_instrument(
self,
instrument_id: InstrumentId,
Expand Down
19 changes: 9 additions & 10 deletions nautilus_trader/adapters/betfair/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from nautilus_trader.accounting.factory import AccountFactory
from nautilus_trader.adapters.betfair.client import BetfairHttpClient
from nautilus_trader.adapters.betfair.common import OrderSideParser
from nautilus_trader.adapters.betfair.config import BetfairExecClientConfig
from nautilus_trader.adapters.betfair.constants import BETFAIR_VENUE
from nautilus_trader.adapters.betfair.orderbook import betfair_float_to_price
from nautilus_trader.adapters.betfair.orderbook import betfair_float_to_quantity
Expand Down Expand Up @@ -102,8 +103,6 @@ class BetfairExecutionClient(LiveExecutionClient):
The event loop for the client.
client : BetfairHttpClient
The Betfair HttpClient.
account_currency : Currency
The account base currency for the client.
msgbus : MessageBus
The message bus for the client.
cache : Cache
Expand All @@ -112,29 +111,28 @@ class BetfairExecutionClient(LiveExecutionClient):
The clock for the client.
instrument_provider : BetfairInstrumentProvider
The instrument provider.
request_account_state_period : int
The period (seconds) between checking account state.
config : BetfairExecClientConfig
The configuration for the client.
"""

def __init__(
self,
loop: asyncio.AbstractEventLoop,
client: BetfairHttpClient,
account_currency: Currency,
msgbus: MessageBus,
cache: Cache,
clock: LiveClock,
instrument_provider: BetfairInstrumentProvider,
request_account_state_period: int,
config: BetfairExecClientConfig,
) -> None:
super().__init__(
loop=loop,
client_id=ClientId(BETFAIR_VENUE.value),
venue=BETFAIR_VENUE,
oms_type=OmsType.NETTING,
account_type=AccountType.BETTING,
base_currency=account_currency,
base_currency=Currency.from_str(config.account_currency),
instrument_provider=instrument_provider,
msgbus=msgbus,
cache=cache,
Expand All @@ -145,8 +143,9 @@ def __init__(
AccountFactory.register_calculated_account(BETFAIR_VENUE.value)

# Configuration
self.request_account_state_period = request_account_state_period or 300
self._log.info(f"{self.request_account_state_period=}", LogColor.BLUE)
self.config = config
self._log.info(f"{config.account_currency=}", LogColor.BLUE)
self._log.info(f"{config.request_account_state_secs=}", LogColor.BLUE)

# Clients
self._client: BetfairHttpClient = client
Expand Down Expand Up @@ -241,7 +240,7 @@ async def update_account_state():
while not self._is_closing:
try:
await update_account_state()
await asyncio.sleep(self.request_account_state_period)
await asyncio.sleep(self.config.request_account_state_secs)
except asyncio.CancelledError:
self._log.debug("Canceled task 'account_state_updates'")

Expand Down
6 changes: 2 additions & 4 deletions nautilus_trader/adapters/betfair/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from nautilus_trader.common.component import MessageBus
from nautilus_trader.live.factories import LiveDataClientFactory
from nautilus_trader.live.factories import LiveExecClientFactory
from nautilus_trader.model.objects import Currency


@lru_cache(1)
Expand Down Expand Up @@ -161,7 +160,7 @@ def create( # type: ignore
cache=cache,
clock=clock,
instrument_provider=provider,
account_currency=Currency.from_str(config.account_currency),
config=config,
)
return data_client

Expand Down Expand Up @@ -217,11 +216,10 @@ def create( # type: ignore
exec_client = BetfairExecutionClient(
loop=loop,
client=client,
account_currency=Currency.from_str(config.account_currency),
msgbus=msgbus,
cache=cache,
clock=clock,
instrument_provider=provider,
request_account_state_period=config.request_account_state_period,
config=config,
)
return exec_client
Loading

0 comments on commit 2d48745

Please sign in to comment.