Skip to content

Commit

Permalink
Abort trusted sync if a state update fails (#10523)
Browse files Browse the repository at this point in the history
* Abort trusted sync if a state update fails

* Fix lint
  • Loading branch information
mariano54 committed Mar 2, 2022
1 parent 51dffd8 commit 206205b
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ async def receive_state_from_peer(
cache: PeerRequestCache = self.get_cache_for_peer(peer)
if fork_height is not None:
cache.clear_after_height(fork_height)
self.log.info(f"Rolling back to {fork_height}")

all_tasks: List[asyncio.Task] = []
target_concurrent_tasks: int = 20
Expand All @@ -617,21 +618,18 @@ async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: i
for inner_state in inner_states:
self.add_state_to_race_cache(header_hash, height, inner_state)
self.log.info(f"Added to race cache: {height}, {inner_state}")
if trusted:
valid_states = inner_states
else:
valid_states = [
inner_state
for inner_state in inner_states
if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
]
valid_states = [
inner_state
for inner_state in inner_states
if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
]
if len(valid_states) > 0:
self.log.info(
f"new coin state received ({inner_idx_start}-"
f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})"
)
assert self.new_state_lock is not None
async with self.new_state_lock:
self.log.info(
f"new coin state received ({inner_idx_start}-"
f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})"
)
if self.wallet_state_manager is None:
return
await self.wallet_state_manager.new_coin_state(valid_states, peer, fork_height)
Expand Down Expand Up @@ -662,18 +660,31 @@ async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: i
if peer.peer_node_id not in self.server.all_connections:
self.log.error(f"Disconnected from peer {peer.peer_node_id} host {peer.peer_host}")
return False
while len(concurrent_tasks_cs_heights) >= target_concurrent_tasks:
await asyncio.sleep(0.1)
if self._shut_down:
self.log.info("Terminating receipt and validation due to shut down request")
if trusted:
try:
self.log.info(f"new coin state received ({idx}-" f"{idx + len(states) - 1}/ {len(items)})")
await self.wallet_state_manager.new_coin_state(states, peer, fork_height)
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(
last_change_height_cs(states[-1]) - 1
)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Error adding states.. {e} {tb}")
return False
concurrent_tasks_cs_heights.append(last_change_height_cs(states[0]))
all_tasks.append(asyncio.create_task(receive_and_validate(states, idx, concurrent_tasks_cs_heights)))
else:
while len(concurrent_tasks_cs_heights) >= target_concurrent_tasks:
await asyncio.sleep(0.1)
if self._shut_down:
self.log.info("Terminating receipt and validation due to shut down request")
return False
concurrent_tasks_cs_heights.append(last_change_height_cs(states[0]))
all_tasks.append(asyncio.create_task(receive_and_validate(states, idx, concurrent_tasks_cs_heights)))
idx += len(states)

still_connected = self.server is not None and peer.peer_node_id in self.server.all_connections
await asyncio.gather(*all_tasks)
await self.update_ui()
return True
return still_connected and self.server is not None and peer.peer_node_id in self.server.all_connections

async def get_coins_with_puzzle_hash(self, puzzle_hash) -> List[CoinState]:
assert self.wallet_state_manager is not None
Expand Down Expand Up @@ -838,6 +849,7 @@ async def new_peak_wallet(self, new_peak: wallet_protocol.NewPeakWallet, peer: W
self.wallet_state_manager.set_sync_mode(True)
await self.long_sync(new_peak.height, peer, uint32(max(0, current_height - 256)), rollback=True)
self.wallet_state_manager.set_sync_mode(False)

else:
far_behind: bool = (
new_peak.height - self.wallet_state_manager.blockchain.get_peak_height() > self.LONG_SYNC_THRESHOLD
Expand Down

0 comments on commit 206205b

Please sign in to comment.