From a9c9a8b84c4b575951e32aca340ab0b1614fae88 Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Thu, 22 Dec 2016 14:01:54 +0100 Subject: [PATCH] Untangle `update` method (WIP) #50 The `update()` method of `KademliaProtocol` is far too complex for sensible refactoring, this is a first step to untangle all involved state transitions. --- devp2p/kademlia.py | 139 +++++++++++++++++++++++++++++++-------------- 1 file changed, 95 insertions(+), 44 deletions(-) diff --git a/devp2p/kademlia.py b/devp2p/kademlia.py index 0c8be8f..06448ab 100644 --- a/devp2p/kademlia.py +++ b/devp2p/kademlia.py @@ -380,7 +380,8 @@ def bond(self, node): self.ping(node) elif not node.ping_recv: self.waiting_for_ping.append(node) - assert node.bonded + else: + assert node.bonded def update(self, node, pingid=None): """ @@ -434,48 +435,35 @@ def update(self, node, pingid=None): if not node.bonded: self.bond(node) - def _expected_pongs(): - return set(v[1] for v in self._expected_pongs.values()) - if pingid and (pingid not in self._expected_pongs): - assert pingid not in self._expected_pongs - log.debug('surprising pong', remoteid=node, - expected=_expected_pongs(), pingid=pingid.encode('hex')[:8]) - if pingid in self._deleted_pingids: - log.debug('surprising pong was deleted') - else: - for key in self._expected_pongs: - if key.endswith(node.pubkey): - log.debug('waiting for ping from node, but echo mismatch', node=node, - expected_echo=key[:len(node.pubkey)][:8].encode('hex'), - received_echo=pingid[:len(node.pubkey)][:8].encode('hex')) - return + return self._process_surprising_pong(node, pingid) - # check for timed out pings and eventually evict them - for _pingid, (timeout, _node, replacement) in self._expected_pongs.items(): - if time.time() > timeout: - log.debug('deleting timedout node', remoteid=_node, - pingid=_pingid.encode('hex')[:8]) - self._deleted_pingids.add(_pingid) # FIXME this is for testing - del self._expected_pongs[_pingid] - self.routing.remove_node(_node) - if replacement: - log.debug('adding replacement', remoteid=replacement) - self.update(replacement) - return - if _node == node: # prevent node from being added later - return + if self._check_timed_out_pings(node): + # prevent node from being added later + return # if we had registered this node for eviction test - if pingid in self._expected_pongs: - timeout, _node, replacement = self._expected_pongs[pingid] - log.debug('received expected pong', remoteid=node) - if replacement: - log.debug('adding replacement to cache', remoteid=replacement) - self.routing.bucket_by_node(replacement).replacement_cache.append(replacement) - del self._expected_pongs[pingid] + if pingid and (pingid in self._expected_pongs): + self._handle_eviction_test_ping(node, pingid) # add node + self._add_or_queue_eviction_test(node) + + # check for not full buckets and ping replacements + self._ping_potential_replacements() + + # check idle buckets + self._check_idle_buckets() + + # check and remove timedout find requests + self._check_and_remove_timed_out_find_requests() + + log.debug('updated', num_nodes=len(self.routing), num_buckets=len(self.routing.buckets)) + + def _add_or_queue_eviction_test(self, node): + """Try to add the node. If routing proposes an eviction candidate instead, + add queue the eviction test. + """ eviction_candidate = self.routing.add_node(node) if eviction_candidate: log.debug('could not add', remoteid=node, pinging=eviction_candidate) @@ -484,12 +472,31 @@ def _expected_pongs(): else: log.debug('added', remoteid=node) - # check for not full buckets and ping replacements + def _handle_eviction_test_ping(self, node, pingid): + """Consume expected pong (by pingid) and handle potential eviction replacement. + """ + timeout, _node, replacement = self._expected_pongs[pingid] + log.debug('received expected pong', remoteid=node) + if replacement: + log.debug('adding replacement to cache', remoteid=replacement) + self.routing.bucket_by_node(replacement).replacement_cache.append(replacement) + del self._expected_pongs[pingid] + + def _ping_potential_replacements(self): + """Find unfilled buckets and ping potential replacements. + """ for bucket in self.routing.not_full_buckets: for node in bucket.replacement_cache: self.ping(node) - # check idle buckets + def _check_and_remove_timed_out_find_requests(self): + """Cleanup find_requests if timed out + """ + for nodeid, timeout in self._find_requests.items(): + if time.time() > timeout: + del self._find_requests[nodeid] + + def _check_idle_buckets(self): """ idle bucket refresh: for each bucket which hasn't been touched in 3600 seconds @@ -499,12 +506,56 @@ def _expected_pongs(): rid = random.randint(bucket.start, bucket.end) self.find_node(rid) - # check and removed timedout find requests - for nodeid, timeout in self._find_requests.items(): - if time.time() > timeout: - del self._find_requests[nodeid] + def _expected_pongs(self): + return set(v[1] for v in self._expected_pongs.values()) - log.debug('updated', num_nodes=len(self.routing), num_buckets=len(self.routing.buckets)) + def _process_surprising_pong(self, node, pingid): + """Receive an unknown pingid (from pong) + Args: + node: the node in scope + pingid: the pingid + """ + assert pingid not in self._expected_pongs + log.debug('surprising pong', remoteid=node, + expected=self._expected_pongs(), pingid=pingid.encode('hex')[:8]) + if pingid in self._deleted_pingids: + log.debug('surprising pong was deleted') + else: + for key in self._expected_pongs: + if key.endswith(node.pubkey): + log.debug('waiting for ping from node, but echo mismatch', node=node, + expected_echo=key[:len(node.pubkey)][:8].encode('hex'), + received_echo=pingid[:len(node.pubkey)][:8].encode('hex')) + + def _check_timed_out_pings(self, node): + """Check for timed out pings and eventually evict them. + If there are replacements registered for timed out pings, recurse into + `self.update` with the replacements. + Args: + node: the node in scope + Return: + timed_out (boolean): if a ping to the node in scope timed out + """ + timed_out = False + replacements = [] + # check for timed out pings and eventually evict them + for _pingid, (timeout, _node, replacement) in self._expected_pongs.items(): + if time.time() > timeout: + log.debug('deleting timedout node', remoteid=_node, + pingid=_pingid.encode('hex')[:8]) + self._deleted_pingids.add(_pingid) # FIXME this is for testing + del self._expected_pongs[_pingid] + self.routing.remove_node(_node) + if replacement: + log.debug('adding replacement', remoteid=replacement) + assert replacement != node + replacements.append(replacement) + if _node == node: # prevent node from being added later + timed_out = True + if replacements: + for replacement in replacements: + self.update(replacement) + return timed_out def _mkpingid(self, echoed, node): assert node.pubkey