Skip to content

Commit

Permalink
feat!: close streams gracefully (#344)
Browse files Browse the repository at this point in the history
* feat!: close streams gracefully

- Updates all libp2p related deps
- Stream close methods are now async

BREAKING CHANGE: stream close methods are now asyc, requires [email protected] or later

* chore: update yamux
  • Loading branch information
achingbrain authored Aug 3, 2023
1 parent e0f8ff4 commit b267e7a
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 87 deletions.
41 changes: 20 additions & 21 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,40 @@
"prepublish": "npm run build"
},
"dependencies": {
"@libp2p/crypto": "^1.0.11",
"@libp2p/interface-connection-encrypter": "^4.0.0",
"@libp2p/interface-keys": "^1.0.6",
"@libp2p/interface-metrics": "^4.0.4",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/logger": "^2.0.5",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/crypto": "^2.0.0",
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/peer-id": "^3.0.0",
"@noble/ciphers": "^0.1.4",
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"@noble/ciphers": "^0.1.4",
"it-byte-stream": "^1.0.0",
"it-length-prefixed": "^9.0.1",
"it-pair": "^2.0.2",
"it-pb-stream": "^4.0.1",
"it-length-prefixed-stream": "^1.0.0",
"it-pair": "^2.0.6",
"it-pipe": "^3.0.1",
"it-stream-types": "^2.0.1",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^4.0.2"
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"@chainsafe/libp2p-yamux": "^4.0.1",
"@libp2p/daemon-client": "^6.0.3",
"@libp2p/daemon-server": "^5.0.2",
"@libp2p/interface-connection-encrypter-compliance-tests": "^5.0.0",
"@libp2p/interop": "^8.0.1",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/tcp": "^7.0.0",
"@chainsafe/libp2p-yamux": "^5.0.0",
"@libp2p/daemon-client": "^7.0.0",
"@libp2p/daemon-server": "^6.0.0",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interop": "^9.0.0",
"@libp2p/peer-id-factory": "^3.0.0",
"@libp2p/tcp": "^8.0.0",
"@multiformats/multiaddr": "^12.1.0",
"@types/sinon": "^10.0.14",
"aegir": "^39.0.5",
"aegir": "^40.0.8",
"benchmark": "^2.1.4",
"execa": "^7.0.0",
"go-libp2p": "^1.0.3",
"iso-random-stream": "^2.0.2",
"libp2p": "0.45.0",
"libp2p": "^0.46.0",
"mkdirp": "^3.0.0",
"p-defer": "^4.0.0",
"protons": "^7.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/@types/handshake-interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { bytes } from './basic.js'
import type { NoiseSession } from './handshake.js'
import type { NoiseExtensions } from '../proto/payload.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface/peer-id'

export interface IHandshake {
session: NoiseSession
Expand Down
2 changes: 1 addition & 1 deletion src/@types/libp2p.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { bytes32 } from './basic.js'
import type { NoiseExtensions } from '../proto/payload.js'
import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter'
import type { ConnectionEncrypter } from '@libp2p/interface/connection-encrypter'

export interface KeyPair {
publicKey: bytes32
Expand Down
22 changes: 11 additions & 11 deletions src/handshake-xx.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidCryptoExchangeError, UnexpectedPeerError } from '@libp2p/interface-connection-encrypter/errors'
import { InvalidCryptoExchangeError, UnexpectedPeerError } from '@libp2p/interface/errors'
import { decode0, decode1, decode2, encode0, encode1, encode2 } from './encoder.js'
import { XX } from './handshakes/xx.js'
import {
Expand All @@ -20,8 +20,8 @@ import type { CipherState, NoiseSession } from './@types/handshake.js'
import type { KeyPair } from './@types/libp2p.js'
import type { ICryptoInterface } from './crypto.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { ProtobufStream } from 'it-pb-stream'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { LengthPrefixedStream } from 'it-length-prefixed-stream'

export class XXHandshake implements IHandshake {
public isInitiator: boolean
Expand All @@ -30,7 +30,7 @@ export class XXHandshake implements IHandshake {
public remoteExtensions: NoiseExtensions = { webtransportCerthashes: [] }

protected payload: bytes
protected connection: ProtobufStream
protected connection: LengthPrefixedStream
protected xx: XX
protected staticKeypair: KeyPair

Expand All @@ -42,7 +42,7 @@ export class XXHandshake implements IHandshake {
prologue: bytes32,
crypto: ICryptoInterface,
staticKeypair: KeyPair,
connection: ProtobufStream,
connection: LengthPrefixedStream,
remotePeer?: PeerId,
handshake?: XX
) {
Expand All @@ -64,12 +64,12 @@ export class XXHandshake implements IHandshake {
if (this.isInitiator) {
logger.trace('Stage 0 - Initiator starting to send first message.')
const messageBuffer = this.xx.sendMessage(this.session, new Uint8Array(0))
this.connection.writeLP(encode0(messageBuffer))
await this.connection.write(encode0(messageBuffer))
logger.trace('Stage 0 - Initiator finished sending first message.')
logLocalEphemeralKeys(this.session.hs.e)
} else {
logger.trace('Stage 0 - Responder waiting to receive first message...')
const receivedMessageBuffer = decode0((await this.connection.readLP()).subarray())
const receivedMessageBuffer = decode0((await this.connection.read()).subarray())
const { valid } = this.xx.recvMessage(this.session, receivedMessageBuffer)
if (!valid) {
throw new InvalidCryptoExchangeError('xx handshake stage 0 validation fail')
Expand All @@ -83,7 +83,7 @@ export class XXHandshake implements IHandshake {
public async exchange (): Promise<void> {
if (this.isInitiator) {
logger.trace('Stage 1 - Initiator waiting to receive first message from responder...')
const receivedMessageBuffer = decode1((await this.connection.readLP()).subarray())
const receivedMessageBuffer = decode1((await this.connection.read()).subarray())
const { plaintext, valid } = this.xx.recvMessage(this.session, receivedMessageBuffer)
if (!valid) {
throw new InvalidCryptoExchangeError('xx handshake stage 1 validation fail')
Expand All @@ -106,7 +106,7 @@ export class XXHandshake implements IHandshake {
} else {
logger.trace('Stage 1 - Responder sending out first message with signed payload and static key.')
const messageBuffer = this.xx.sendMessage(this.session, this.payload)
this.connection.writeLP(encode1(messageBuffer))
await this.connection.write(encode1(messageBuffer))
logger.trace('Stage 1 - Responder sent the second handshake message with signed payload.')
logLocalEphemeralKeys(this.session.hs.e)
}
Expand All @@ -117,11 +117,11 @@ export class XXHandshake implements IHandshake {
if (this.isInitiator) {
logger.trace('Stage 2 - Initiator sending third handshake message.')
const messageBuffer = this.xx.sendMessage(this.session, this.payload)
this.connection.writeLP(encode2(messageBuffer))
await this.connection.write(encode2(messageBuffer))
logger.trace('Stage 2 - Initiator sent message with signed payload.')
} else {
logger.trace('Stage 2 - Responder waiting for third handshake message...')
const receivedMessageBuffer = decode2((await this.connection.readLP()).subarray())
const receivedMessageBuffer = decode2((await this.connection.read()).subarray())
const { plaintext, valid } = this.xx.recvMessage(this.session, receivedMessageBuffer)
if (!valid) {
throw new InvalidCryptoExchangeError('xx handshake stage 2 validation fail')
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Noise } from './noise.js'
import type { NoiseInit } from './noise.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter'
import type { ConnectionEncrypter } from '@libp2p/interface/connection-encrypter'
export type { ICryptoInterface } from './crypto.js'
export { pureJsCrypto } from './crypto/js.js'

Expand Down
2 changes: 1 addition & 1 deletion src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Counter, Metrics } from '@libp2p/interface-metrics'
import type { Counter, Metrics } from '@libp2p/interface/metrics'

export type MetricsRegistry = Record<string, Counter>

Expand Down
16 changes: 8 additions & 8 deletions src/noise.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { decode } from 'it-length-prefixed'
import { lpStream, type LengthPrefixedStream } from 'it-length-prefixed-stream'
import { duplexPair } from 'it-pair/duplex'
import { pbStream, type ProtobufStream } from 'it-pb-stream'
import { pipe } from 'it-pipe'
import { NOISE_MSG_MAX_LENGTH_BYTES } from './constants.js'
import { pureJsCrypto } from './crypto/js.js'
Expand All @@ -14,13 +14,13 @@ import type { IHandshake } from './@types/handshake-interface.js'
import type { INoiseConnection, KeyPair } from './@types/libp2p.js'
import type { ICryptoInterface } from './crypto.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { Metrics } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { SecuredConnection } from '@libp2p/interface/connection-encrypter'
import type { Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Duplex, Source } from 'it-stream-types'

interface HandshakeParams {
connection: ProtobufStream
connection: LengthPrefixedStream
isInitiator: boolean
localPeer: PeerId
remotePeer?: PeerId
Expand Down Expand Up @@ -71,7 +71,7 @@ export class Noise implements INoiseConnection {
* @returns {Promise<SecuredConnection>}
*/
public async secureOutbound (localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<NoiseExtensions>> {
const wrappedConnection = pbStream(
const wrappedConnection = lpStream(
connection,
{
lengthEncoder: uint16BEEncode,
Expand Down Expand Up @@ -103,7 +103,7 @@ export class Noise implements INoiseConnection {
* @returns {Promise<SecuredConnection>}
*/
public async secureInbound (localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<NoiseExtensions>> {
const wrappedConnection = pbStream(
const wrappedConnection = lpStream(
connection,
{
lengthEncoder: uint16BEEncode,
Expand Down Expand Up @@ -171,7 +171,7 @@ export class Noise implements INoiseConnection {
}

private async createSecureConnection (
connection: ProtobufStream<Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>>,
connection: LengthPrefixedStream<Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>>,
handshake: IHandshake
): Promise<Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>> {
// Create encryption box/unbox wrapper
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { type NoiseExtensions, NoiseHandshakePayload } from './proto/payload.js'
import type { bytes } from './@types/basic.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface/peer-id'

export async function getPayload (
localPeer: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion test/compliance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import tests from '@libp2p/interface-connection-encrypter-compliance-tests'
import tests from '@libp2p/interface-compliance-tests/connection-encryption'
import { Noise } from '../src/noise.js'

describe('spec compliance tests', function () {
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/peer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createEd25519PeerId, createFromJSON } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface/peer-id'

// ed25519 keys
const peers = [{
Expand Down
12 changes: 6 additions & 6 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { expect } from 'aegir/chai'
import { lpStream } from 'it-length-prefixed-stream'
import { duplexPair } from 'it-pair/duplex'
import { pbStream } from 'it-pb-stream'
import sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { noise } from '../src/index.js'
import { Noise } from '../src/noise.js'
import { createPeerIdsFromFixtures } from './fixtures/peer.js'
import type { Metrics } from '@libp2p/interface-metrics'
import type { Metrics } from '@libp2p/interface/metrics'

function createCounterSpy (): ReturnType<typeof sinon.spy> {
return sinon.spy({
Expand Down Expand Up @@ -41,11 +41,11 @@ describe('Index', () => {
noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer),
noiseResp.secureInbound(remotePeer, inboundConnection, localPeer)
])
const wrappedInbound = pbStream(inbound.conn)
const wrappedOutbound = pbStream(outbound.conn)
const wrappedInbound = lpStream(inbound.conn)
const wrappedOutbound = lpStream(outbound.conn)

wrappedOutbound.writeLP(uint8ArrayFromString('test'))
await wrappedInbound.readLP()
await wrappedOutbound.write(uint8ArrayFromString('test'))
await wrappedInbound.read()
expect(metricsRegistry.get('libp2p_noise_xxhandshake_successes_total')?.increment.callCount).to.equal(1)
expect(metricsRegistry.get('libp2p_noise_xxhandshake_error_total')?.increment.callCount).to.equal(0)
expect(metricsRegistry.get('libp2p_noise_encrypted_packets_total')?.increment.callCount).to.equal(1)
Expand Down
Loading

0 comments on commit b267e7a

Please sign in to comment.