Skip to content

Commit

Permalink
Peering - Find and remove peers from the peer table that share the sa…
Browse files Browse the repository at this point in the history
…me IP and TCP port with different discovery ports (hyperledger#7089)

Find and remove peers from the peer table that share the same IP and TCP port with different discovery ports
---------

Signed-off-by: [email protected] <[email protected]>
Signed-off-by: Sally MacFarlane <[email protected]>
Co-authored-by: Sally MacFarlane <[email protected]>
Signed-off-by: Justin Florentine <[email protected]>
  • Loading branch information
2 people authored and jflo committed May 28, 2024
1 parent 61115d4 commit 4c986cd
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- Remove deprecated Goerli testnet [#7049](https://github.com/hyperledger/besu/pull/7049)
- Default bonsai to use full-flat db and code-storage-by-code-hash [#6984](https://github.com/hyperledger/besu/pull/6894)
- New RPC methods miner_setExtraData and miner_getExtraData [#7078](https://github.com/hyperledger/besu/pull/7078)
- Disconnect peers that have multiple discovery ports since they give us bad neighbours [#7089](https://github.com/hyperledger/besu/pull/7089)

### Bug fixes
- Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665)
Expand Down
1 change: 1 addition & 0 deletions ethereum/p2p/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation 'io.tmio:tuweni-io'
implementation 'io.tmio:tuweni-rlp'
implementation 'io.tmio:tuweni-units'
implementation 'org.apache.commons:commons-collections4'
implementation 'org.jetbrains.kotlin:kotlin-stdlib'
implementation 'org.owasp.encoder:encoder'
implementation 'org.xerial.snappy:snappy-java'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
}

final DiscoveryPeer peer = resolvePeer(sender);
if (peer == null) {
return;
}
final Bytes peerId = peer.getId();
switch (packet.getType()) {
case PING:
Expand Down Expand Up @@ -399,30 +402,33 @@ private List<DiscoveryPeer> getPeersFromNeighborsPacket(final Packet packet) {
}

private boolean addToPeerTable(final DiscoveryPeer peer) {
// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
peer.setFirstDiscovered(now);
}
peer.setLastSeen(now);
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() != PeerTable.AddResult.AddOutcome.INVALID) {

if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
connectOnRlpxLayer(peer);
}
// Reset the last seen timestamp.
final long now = System.currentTimeMillis();
if (peer.getFirstDiscovered() == 0) {
peer.setFirstDiscovered(now);
}
peer.setLastSeen(now);

final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (peer.getStatus() != PeerDiscoveryStatus.BONDED) {
peer.setStatus(PeerDiscoveryStatus.BONDED);
connectOnRlpxLayer(peer);
}

if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}
if (result.getOutcome() == PeerTable.AddResult.AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == PeerTable.AddResult.AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}

return true;
return true;
}
return false;
}

