Skip to content

Commit

Permalink
Refine LiveExecutionEngine reconciliation procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Aug 11, 2024
1 parent 026369c commit 4d75b89
Showing 1 changed file with 31 additions and 37 deletions.
68 changes: 31 additions & 37 deletions nautilus_trader/live/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
from nautilus_trader.model.events import OrderRejected
from nautilus_trader.model.events import OrderTriggered
from nautilus_trader.model.events import OrderUpdated
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import ClientOrderId
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import PositionId
from nautilus_trader.model.identifiers import StrategyId
from nautilus_trader.model.identifiers import TradeId
Expand Down Expand Up @@ -435,7 +437,13 @@ async def _check_inflight_orders(self) -> None:

# -- RECONCILIATION -------------------------------------------------------------------------------

async def reconcile_state(self, timeout_secs: float = 10.0) -> bool: # noqa: C901 (too complex)
def _log_reconciliation_result(self, value: ClientId | InstrumentId, result: bool) -> None:
if result:
self._log.info(f"Reconciliation for {value} succeeded", LogColor.GREEN)
else:
self._log.warning(f"Reconciliation for {value} failed")

async def reconcile_state(self, timeout_secs: float = 10.0) -> bool: # (too complex)
"""
Reconcile the internal execution state with all execution clients (external
state).
Expand Down Expand Up @@ -485,55 +493,41 @@ async def reconcile_state(self, timeout_secs: float = 10.0) -> bool: # noqa: C9
client_id = mass_status.client_id
venue = mass_status.venue
result = self._reconcile_mass_status(mass_status)
positions = self._cache.positions_open(venue)

if result:
if not positions:
results.append(result)
self._log.info(f"Reconciliation for {client_id} succeeded", LogColor.GREEN)
continue
else: # Reconciliation failed
self._log.warning(f"Reconciliation for {client_id} initially failed")
if self.filter_position_reports:
self._log.warning(
"`filter_position_reports` enabled, skipping further reconciliation",
)
continue
if not result and self.filter_position_reports:
self._log_reconciliation_result(client_id, result)
results.append(result)
self._log.warning(
"`filter_position_reports` enabled, skipping further reconciliation",
)
continue

client = self._clients.get(client_id)
client = self._clients[client_id]

# Reconcile specific internal open positions
# Check internal and external position reconciliation
report_tasks: list[asyncio.Task] = []
for position in positions:
for position in self._cache.positions_open(venue):
instrument_id = position.instrument_id
if instrument_id in mass_status.position_reports:
self._log.debug(f"Position {instrument_id} for {client_id} already reconciled")
continue # Already reconciled
self._log.info(f"{position} pending reconciliation")
report_tasks.append(client.generate_position_status_reports(instrument_id))

if not report_tasks:
# No specific position reports to continue reconciliation
continue
if report_tasks:
# Reconcile specific internal open positions
self._log.info(f"Awaiting {len(report_tasks)} position reports for {client_id}")
position_results: list[bool] = []
for task_result in await asyncio.gather(*report_tasks):
for report in task_result:
position_result = self._reconcile_position_report(report)
self._log_reconciliation_result(report.instrument_id, position_result)
position_results.append(position_result)

self._log.info(
f"Awaiting reconciliation for {len(report_tasks)} position reports for {client_id}",
)
result = all(position_results)

position_results: list[bool] = []
for task_result in await asyncio.gather(*report_tasks):
for report in task_result:
position_result = self._reconcile_position_report(report)
if position_result:
self._log.info(
f"Reconciliation for {report.instrument_id} succeeded",
LogColor.GREEN,
)
else:
self._log.warning(f"Reconciliation for {report.instrument_id} failed")
position_results.append(position_result)

results.append(all(position_results))
self._log_reconciliation_result(client_id, result)
results.append(result)

return all(results)

Expand Down

0 comments on commit 4d75b89

Please sign in to comment.