Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Introduce peer populator - Closes #3648 #3764

Merged
merged 17 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions commander/test/utils/core/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,54 @@ export const config = {
access: {
whiteList: ['127.0.0.1'],
},
enabled: true,
httpPort: 4000,
address: '0.0.0.0',
trustProxy: false,
ssl: {
enabled: false,
options: {
port: 443,
address: '0.0.0.0',
key: './ssl/lisk.key',
cert: './ssl/lisk.crt',
},
},
options: {
limits: {
max: 0,
delayMs: 0,
delayAfter: 0,
windowMs: 60000,
headersTimeout: 5000,
serverSetTimeout: 20000,
},
cors: {
origin: '*',
methods: ['GET', 'POST', 'PUT'],
},
},
forging: {
access: {
whiteList: ['127.0.0.1'],
},
},
},
network: {
seedPeers: [
{
ip: '127.0.0.1',
wsPort: 5000,
},
],
wsPort: 5000,
address: '0.0.0.0',
discoveryInterval: 30000,
populatorInterval: 10000,
blacklistedPeers: [],
ackTimeout: 20000,
connectTimeout: 5000,
wsEngine: 'ws',
},
},
network: {
Expand All @@ -81,6 +129,7 @@ export const config = {
wsPort: 5000,
address: '0.0.0.0',
discoveryInterval: 30000,
populatorInterval: 10000,
blacklistedPeers: [],
ackTimeout: 20000,
connectTimeout: 5000,
Expand Down
39 changes: 37 additions & 2 deletions elements/lisk-p2p/src/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ export const EVENT_NEW_PEER = 'newPeer';
export const NODE_HOST_IP = '0.0.0.0';
export const DEFAULT_DISCOVERY_INTERVAL = 30000;
export const DEFAULT_BAN_TIME = 86400;
export const DEFAULT_POPULATOR_INTERVAL = 10000;
diego-G marked this conversation as resolved.
Show resolved Hide resolved
export const DEFAULT_SEND_PEER_LIMIT = 25;

const BASE_10_RADIX = 10;
const MAX_OUTBOUND_CONNECTIONS = 20;
diego-G marked this conversation as resolved.
Show resolved Hide resolved

const selectRandomPeerSample = (
peerList: ReadonlyArray<P2PDiscoveredPeerInfo>,
Expand All @@ -133,6 +135,8 @@ export class P2P extends EventEmitter {
private readonly _bannedPeers: Set<string>;
private readonly _discoveryInterval: number;
private _discoveryIntervalId: NodeJS.Timer | undefined;
private readonly _populatorInterval: number;
private _populatorIntervalId: NodeJS.Timer | undefined;

private _nodeInfo: P2PNodeInfo;
private readonly _peerPool: PeerPool;
Expand Down Expand Up @@ -332,6 +336,9 @@ export class P2P extends EventEmitter {
? DEFAULT_SEND_PEER_LIMIT
: config.sendPeerLimit,
peerBanTime: config.peerBanTime ? config.peerBanTime : DEFAULT_BAN_TIME,
maxOutboundConnections: config.maxOutboundConnections
diego-G marked this conversation as resolved.
Show resolved Hide resolved
? config.maxOutboundConnections
: MAX_OUTBOUND_CONNECTIONS,
});

this._bindHandlersToPeerPool(this._peerPool);
Expand All @@ -352,6 +359,10 @@ export class P2P extends EventEmitter {
? config.discoveryInterval
: DEFAULT_DISCOVERY_INTERVAL;

this._populatorInterval = config.populatorInterval
? config.populatorInterval
: DEFAULT_POPULATOR_INTERVAL;

this._peerHandshakeCheck = config.peerHandshakeCheck
? config.peerHandshakeCheck
: checkPeerCompatibility;
Expand Down Expand Up @@ -641,8 +652,6 @@ export class P2P extends EventEmitter {
this._newPeers.set(peerId, peerInfo);
}
});

this._peerPool.selectPeersAndConnect([...this._newPeers.values()]);
}

private async _startDiscovery(): Promise<void> {
Expand All @@ -663,6 +672,30 @@ export class P2P extends EventEmitter {
clearInterval(this._discoveryIntervalId);
}

private _startPopulator(): void {
if (this._populatorIntervalId) {
throw new Error('Populator is already running');
}
this._populatorIntervalId = setInterval(() => {
this._peerPool.triggerNewConnections([
...this._newPeers.values(),
...this._triedPeers.values(),
]);
}, this._populatorInterval);

this._peerPool.triggerNewConnections([
...this._newPeers.values(),
...this._triedPeers.values(),
]);
}

private _stopPopulator(): void {
if (!this._populatorIntervalId) {
throw new Error('Populator is not running');
}
clearInterval(this._populatorIntervalId);
}

private async _fetchSeedPeerStatus(
seedPeers: ReadonlyArray<P2PPeerInfo>,
): Promise<ReadonlyArray<P2PDiscoveredPeerInfo>> {
Expand Down Expand Up @@ -747,13 +780,15 @@ export class P2P extends EventEmitter {
});

await this._startDiscovery();
this._startPopulator();
}

public async stop(): Promise<void> {
if (!this._isActive) {
throw new Error('Cannot stop the node because it is not active');
}
this._stopDiscovery();
this._stopPopulator();
this._peerPool.removeAllPeers();
await this._stopPeerServer();
}
Expand Down
7 changes: 7 additions & 0 deletions elements/lisk-p2p/src/p2p_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ export interface P2PPeerInfo {
readonly wsPort: number;
}

export interface P2PPeersCount {
readonly outbound: number;
readonly inbound: number;
}

export interface P2PDiscoveredPeerInfo extends P2PPeerInfo {
readonly height: number;
readonly updatedAt?: Date;
Expand Down Expand Up @@ -84,6 +89,8 @@ export interface P2PConfig {
readonly nodeInfo: P2PNodeInfo;
readonly wsEngine?: string;
readonly discoveryInterval?: number;
readonly populatorInterval?: number;
readonly maxOutboundConnections: number;
readonly peerSelectionForSend?: P2PPeerSelectionForSendFunction;
readonly peerSelectionForRequest?: P2PPeerSelectionForRequestFunction;
readonly peerSelectionForConnection?: P2PPeerSelectionForConnectionFunction;
Expand Down
50 changes: 38 additions & 12 deletions elements/lisk-p2p/src/peer_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
P2PMessagePacket,
P2PNodeInfo,
P2PPeerInfo,
P2PPeersCount,
P2PPeerSelectionForConnectionFunction,
P2PPeerSelectionForRequestFunction,
P2PPeerSelectionForSendFunction,
Expand Down Expand Up @@ -87,6 +88,7 @@ interface PeerPoolConfig {
readonly peerSelectionForConnection: P2PPeerSelectionForConnectionFunction;
readonly sendPeerLimit: number;
readonly peerBanTime?: number;
readonly maxOutboundConnections: number;
}

export const MAX_PEER_LIST_BATCH_SIZE = 100;
Expand Down Expand Up @@ -123,6 +125,7 @@ export class PeerPool extends EventEmitter {
private readonly _handleBanPeer: (peerId: string) => void;
private readonly _handleUnbanPeer: (peerId: string) => void;
private _nodeInfo: P2PNodeInfo | undefined;
private readonly _maxOutboundConnections: number;
private readonly _peerSelectForSend: P2PPeerSelectionForSendFunction;
private readonly _peerSelectForRequest: P2PPeerSelectionForRequestFunction;
private readonly _peerSelectForConnection: P2PPeerSelectionForConnectionFunction;
Expand All @@ -135,6 +138,7 @@ export class PeerPool extends EventEmitter {
this._peerSelectForSend = peerPoolConfig.peerSelectionForSend;
this._peerSelectForRequest = peerPoolConfig.peerSelectionForRequest;
this._peerSelectForConnection = peerPoolConfig.peerSelectionForConnection;
this._maxOutboundConnections = peerPoolConfig.maxOutboundConnections;
this._sendPeerLimit = peerPoolConfig.sendPeerLimit;

// This needs to be an arrow function so that it can be used as a listener.
Expand Down Expand Up @@ -239,9 +243,7 @@ export class PeerPool extends EventEmitter {
return this._nodeInfo;
}

public async request(
packet: P2PRequestPacket,
): Promise<P2PResponsePacket> {
public async request(packet: P2PRequestPacket): Promise<P2PResponsePacket> {
const listOfPeerInfo = [...this._peerMap.values()].map(
(peer: Peer) => peer.peerInfo,
);
Expand Down Expand Up @@ -373,18 +375,27 @@ export class PeerPool extends EventEmitter {
return discoveredPeers;
}

public selectPeersAndConnect(
public triggerNewConnections(
peers: ReadonlyArray<P2PDiscoveredPeerInfo>,
diego-G marked this conversation as resolved.
Show resolved Hide resolved
): ReadonlyArray<P2PDiscoveredPeerInfo> {
const peersToConnect = this._peerSelectForConnection({ peers });

peersToConnect.forEach((peerInfo: P2PDiscoveredPeerInfo) => {
const peerId = constructPeerIdFromPeerInfo(peerInfo);
): void {
const peersCount = this.getPeersCountByKind();
// Trigger new connections only if the maximum of outbound connections has not been reached
if (peersCount.outbound < this._maxOutboundConnections) {
diego-G marked this conversation as resolved.
Show resolved Hide resolved
// Try to connect to as many peers as possible without surpassing the maximum
const shuffledPeers = shuffle(peers).slice(
0,
this._maxOutboundConnections - peersCount.outbound,
);

return this.addOutboundPeer(peerId, peerInfo);
});
const peersToConnect = this._peerSelectForConnection({
peers: shuffledPeers,
});
peersToConnect.forEach((peerInfo: P2PDiscoveredPeerInfo) => {
const peerId = constructPeerIdFromPeerInfo(peerInfo);

return peersToConnect;
return this.addOutboundPeer(peerId, peerInfo);
diego-G marked this conversation as resolved.
Show resolved Hide resolved
});
}
}

public addInboundPeer(
Expand Down Expand Up @@ -439,6 +450,21 @@ export class PeerPool extends EventEmitter {
return peer;
}

public getPeersCountByKind(): P2PPeersCount {
const peersCount = { outbound: 0, inbound: 0 };
diego-G marked this conversation as resolved.
Show resolved Hide resolved
this._peerMap.forEach((peer: Peer) => {
if (peer instanceof OutboundPeer) {
return (peersCount.outbound += 1);
} else if (peer instanceof InboundPeer) {
return (peersCount.inbound += 1);
}

throw new Error('A non-identified peer exists in the pool.');
});

return peersCount;
}

public removeAllPeers(): void {
this._peerMap.forEach((peer: Peer) => {
this.removePeer(peer.id);
Expand Down
Loading