void connectOnRlpxLayer(final DiscoveryPeer peer) {
Expand Down Expand Up @@ -688,7 +694,9 @@ public void setRetryDelayFunction(final RetryDelayFunction retryDelayFunction) {

public void handleBondingRequest(final DiscoveryPeer peer) {
final DiscoveryPeer peerToBond = resolvePeer(peer);

if (peerToBond == null) {
return;
}
if (peerPermissions.allowOutboundBonding(peerToBond)
&& PeerDiscoveryStatus.KNOWN.equals(peerToBond.getStatus())) {
bond(peerToBond);
Expand All @@ -697,6 +705,9 @@ public void handleBondingRequest(final DiscoveryPeer peer) {

// Load the peer first from the table, then from bonding cache or use the instance that comes in.
private DiscoveryPeer resolvePeer(final DiscoveryPeer peer) {
if (peerTable.ipAddressIsInvalid(peer.getEndpoint())) {
return null;
}
final Optional<DiscoveryPeer> maybeKnownPeer =
peerTable.get(peer).filter(known -> known.discoveryEndpointMatches(peer));
DiscoveryPeer resolvedPeer = maybeKnownPeer.orElse(peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.hyperledger.besu.crypto.Hash;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.Endpoint;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryStatus;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.AddResult.AddOutcome;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -33,6 +35,7 @@
import java.util.stream.Stream;

import com.google.common.hash.BloomFilter;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.tuweni.bytes.Bytes;

/**
Expand All @@ -51,6 +54,10 @@ public class PeerTable {
private final Map<Bytes, Integer> distanceCache;
private BloomFilter<Bytes> idBloom;
private int evictionCnt = 0;
private final LinkedHashMapWithMaximumSize<String, Integer> ipAddressCheckMap =
new LinkedHashMapWithMaximumSize<>(DEFAULT_BUCKET_SIZE * N_BUCKETS);
private final CircularFifoQueue<String> invalidIPs =
new CircularFifoQueue<>(DEFAULT_BUCKET_SIZE * N_BUCKETS);

/**
* Builds a new peer table, where distance is calculated using the provided nodeId as a baseline.
Expand Down Expand Up @@ -97,13 +104,17 @@ public Optional<DiscoveryPeer> get(final PeerId peer) {
* <li>the operation failed because the k-bucket was full, in which case a candidate is proposed
* for eviction.
* <li>the operation failed because the peer already existed.
* <li>the operation failed because the IP address is invalid.
* </ul>
*
* @param peer The peer to add.
* @return An object indicating the outcome of the operation.
* @see AddOutcome
*/
public AddResult tryAdd(final DiscoveryPeer peer) {
if (ipAddressIsInvalid(peer.getEndpoint())) {
return AddResult.invalid();
}
final Bytes id = peer.getId();
final int distance = distanceFrom(peer);

Expand All @@ -129,6 +140,7 @@ public AddResult tryAdd(final DiscoveryPeer peer) {
if (!res.isPresent()) {
idBloom.put(id);
distanceCache.put(id, distance);
ipAddressCheckMap.put(getKey(peer.getEndpoint()), peer.getEndpoint().getUdpPort());
return AddResult.added();
}

Expand Down Expand Up @@ -200,6 +212,34 @@ public Stream<DiscoveryPeer> streamAllPeers() {
return Arrays.stream(table).flatMap(e -> e.getPeers().stream());
}

boolean ipAddressIsInvalid(final Endpoint endpoint) {
final String key = getKey(endpoint);
if (invalidIPs.contains(key)) {
return true;
}
if (ipAddressCheckMap.containsKey(key) && ipAddressCheckMap.get(key) != endpoint.getUdpPort()) {
// This peer has multiple discovery services on the same IP address + TCP port.
invalidIPs.add(key);
for (final Bucket bucket : table) {
bucket.getPeers().stream()
.filter(p -> p.getEndpoint().getHost().equals(endpoint.getHost()))
.forEach(p -> evictAndStore(p, bucket, key));
}
return true;
} else {
return false;
}
}

private void evictAndStore(final DiscoveryPeer peer, final Bucket bucket, final String key) {
bucket.evict(peer);
invalidIPs.add(key);
}

private static String getKey(final Endpoint endpoint) {
return endpoint.getHost() + endpoint.getFunctionalTcpPort();
}

/**
* Calculates the XOR distance between the keccak-256 hashes of our node ID and the provided
* {@link DiscoveryPeer}.
Expand All @@ -216,6 +256,7 @@ private int distanceFrom(final PeerId peer) {

/** A class that encapsulates the result of a peer addition to the table. */
public static class AddResult {

/** The outcome of the operation. */
public enum AddOutcome {

Expand All @@ -229,7 +270,10 @@ public enum AddOutcome {
ALREADY_EXISTED,

/** The caller requested to add ourselves. */
SELF
SELF,

/** The peer was not added because the IP address is invalid. */
INVALID
}

private final AddOutcome outcome;
Expand All @@ -256,6 +300,10 @@ static AddResult self() {
return new AddResult(AddOutcome.SELF, null);
}

public static AddResult invalid() {
return new AddResult((AddOutcome.INVALID), null);
}

public AddOutcome getOutcome() {
return outcome;
}
Expand All @@ -265,6 +313,20 @@ public Peer getEvictionCandidate() {
}
}

private static class LinkedHashMapWithMaximumSize<K, V> extends LinkedHashMap<K, V> {
private final int maxSize;

public LinkedHashMapWithMaximumSize(final int maxSize) {
super(maxSize, 0.75f, false);
this.maxSize = maxSize;
}

@Override
protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
return size() > maxSize;
}
}

static class EvictResult {
public enum EvictOutcome {
EVICTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ private void neighboursCancelOutstandingRequests() {
private boolean satisfiesMapAdditionCriteria(final DiscoveryPeer discoPeer) {
return !oneTrueMap.containsKey(discoPeer.getId())
&& (initialPeers.contains(discoPeer) || !peerTable.get(discoPeer).isPresent())
&& !discoPeer.getId().equals(localPeer.getId());
&& !discoPeer.getId().equals(localPeer.getId())
&& !peerTable.ipAddressIsInvalid(discoPeer.getEndpoint());
}

void onNeighboursReceived(final DiscoveryPeer peer, final List<DiscoveryPeer> peers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,62 @@ public void evictSelfPeerShouldReturnSelfOutcome() {
final EvictResult evictResult = table.tryEvict(peer);
assertThat(evictResult.getOutcome()).isEqualTo(EvictOutcome.SELF);
}

@Test
public void ipAddressIsInvalidReturnsTrue() {
final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1)));
final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(1)));
final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1);
final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2);
final PeerTable table = new PeerTable(Bytes.random(64));

final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());

assertThat(table.ipAddressIsInvalid(peer2.getEndpoint())).isEqualTo(true);
}

@Test
public void ipAddressIsInvalidReturnsFalse() {
final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1)));
final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(2)));
final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1);
final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2);
final PeerTable table = new PeerTable(Bytes.random(64));

