Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort trusted sync if a state update fails #10523

Merged
merged 2 commits into from
Mar 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions chia/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ async def receive_state_from_peer(
cache: PeerRequestCache = self.get_cache_for_peer(peer)
if fork_height is not None:
cache.clear_after_height(fork_height)
self.log.info(f"Rolling back to {fork_height}")

all_tasks: List[asyncio.Task] = []
target_concurrent_tasks: int = 20
Expand All @@ -617,21 +618,18 @@ async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: i
for inner_state in inner_states:
self.add_state_to_race_cache(header_hash, height, inner_state)
self.log.info(f"Added to race cache: {height}, {inner_state}")
if trusted:
valid_states = inner_states
else:
valid_states = [
inner_state
for inner_state in inner_states
if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
]
valid_states = [
inner_state
for inner_state in inner_states
if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
]
if len(valid_states) > 0:
self.log.info(
f"new coin state received ({inner_idx_start}-"
f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})"
)
assert self.new_state_lock is not None
async with self.new_state_lock:
self.log.info(
f"new coin state received ({inner_idx_start}-"
f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})"
)
if self.wallet_state_manager is None:
return
await self.wallet_state_manager.new_coin_state(valid_states, peer, fork_height)
Expand Down Expand Up @@ -662,18 +660,31 @@ async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: i
if peer.peer_node_id not in self.server.all_connections:
self.log.error(f"Disconnected from peer {peer.peer_node_id} host {peer.peer_host}")
return False
while len(concurrent_tasks_cs_heights) >= target_concurrent_tasks:
await asyncio.sleep(0.1)
if self._shut_down:
self.log.info("Terminating receipt and validation due to shut down request")
if trusted:
try:
self.log.info(f"new coin state received ({idx}-" f"{idx + len(states) - 1}/ {len(items)})")
await self.wallet_state_manager.new_coin_state(states, peer, fork_height)
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(
last_change_height_cs(states[-1]) - 1
)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Error adding states.. {e} {tb}")
return False
concurrent_tasks_cs_heights.append(last_change_height_cs(states[0]))
all_tasks.append(asyncio.create_task(receive_and_validate(states, idx, concurrent_tasks_cs_heights)))
else:
while len(concurrent_tasks_cs_heights) >= target_concurrent_tasks:
await asyncio.sleep(0.1)
if self._shut_down:
self.log.info("Terminating receipt and validation due to shut down request")
return False
concurrent_tasks_cs_heights.append(last_change_height_cs(states[0]))
all_tasks.append(asyncio.create_task(receive_and_validate(states, idx, concurrent_tasks_cs_heights)))
idx += len(states)

still_connected = self.server is not None and peer.peer_node_id in self.server.all_connections
await asyncio.gather(*all_tasks)
await self.update_ui()
return True
return still_connected and self.server is not None and peer.peer_node_id in self.server.all_connections

async def get_coins_with_puzzle_hash(self, puzzle_hash) -> List[CoinState]:
assert self.wallet_state_manager is not None
Expand Down Expand Up @@ -838,6 +849,7 @@ async def new_peak_wallet(self, new_peak: wallet_protocol.NewPeakWallet, peer: W
self.wallet_state_manager.set_sync_mode(True)
await self.long_sync(new_peak.height, peer, uint32(max(0, current_height - 256)), rollback=True)
self.wallet_state_manager.set_sync_mode(False)

else:
far_behind: bool = (
new_peak.height - self.wallet_state_manager.blockchain.get_peak_height() > self.LONG_SYNC_THRESHOLD
Expand Down