Skip to content

Commit

Permalink
fix: explicitly close streams when connnections close (#1221)
Browse files Browse the repository at this point in the history
Make sure we don't leave streams open.

Updates all deps to close multiplexed streams when closing connections.
  • Loading branch information
achingbrain authored May 23, 2022
1 parent 35f9c0c commit b09eb8f
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 16 deletions.
1 change: 1 addition & 0 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"dependencies": {
"@libp2p/pubsub-peer-discovery": "^5.0.2",
"@libp2p/floodsub": "^1.0.6",
"@nodeutils/defaults-deep": "^1.1.0",
"execa": "^2.1.0",
"fs-extra": "^8.1.0",
"libp2p": "../",
Expand Down
13 changes: 5 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@
},
"dependencies": {
"@achingbrain/nat-port-mapper": "^1.0.3",
"@libp2p/connection": "^1.1.5",
"@libp2p/connection": "^2.0.2",
"@libp2p/crypto": "^0.22.11",
"@libp2p/interfaces": "^1.3.31",
"@libp2p/interfaces": "^2.0.1",
"@libp2p/logger": "^1.1.4",
"@libp2p/multistream-select": "^1.0.4",
"@libp2p/peer-collections": "^1.0.2",
Expand Down Expand Up @@ -153,24 +153,21 @@
"@libp2p/delegated-content-routing": "^1.0.2",
"@libp2p/delegated-peer-routing": "^1.0.2",
"@libp2p/floodsub": "^1.0.6",
"@libp2p/interface-compliance-tests": "^1.1.32",
"@libp2p/interface-compliance-tests": "^2.0.1",
"@libp2p/interop": "^1.0.3",
"@libp2p/kad-dht": "^1.0.9",
"@libp2p/mdns": "^1.0.5",
"@libp2p/mplex": "^1.0.4",
"@libp2p/mplex": "^1.1.0",
"@libp2p/pubsub": "^1.2.18",
"@libp2p/tcp": "^1.0.9",
"@libp2p/topology": "^1.1.7",
"@libp2p/webrtc-star": "^1.0.8",
"@libp2p/websockets": "^1.0.7",
"@nodeutils/defaults-deep": "^1.1.0",
"@types/node": "^16.11.26",
"@types/node-forge": "^1.0.0",
"@types/p-fifo": "^1.0.0",
"@types/varint": "^6.0.0",
"@types/xsalsa20": "^1.1.0",
"aegir": "^37.0.9",
"buffer": "^6.0.3",
"cborg": "^1.8.1",
"delay": "^5.0.0",
"execa": "^6.1.0",
Expand All @@ -187,7 +184,7 @@
"p-wait-for": "^4.1.0",
"protons": "^3.0.4",
"rimraf": "^3.0.2",
"sinon": "^13.0.1",
"sinon": "^14.0.0",
"ts-sinon": "^2.0.2"
},
"browser": {
Expand Down
1 change: 0 additions & 1 deletion src/connection-manager/dialer/dial-request.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import errCode from 'err-code'
import { anySignal } from 'any-signal'
import FIFO from 'p-fifo'
// @ts-expect-error setMaxListeners is missing from the node 16 types
import { setMaxListeners } from 'events'
import { codes } from '../../errors.js'
import { logger } from '@libp2p/logger'
Expand Down
1 change: 0 additions & 1 deletion src/connection-manager/dialer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { Multiaddr, Resolver } from '@multiformats/multiaddr'
import { TimeoutController } from 'timeout-abort-controller'
import { AbortError } from '@libp2p/interfaces/errors'
import { anySignal } from 'any-signal'
// @ts-expect-error setMaxListeners is missing from the node 16 types
import { setMaxListeners } from 'events'
import { DialAction, DialRequest } from './dial-request.js'
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
Expand Down
11 changes: 8 additions & 3 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type { Startable } from '@libp2p/interfaces/startable'
import { trackedMap } from '@libp2p/tracked-map'
import { codes } from '../errors.js'
import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id'
// @ts-expect-error setMaxListeners is missing from the node 16 types
import { setMaxListeners } from 'events'
import type { Connection } from '@libp2p/interfaces/connection'
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
Expand Down Expand Up @@ -254,10 +253,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
*/
async _close () {
// Close all connections we're tracking
const tasks = []
const tasks: Array<Promise<void>> = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
tasks.push((async () => {
try {
await connection.close()
} catch (err) {
log.error(err)
}
})())
}
}

Expand Down
1 change: 0 additions & 1 deletion src/peer-routing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
clearDelayedInterval
// @ts-expect-error module with no types
} from 'set-delayed-interval'
// @ts-expect-error setMaxListeners is missing from the node 16 types
import { setMaxListeners } from 'events'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
Expand Down
6 changes: 4 additions & 2 deletions src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
getStreams: () => muxer != null ? muxer.streams : errConnectionNotMultiplexed(),
close: async () => {
await maConn.close()
// Ensure remaining streams are aborted
// Ensure remaining streams are closed
if (muxer != null) {
muxer.streams.map(stream => stream.abort())
await Promise.all(muxer.streams.map(async stream => {
await stream.close()
}))
}
}
})
Expand Down

0 comments on commit b09eb8f

Please sign in to comment.