final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());

assertThat(table.ipAddressIsInvalid(peer2.getEndpoint())).isEqualTo(false);
}

@Test
public void invalidIPAddressNotAdded() {
final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1)));
final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(1)));
final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1);
final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2);
final PeerTable table = new PeerTable(Bytes.random(64));

final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());

final PeerTable.AddResult addResult2 = table.tryAdd(peer2);
assertThat(addResult2.getOutcome()).isEqualTo(PeerTable.AddResult.invalid().getOutcome());
}

@Test
public void validIPAddressAdded() {
final Endpoint endpoint1 = new Endpoint("1.1.1.1", 2, Optional.of(Integer.valueOf(1)));
final Endpoint endpoint2 = new Endpoint("1.1.1.1", 3, Optional.of(Integer.valueOf(2)));
final DiscoveryPeer peer1 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint1);
final DiscoveryPeer peer2 = DiscoveryPeer.fromIdAndEndpoint(Peer.randomId(), endpoint2);
final PeerTable table = new PeerTable(Bytes.random(64));

final PeerTable.AddResult addResult1 = table.tryAdd(peer1);
assertThat(addResult1.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());

final PeerTable.AddResult addResult2 = table.tryAdd(peer2);
assertThat(addResult2.getOutcome()).isEqualTo(PeerTable.AddResult.added().getOutcome());
}
}
1 change: 1 addition & 0 deletions gradle/versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ dependencyManagement {
dependency 'org.apache.commons:commons-compress:1.26.0'
dependency 'org.apache.commons:commons-lang3:3.14.0'
dependency 'org.apache.commons:commons-text:1.11.0'
dependency 'org.apache.commons:commons-collections4:4.4'

dependencySet(group: 'org.apache.logging.log4j', version: '2.22.1') {
entry 'log4j-api'
Expand Down

0 comments on commit 4c986cd

Please sign in to comment.