diff --git a/chia/wallet/wallet_node.py b/chia/wallet/wallet_node.py index 3822c390f817..dd636ac56118 100644 --- a/chia/wallet/wallet_node.py +++ b/chia/wallet/wallet_node.py @@ -600,6 +600,7 @@ async def receive_state_from_peer( cache: PeerRequestCache = self.get_cache_for_peer(peer) if fork_height is not None: cache.clear_after_height(fork_height) + self.log.info(f"Rolling back to {fork_height}") all_tasks: List[asyncio.Task] = [] target_concurrent_tasks: int = 20 @@ -617,21 +618,18 @@ async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: i for inner_state in inner_states: self.add_state_to_race_cache(header_hash, height, inner_state) self.log.info(f"Added to race cache: {height}, {inner_state}") - if trusted: - valid_states = inner_states - else: - valid_states = [ - inner_state - for inner_state in inner_states - if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height) - ] + valid_states = [ + inner_state + for inner_state in inner_states + if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height) + ] if len(valid_states) > 0: - self.log.info( - f"new coin state received ({inner_idx_start}-" - f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})" - ) assert self.new_state_lock is not None async with self.new_state_lock: + self.log.info( + f"new coin state received ({inner_idx_start}-" + f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})" + ) if self.wallet_state_manager is None: return await self.wallet_state_manager.new_coin_state(valid_states, peer, fork_height) @@ -662,18 +660,31 @@ async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: i if peer.peer_node_id not in self.server.all_connections: self.log.error(f"Disconnected from peer {peer.peer_node_id} host {peer.peer_host}") return False - while len(concurrent_tasks_cs_heights) >= target_concurrent_tasks: - await asyncio.sleep(0.1) - if self._shut_down: - self.log.info("Terminating receipt and validation due to shut down request") + if trusted: + try: + self.log.info(f"new coin state received ({idx}-" f"{idx + len(states) - 1}/ {len(items)})") + await self.wallet_state_manager.new_coin_state(states, peer, fork_height) + await self.wallet_state_manager.blockchain.set_finished_sync_up_to( + last_change_height_cs(states[-1]) - 1 + ) + except Exception as e: + tb = traceback.format_exc() + self.log.error(f"Error adding states.. {e} {tb}") return False - concurrent_tasks_cs_heights.append(last_change_height_cs(states[0])) - all_tasks.append(asyncio.create_task(receive_and_validate(states, idx, concurrent_tasks_cs_heights))) + else: + while len(concurrent_tasks_cs_heights) >= target_concurrent_tasks: + await asyncio.sleep(0.1) + if self._shut_down: + self.log.info("Terminating receipt and validation due to shut down request") + return False + concurrent_tasks_cs_heights.append(last_change_height_cs(states[0])) + all_tasks.append(asyncio.create_task(receive_and_validate(states, idx, concurrent_tasks_cs_heights))) idx += len(states) + still_connected = self.server is not None and peer.peer_node_id in self.server.all_connections await asyncio.gather(*all_tasks) await self.update_ui() - return True + return still_connected and self.server is not None and peer.peer_node_id in self.server.all_connections async def get_coins_with_puzzle_hash(self, puzzle_hash) -> List[CoinState]: assert self.wallet_state_manager is not None @@ -838,6 +849,7 @@ async def new_peak_wallet(self, new_peak: wallet_protocol.NewPeakWallet, peer: W self.wallet_state_manager.set_sync_mode(True) await self.long_sync(new_peak.height, peer, uint32(max(0, current_height - 256)), rollback=True) self.wallet_state_manager.set_sync_mode(False) + else: far_behind: bool = ( new_peak.height - self.wallet_state_manager.blockchain.get_peak_height() > self.LONG_SYNC_THRESHOLD