Skip to content

Commit

Permalink
Cleanup Betfair clients
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Oct 13, 2024
1 parent f663dad commit d565eac
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
38 changes: 24 additions & 14 deletions nautilus_trader/adapters/betfair/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(
clock: LiveClock,
instrument_provider: BetfairInstrumentProvider,
account_currency: Currency,
):
) -> None:
super().__init__(
loop=loop,
client_id=ClientId(BETFAIR_VENUE.value),
Expand Down Expand Up @@ -104,7 +104,7 @@ def __init__(
def instrument_provider(self) -> BetfairInstrumentProvider:
return self._instrument_provider

async def _connect(self):
async def _connect(self) -> None:
self._log.info("Connecting to BetfairHttpClient...")
await self._client.connect()
self._log.info("BetfairClient login successful", LogColor.GREEN)
Expand Down Expand Up @@ -137,12 +137,12 @@ async def _connect(self):
)
self.subscription_status = SubscriptionStatus.SUBSCRIBED

async def _post_connect_heartbeat(self):
async def _post_connect_heartbeat(self) -> None:
for _ in range(3):
await self._stream.send(msgspec.json.encode({"op": "heartbeat"}))
await asyncio.sleep(5)

async def _disconnect(self):
async def _disconnect(self) -> None:
# Close socket
self._log.info("Closing streaming socket")
await self._stream.disconnect()
Expand All @@ -151,14 +151,14 @@ async def _disconnect(self):
self._log.info("Closing BetfairClient")
await self._client.disconnect()

def _reset(self):
def _reset(self) -> None:
if self.is_connected:
self._log.error("Cannot reset a connected data client")
return

self._subscribed_instrument_ids = set()

def _dispose(self):
def _dispose(self) -> None:
if self.is_connected:
self._log.error("Cannot dispose a connected data client")
return
Expand Down Expand Up @@ -204,21 +204,21 @@ async def _subscribe_order_book_deltas(
f"Added market_id {instrument.market_id} for {instrument_id.symbol} <OrderBook> data",
)

async def delayed_subscribe(self, delay=0):
async def delayed_subscribe(self, delay=0) -> None:
self._log.debug(f"Scheduling subscribe for delay={delay}")
await asyncio.sleep(delay)
self._log.info(f"Sending subscribe for market_ids {self._subscribed_market_ids}")
await self._stream.send_subscription_message(market_ids=list(self._subscribed_market_ids))
self._log.info(f"Added market_ids {self._subscribed_market_ids} for <OrderBook> data")

async def _subscribe_ticker(self, instrument_id: InstrumentId) -> None:
pass # Subscribed as part of orderbook
async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping subscribe_instrument, Betfair subscribes as part of orderbook")

async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
pass # Subscribed as part of orderbook
async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping subscribe_quote_ticks, Betfair subscribes as part of orderbook")

async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
self._log.debug("Skipping subscribe_instrument, betfair subscribes as part of orderbook")
async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping subscribe_trade_ticks, Betfair subscribes as part of orderbook")

async def _subscribe_instruments(self) -> None:
for instrument in self._instrument_provider.list_all():
Expand All @@ -241,7 +241,17 @@ async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> N
# subscription message - when we have a use case
self._log.warning("Betfair does not support unsubscribing from instruments")

async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping unsubscribe_instrument, not applicable for Betfair")

async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping unsubscribe_quote_ticks, not applicable for Betfair")

async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
self._log.info("Skipping unsubscribe_trade_ticks, not applicable for Betfair")

# -- STREAMS ----------------------------------------------------------------------------------

def on_market_update(self, raw: bytes) -> None:
"""
Handle an update from the data stream socket.
Expand Down Expand Up @@ -285,6 +295,6 @@ def _handle_status_message(self, update: Status) -> None:
else:
self._log.info("Attempting reconnect")
if self._stream.is_connected:
self._log.info("stream connected, disconnecting.")
self._log.info("Stream connected, disconnecting")
self.create_task(self._stream.disconnect())
self.create_task(self._connect())
8 changes: 4 additions & 4 deletions nautilus_trader/adapters/betfair/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def instrument_provider(self) -> BetfairInstrumentProvider:
async def _connect(self) -> None:
self._log.info("Connecting to BetfairHttpClient...")
await self._client.connect()
self._log.info("BetfairHttpClient login successful.", LogColor.GREEN)
self._log.info("BetfairHttpClient login successful", LogColor.GREEN)

# Connections and start-up checks
self._log.debug(
Expand Down Expand Up @@ -202,7 +202,7 @@ async def _disconnect(self) -> None:
async def on_api_exception(self, error: BetfairError) -> None:
if "INVALID_SESSION_INFORMATION" in error.args[0] or "NO_SESSION" in error.args[0]:
if self._reconnect_in_progress:
self._log.info("Reconnect already in progress.")
self._log.info("Reconnect already in progress")
return

# Avoid multiple reconnection attempts when multiple INVALID_SESSION_INFORMATION errors
Expand All @@ -212,7 +212,7 @@ async def on_api_exception(self, error: BetfairError) -> None:

try:
# Session is invalid, need to reconnect
self._log.warning("Invalid session error, reconnecting..")
self._log.warning("Invalid session error, reconnecting...")
await self._disconnect()
await self._connect()
self._log.info("Reconnected.")
Expand Down Expand Up @@ -390,7 +390,7 @@ async def generate_position_status_reports(
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> list[PositionStatusReport]:
self._log.warning("Cannot generate `PositionStatusReports`: not yet implemented")
self._log.info("Skipping generate_position_status_reports, not implemented for Betfair")

return []

Expand Down

0 comments on commit d565eac

Please sign in to comment.