Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
fix: remove abortable-iterator and close socket directly on abort (#220)
Browse files Browse the repository at this point in the history
Usage of `abortable-iterator` has a non-negligible cost, see libp2p/js-libp2p#1420 so remove it and close the socket directly when the abort signal fires.

Co-authored-by: achingbrain <[email protected]>
  • Loading branch information
dapplion and achingbrain committed Dec 13, 2022
1 parent 1bfc601 commit 28fe750
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 10 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@
"@libp2p/utils": "^3.0.2",
"@multiformats/mafmt": "^11.0.3",
"@multiformats/multiaddr": "^11.0.0",
"abortable-iterator": "^4.0.2",
"err-code": "^3.0.1",
"stream-to-it": "^0.2.2"
},
Expand Down
23 changes: 21 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class TCP implements Transport {
async dial (ma: Multiaddr, options: TCPDialOptions): Promise<Connection> {
options.keepAlive = options.keepAlive ?? true

// options.signal destroys the socket before 'connect' event
const socket = await this._connect(ma, options)

// Avoid uncaught errors caused by unstable connections
Expand All @@ -103,14 +104,32 @@ class TCP implements Transport {

const maConn = toMultiaddrConnection(socket, {
remoteAddr: ma,
signal: options.signal,
socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.metrics?.dialerEvents
})

const onAbort = () => {
maConn.close().catch(err => {
log.error('Error closing maConn after abort', err)
})
}
options.signal?.addEventListener('abort', onAbort, { once: true })

log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection upgraded %s', maConn.remoteAddr)
log('outbound connection %s upgraded', maConn.remoteAddr)

options.signal?.removeEventListener('abort', onAbort)

if (options.signal?.aborted === true) {
conn.close().catch(err => {
log.error('Error closing conn after abort', err)
})

throw new AbortError()
}

return conn
}

Expand Down
8 changes: 1 addition & 7 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { abortableSource } from 'abortable-iterator'
import { logger } from '@libp2p/logger'
// @ts-expect-error no types
import toIterable from 'stream-to-it'
Expand All @@ -17,7 +16,6 @@ interface ToConnectionOptions {
listeningAddr?: Multiaddr
remoteAddr?: Multiaddr
localAddr?: Multiaddr
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: CounterGroup
Expand Down Expand Up @@ -99,10 +97,6 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio

const maConn: MultiaddrConnection = {
async sink (source) {
if ((options?.signal) != null) {
source = abortableSource(source, options.signal)
}

try {
await sink(source)
} catch (err: any) {
Expand All @@ -119,7 +113,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
socket.end()
},

source: (options.signal != null) ? abortableSource(source, options.signal) : source,
source,

// If the remote address was passed, use it - it may have the peer ID encapsulated
remoteAddr,
Expand Down
56 changes: 56 additions & 0 deletions test/listen-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import all from 'it-all'
import { mockRegistrar, mockUpgrader } from '@libp2p/interface-mocks'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { Transport, Upgrader } from '@libp2p/interface-transport'
import pDefer from 'p-defer'
import type { MultiaddrConnection } from '@libp2p/interface-connection'

const isCI = process.env.CI

Expand All @@ -20,6 +22,7 @@ describe('listen', () => {
transport = tcp()()
upgrader = mockUpgrader()
})

afterEach(async () => {
try {
if (listener != null) {
Expand Down Expand Up @@ -326,4 +329,57 @@ describe('dial', () => {
await conn.close()
await listener.close()
})

it('aborts during dial', async () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const maConnPromise = pDefer<MultiaddrConnection>()

// @ts-expect-error missing return value
upgrader.upgradeOutbound = async (maConn) => {
maConnPromise.resolve(maConn)

// take a long time to give us time to abort the dial
await new Promise<void>((resolve) => {
setTimeout(() => resolve(), 100)
})
}

const listener = transport.createListener({
upgrader
})
await listener.listen(ma)

const abortController = new AbortController()

// abort once the upgrade process has started
void maConnPromise.promise.then(() => abortController.abort())

await expect(transport.dial(ma, {
upgrader,
signal: abortController.signal
})).to.eventually.be.rejected('The operation was aborted')

await expect(maConnPromise.promise).to.eventually.have.nested.property('timeline.close')
.that.is.ok('did not gracefully close maConn')

await listener.close()
})

it('aborts before dial', async () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = transport.createListener({
upgrader
})
await listener.listen(ma)

const abortController = new AbortController()
abortController.abort()

await expect(transport.dial(ma, {
upgrader,
signal: abortController.signal
})).to.eventually.be.rejected('The operation was aborted')

await listener.close()
})
})

0 comments on commit 28fe750

Please sign in to comment.