From d565eaca3f109d63566ff89ca7fcb47b92d56614 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sun, 13 Oct 2024 15:50:25 +1100 Subject: [PATCH] Cleanup Betfair clients --- nautilus_trader/adapters/betfair/data.py | 38 ++++++++++++------- nautilus_trader/adapters/betfair/execution.py | 8 ++-- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/nautilus_trader/adapters/betfair/data.py b/nautilus_trader/adapters/betfair/data.py index 2496443fca1..3353374658e 100644 --- a/nautilus_trader/adapters/betfair/data.py +++ b/nautilus_trader/adapters/betfair/data.py @@ -76,7 +76,7 @@ def __init__( clock: LiveClock, instrument_provider: BetfairInstrumentProvider, account_currency: Currency, - ): + ) -> None: super().__init__( loop=loop, client_id=ClientId(BETFAIR_VENUE.value), @@ -104,7 +104,7 @@ def __init__( def instrument_provider(self) -> BetfairInstrumentProvider: return self._instrument_provider - async def _connect(self): + async def _connect(self) -> None: self._log.info("Connecting to BetfairHttpClient...") await self._client.connect() self._log.info("BetfairClient login successful", LogColor.GREEN) @@ -137,12 +137,12 @@ async def _connect(self): ) self.subscription_status = SubscriptionStatus.SUBSCRIBED - async def _post_connect_heartbeat(self): + async def _post_connect_heartbeat(self) -> None: for _ in range(3): await self._stream.send(msgspec.json.encode({"op": "heartbeat"})) await asyncio.sleep(5) - async def _disconnect(self): + async def _disconnect(self) -> None: # Close socket self._log.info("Closing streaming socket") await self._stream.disconnect() @@ -151,14 +151,14 @@ async def _disconnect(self): self._log.info("Closing BetfairClient") await self._client.disconnect() - def _reset(self): + def _reset(self) -> None: if self.is_connected: self._log.error("Cannot reset a connected data client") return self._subscribed_instrument_ids = set() - def _dispose(self): + def _dispose(self) -> None: if self.is_connected: self._log.error("Cannot dispose a connected data client") return @@ -204,21 +204,21 @@ async def _subscribe_order_book_deltas( f"Added market_id {instrument.market_id} for {instrument_id.symbol} data", ) - async def delayed_subscribe(self, delay=0): + async def delayed_subscribe(self, delay=0) -> None: self._log.debug(f"Scheduling subscribe for delay={delay}") await asyncio.sleep(delay) self._log.info(f"Sending subscribe for market_ids {self._subscribed_market_ids}") await self._stream.send_subscription_message(market_ids=list(self._subscribed_market_ids)) self._log.info(f"Added market_ids {self._subscribed_market_ids} for data") - async def _subscribe_ticker(self, instrument_id: InstrumentId) -> None: - pass # Subscribed as part of orderbook + async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: + self._log.info("Skipping subscribe_instrument, Betfair subscribes as part of orderbook") - async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: - pass # Subscribed as part of orderbook + async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + self._log.info("Skipping subscribe_quote_ticks, Betfair subscribes as part of orderbook") - async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None: - self._log.debug("Skipping subscribe_instrument, betfair subscribes as part of orderbook") + async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + self._log.info("Skipping subscribe_trade_ticks, Betfair subscribes as part of orderbook") async def _subscribe_instruments(self) -> None: for instrument in self._instrument_provider.list_all(): @@ -241,7 +241,17 @@ async def _unsubscribe_order_book_deltas(self, instrument_id: InstrumentId) -> N # subscription message - when we have a use case self._log.warning("Betfair does not support unsubscribing from instruments") + async def _unsubscribe_instrument(self, instrument_id: InstrumentId) -> None: + self._log.info("Skipping unsubscribe_instrument, not applicable for Betfair") + + async def _unsubscribe_quote_ticks(self, instrument_id: InstrumentId) -> None: + self._log.info("Skipping unsubscribe_quote_ticks, not applicable for Betfair") + + async def _unsubscribe_trade_ticks(self, instrument_id: InstrumentId) -> None: + self._log.info("Skipping unsubscribe_trade_ticks, not applicable for Betfair") + # -- STREAMS ---------------------------------------------------------------------------------- + def on_market_update(self, raw: bytes) -> None: """ Handle an update from the data stream socket. @@ -285,6 +295,6 @@ def _handle_status_message(self, update: Status) -> None: else: self._log.info("Attempting reconnect") if self._stream.is_connected: - self._log.info("stream connected, disconnecting.") + self._log.info("Stream connected, disconnecting") self.create_task(self._stream.disconnect()) self.create_task(self._connect()) diff --git a/nautilus_trader/adapters/betfair/execution.py b/nautilus_trader/adapters/betfair/execution.py index 72622578c37..231a340d01d 100644 --- a/nautilus_trader/adapters/betfair/execution.py +++ b/nautilus_trader/adapters/betfair/execution.py @@ -168,7 +168,7 @@ def instrument_provider(self) -> BetfairInstrumentProvider: async def _connect(self) -> None: self._log.info("Connecting to BetfairHttpClient...") await self._client.connect() - self._log.info("BetfairHttpClient login successful.", LogColor.GREEN) + self._log.info("BetfairHttpClient login successful", LogColor.GREEN) # Connections and start-up checks self._log.debug( @@ -202,7 +202,7 @@ async def _disconnect(self) -> None: async def on_api_exception(self, error: BetfairError) -> None: if "INVALID_SESSION_INFORMATION" in error.args[0] or "NO_SESSION" in error.args[0]: if self._reconnect_in_progress: - self._log.info("Reconnect already in progress.") + self._log.info("Reconnect already in progress") return # Avoid multiple reconnection attempts when multiple INVALID_SESSION_INFORMATION errors @@ -212,7 +212,7 @@ async def on_api_exception(self, error: BetfairError) -> None: try: # Session is invalid, need to reconnect - self._log.warning("Invalid session error, reconnecting..") + self._log.warning("Invalid session error, reconnecting...") await self._disconnect() await self._connect() self._log.info("Reconnected.") @@ -390,7 +390,7 @@ async def generate_position_status_reports( start: pd.Timestamp | None = None, end: pd.Timestamp | None = None, ) -> list[PositionStatusReport]: - self._log.warning("Cannot generate `PositionStatusReports`: not yet implemented") + self._log.info("Skipping generate_position_status_reports, not implemented for Betfair") return []