Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Make switch a state machine #278

Merged
merged 27 commits into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c47ab13
feat: add basic state machine functionality to switch
jacobheun Sep 5, 2018
a97c386
fix: linting
jacobheun Sep 5, 2018
b4d1602
refactor: move connection.js to connection-manager.js
jacobheun Sep 5, 2018
41e4a88
feat: add outgoing connection state machine
jacobheun Sep 11, 2018
5848e6d
feat: functioning incoming connection fsm
jacobheun Sep 25, 2018
45ae65b
fix: linting
jacobheun Sep 25, 2018
78203fd
fix: stats
jacobheun Sep 26, 2018
0b9f5f1
docs: remove notes
jacobheun Sep 26, 2018
ef92ee2
test: bump circuit shutdown timeout
jacobheun Sep 26, 2018
4b62917
fix: node 8 support
jacobheun Sep 26, 2018
7ab2151
feat: add class-is support for connections
jacobheun Sep 26, 2018
674d55c
refactor: clean up some logic and make inc muxed conns FSMs
jacobheun Sep 27, 2018
f4a1806
fix: cleanup todos, logic and event handlers
jacobheun Sep 27, 2018
dd12ad1
refactor: clean up logs
jacobheun Oct 2, 2018
56ea400
feat: add dialFSM to the switch
jacobheun Oct 3, 2018
2e1f7b5
refactor: rename test file
jacobheun Oct 3, 2018
75f1370
feat: add better support for closing connections
jacobheun Oct 3, 2018
0ee8157
test: add tests for some uncovered lines
jacobheun Oct 3, 2018
d921c2d
refactor: do some cleanup
jacobheun Oct 4, 2018
6351365
feat: add additional fsm user support
jacobheun Oct 5, 2018
3d469d0
feat: add warning emitter for muxer upgrade failed
jacobheun Oct 5, 2018
662a58f
refactor: cleanup and add some tests
jacobheun Oct 5, 2018
730c2b6
test: add test for failed muxer upgrade
jacobheun Oct 5, 2018
6dd8f39
test: add more error state tests for connectionfsm
jacobheun Oct 5, 2018
a2e995e
docs: update readme
jacobheun Oct 17, 2018
ef85443
docs: fix readme link
jacobheun Oct 17, 2018
8ff0375
docs: clean up readme and jsdocs
jacobheun Oct 19, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
"dependencies": {
"async": "^2.6.1",
"big.js": "^5.1.2",
"class-is": "^1.1.0",
"debug": "^3.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
Expand Down
91 changes: 91 additions & 0 deletions src/connection/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const withIs = require('class-is')

class BaseConnection extends EventEmitter {
constructor ({ _switch, name }) {
super()

this.switch = _switch
this.ourPeerInfo = this.switch._peerInfo
this.log = debug(`libp2p:conn:${name}`)
}

/**
* Gets the current state of the connection
*
* @returns {string} The current state of the connection
*/
getState () {
return this._state._state
}

/**
* Puts the state into encrypting mode
*
* @returns {void}
*/
encrypt () {
this._state('encrypt')
}

/**
* Puts the state into privatizing mode
*
* @returns {void}
*/
protect () {
this._state('privatize')
}

/**
* Puts the state into muxing mode
*
* @returns {void}
*/
upgrade () {
this._state('upgrade')
}

/**
* Event handler for disconneced.
*
* @returns {void}
*/
_onDisconnected () {
this.log(`disconnected from ${this.theirB58Id}`)
this.emit('close')
this.removeAllListeners()
}

/**
* Wraps this.conn with the Switch.protector for private connections
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onPrivatizing () {
if (!this.switch.protector) {
return this._state('done')
}

this.conn = this.switch.protector.protect(this.conn, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
}
}

module.exports = withIs(BaseConnection, {
className: 'BaseConnection',
symbolName: 'libp2p-switch/BaseConnection'
})
46 changes: 46 additions & 0 deletions src/connection/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const debug = require('debug')
const IncomingConnection = require('./incoming')
const observeConn = require('../observe-connection')

function listener (_switch) {
const log = debug(`libp2p:switch:listener`)

/**
* Takes a transport key and returns a connection handler function
*
* @param {string} transportKey The key of the transport to handle connections for
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return (transportKey, handler) => {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return (conn) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn

log('received incoming connection')
const connFSM = new IncomingConnection({ connection, _switch, transportKey })

connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.protect()
}
}
}

module.exports = listener
123 changes: 123 additions & 0 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
'use strict'

const FSM = require('fsm-event')
const multistream = require('multistream-select')
const withIs = require('class-is')

const BaseConnection = require('./base')

class IncomingConnectionFSM extends BaseConnection {
constructor ({ connection, _switch, transportKey }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
this.msListener = new multistream.Listener()

this._state = FSM('DIALED', {
DISCONNECTED: { },
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
},
PRIVATIZING: { // Protecting the base connection
done: 'PRIVATIZED',
disconnect: 'DISCONNECTING'
},
PRIVATIZED: { // Base connection is protected
encrypt: 'ENCRYPTING'
},
ENCRYPTING: { // Encrypting the base connection
done: 'ENCRYPTED',
disconnect: 'DISCONNECTING'
},
ENCRYPTED: { // Upgrading could not happen, the connection is encrypted and waiting
upgrade: 'UPGRADING',
disconnect: 'DISCONNECTING'
},
UPGRADING: { // Attempting to upgrade the connection with muxers
done: 'MUXED'
},
MUXED: {
disconnect: 'DISCONNECTING'
},
DISCONNECTING: { // Shutting down the connection
done: 'DISCONNECTED'
}
})

this._state.on('PRIVATIZING', () => this._onPrivatizing())
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log(`successfully encrypted connection to ${this.theirB58Id || 'unknown peer'}`)
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id || 'unknown peer'}`)
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}
this._state('done')
})
}

/**
* Gets the current state of the connection
*
* @returns {string} The current state of the connection
*/
getState () {
return this._state._state
}

// TODO: We need to handle N+1 crypto libraries
_onEncrypting () {
this.log(`encrypting connection via ${this.switch.crypto.tag}`)

this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
}
this.conn.getPeerInfo((_, peerInfo) => {
this.theirPeerInfo = peerInfo
this._state('done')
})
})
}, null)

// Start handling the connection
this.msListener.handle(this.conn, (err) => {
if (err) {
this.emit('crypto handshaking failed', err)
}
})
}

_onPrivatized () {
this.log(`successfully privatized incoming connection`)
this.emit('private', this.conn)
}

_onUpgrading () {
this.log('adding the protocol muxer to the connection')
this.protocolMuxer(this.conn, this.msListener)
this._state('done')
}
}

module.exports = withIs(IncomingConnectionFSM, {
className: 'IncomingConnectionFSM',
symbolName: 'libp2p-switch/IncomingConnectionFSM'
})
Loading