Skip to content

Commit

Permalink
fix: invoke onProgress callback with DHT queries during routing
Browse files Browse the repository at this point in the history
Allow passing an `onProgress` callback to the peer/content routers that can receive DHT query events.

Refs #1574
  • Loading branch information
achingbrain committed Nov 2, 2023
1 parent 16a8707 commit 61aa0ce
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 33 deletions.
1 change: 1 addition & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"progress-events": "^1.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3"
},
Expand Down
16 changes: 11 additions & 5 deletions packages/interface/src/content-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AbortOptions } from '../index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
* Any object that implements this Symbol as a property should return a
Expand All @@ -23,7 +24,12 @@ import type { CID } from 'multiformats/cid'
*/
export const contentRouting = Symbol.for('@libp2p/content-routing')

export interface ContentRouting {
export interface ContentRouting<
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* The implementation of this method should ensure that network peers know the
* caller can provide content that corresponds to the passed CID.
Expand All @@ -35,7 +41,7 @@ export interface ContentRouting {
* await contentRouting.provide(cid)
* ```
*/
provide(cid: CID, options?: AbortOptions): Promise<void>
provide(cid: CID, options?: AbortOptions & ProgressOptions<ProvideProgressEvents>): Promise<void>

/**
* Find the providers of the passed CID.
Expand All @@ -49,7 +55,7 @@ export interface ContentRouting {
* }
* ```
*/
findProviders(cid: CID, options?: AbortOptions): AsyncIterable<PeerInfo>
findProviders(cid: CID, options?: AbortOptions & ProgressOptions<FindProvidersProgressEvents>): AsyncIterable<PeerInfo>

/**
* Puts a value corresponding to the passed key in a way that can later be
Expand All @@ -65,7 +71,7 @@ export interface ContentRouting {
* await contentRouting.put(key, value)
* ```
*/
put(key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise<void>
put(key: Uint8Array, value: Uint8Array, options?: AbortOptions & ProgressOptions<PutProgressEvents>): Promise<void>

/**
* Retrieves a value from the network corresponding to the passed key.
Expand All @@ -79,5 +85,5 @@ export interface ContentRouting {
* const value = await contentRouting.get(key)
* ```
*/
get(key: Uint8Array, options?: AbortOptions): Promise<Uint8Array>
get(key: Uint8Array, options?: AbortOptions & ProgressOptions<GetProgressEvents>): Promise<Uint8Array>
}
55 changes: 48 additions & 7 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import type { StreamHandler, StreamHandlerOptions } from './stream-handler/index
import type { Topology } from './topology/index.js'
import type { Listener } from './transport/index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressEvent } from 'progress-events'

/**
* Used by the connection manager to sort addresses into order before dialling
Expand Down Expand Up @@ -113,7 +114,15 @@ export interface IdentifyResult {
* Event names are `noun:verb` so the first part is the name of the object
* being acted on and the second is the action.
*/
export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
export interface Libp2pEvents<
Services extends ServiceMap = ServiceMap,
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* This event is dispatched when a new network peer is discovered.
*
Expand Down Expand Up @@ -240,7 +249,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
* })
* ```
*/
'start': CustomEvent<Libp2p<T>>
'start': CustomEvent<Libp2p<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>>

/**
* This event notifies listeners that the node has stopped
Expand All @@ -251,7 +268,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
* })
* ```
*/
'stop': CustomEvent<Libp2p<T>>
'stop': CustomEvent<Libp2p<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>>
}

/**
Expand Down Expand Up @@ -308,7 +333,23 @@ export interface PendingDial {
/**
* Libp2p nodes implement this interface.
*/
export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, TypedEventTarget<Libp2pEvents<T>> {
export interface Libp2p<
Services extends ServiceMap = ServiceMap,
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> extends Startable, TypedEventTarget<Libp2pEvents<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>> {
/**
* The PeerId is a unique identifier for a node on the network.
*
Expand Down Expand Up @@ -359,7 +400,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
* }
* ```
*/
peerRouting: PeerRouting
peerRouting: PeerRouting<FindPeerProgressEvents, GetClosestPeersProgressEvents>

/**
* The content routing subsystem allows the user to find providers for content,
Expand All @@ -375,7 +416,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
* }
* ```
*/
contentRouting: ContentRouting
contentRouting: ContentRouting<ProvideProgressEvents, FindProvidersProgressEvents, GetProgressEvents, PutProgressEvents>

/**
* The keychain contains the keys used by the current node, and can create new
Expand Down Expand Up @@ -602,7 +643,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
/**
* A set of user defined services
*/
services: T
services: Services
}

/**
Expand Down
10 changes: 7 additions & 3 deletions packages/interface/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AbortOptions } from '../index.js'
import type { PeerId } from '../peer-id/index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
* Any object that implements this Symbol as a property should return a
Expand All @@ -23,7 +24,10 @@ import type { PeerInfo } from '../peer-info/index.js'
*/
export const peerRouting = Symbol.for('@libp2p/peer-routing')

export interface PeerRouting {
export interface PeerRouting<
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* Searches the network for peer info corresponding to the passed peer id.
*
Expand All @@ -34,7 +38,7 @@ export interface PeerRouting {
* const peer = await peerRouting.findPeer(peerId, options)
* ```
*/
findPeer(peerId: PeerId, options?: AbortOptions): Promise<PeerInfo>
findPeer(peerId: PeerId, options?: AbortOptions & ProgressOptions<FindPeerProgressEvents>): Promise<PeerInfo>

/**
* Search the network for peers that are closer to the passed key. Peer
Expand All @@ -49,5 +53,5 @@ export interface PeerRouting {
* }
* ```
*/
getClosestPeers(key: Uint8Array, options?: AbortOptions): AsyncIterable<PeerInfo>
getClosestPeers(key: Uint8Array, options?: AbortOptions & ProgressOptions<GetClosestPeersProgressEvents>): AsyncIterable<PeerInfo>
}
61 changes: 50 additions & 11 deletions packages/kad-dht/src/dual-kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,73 @@ import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { type PeerDiscovery, peerDiscovery, type PeerDiscoveryEvents } from '@libp2p/interface/peer-discovery'
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import merge from 'it-merge'
import isPrivate from 'private-ip'
import { CustomProgressEvent } from 'progress-events'
import { DefaultKadDHT } from './kad-dht.js'
import { queryErrorEvent } from './query/events.js'
import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type ProvideProgressEvents =
ProgressEvent<'libp2p:content-routing:provide:dht:event', QueryEvent>

export type FindProvidersProgressEvents =
ProgressEvent<'libp2p:content-routing:find-providers:dht:event', QueryEvent>

export type PutProgressEvents =
ProgressEvent<'libp2p:content-routing:put:dht:event', QueryEvent>

export type GetProgressEvents =
ProgressEvent<'libp2p:content-routing:get:dht:event', QueryEvent>

const log = logger('libp2p:kad-dht')

/**
* Wrapper class to convert events into returned values
*/
class DHTContentRouting implements ContentRouting {
class DHTContentRouting implements ContentRouting<
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
> {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
async provide (cid: CID, options: QueryOptions & ProgressOptions<ProvideProgressEvents> = {}): Promise<void> {
for await (const event of this.dht.provide(cid, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:provide:dht:event', event))
}

Check warning on line 51 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L49-L51

Added lines #L49 - L51 were not covered by tests
}

async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
async * findProviders (cid: CID, options: QueryOptions & ProgressOptions<FindProvidersProgressEvents> = {}): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of this.dht.findProviders(cid, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:find-providers:dht:event', event))

Check warning on line 57 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L56-L57

Added lines #L56 - L57 were not covered by tests
if (event.name === 'PROVIDER') {
yield * event.providers
}
}
}

async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
await drain(this.dht.put(key, value, options))
async put (key: Uint8Array, value: Uint8Array, options: QueryOptions & ProgressOptions<PutProgressEvents> = {}): Promise<void> {
for await (const event of this.dht.put(key, value, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:put:dht:event', event))
}

Check warning on line 67 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L65-L67

Added lines #L65 - L67 were not covered by tests
}

async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
async get (key: Uint8Array, options: QueryOptions & ProgressOptions<GetProgressEvents> = {}): Promise<Uint8Array> {
for await (const event of this.dht.get(key, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:get:dht:event', event))

Check warning on line 73 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L72-L73

Added lines #L72 - L73 were not covered by tests
if (event.name === 'VALUE') {
return event.value
}
Expand All @@ -54,18 +80,29 @@ class DHTContentRouting implements ContentRouting {
}
}

export type FindPeerProgressEvents =
ProgressEvent<'libp2p:peer-routing:find-peer:dht:event', QueryEvent>

export type GetClosestPeersProgressEvents =
ProgressEvent<'libp2p:peer-routing:get-closest-peers:dht:event', QueryEvent>

/**
* Wrapper class to convert events into returned values
*/
class DHTPeerRouting implements PeerRouting {
class DHTPeerRouting implements PeerRouting<
FindPeerProgressEvents,
GetClosestPeersProgressEvents
> {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
async findPeer (peerId: PeerId, options: QueryOptions & ProgressOptions<FindPeerProgressEvents> = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:find-peer:dht:event', event))

Check warning on line 105 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L104-L105

Added lines #L104 - L105 were not covered by tests
if (event.name === 'FINAL_PEER') {
return event.peer
}
Expand All @@ -74,8 +111,10 @@ class DHTPeerRouting implements PeerRouting {
throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
async * getClosestPeers (key: Uint8Array, options: QueryOptions & ProgressOptions<GetClosestPeersProgressEvents> = {}): AsyncIterable<PeerInfo> {
for await (const event of this.dht.getClosestPeers(key, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:get-closest-peers:dht:event', event))

Check warning on line 117 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L116-L117

Added lines #L116 - L117 were not covered by tests
if (event.name === 'FINAL_PEER') {
yield event.peer
}
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"p-queue": "^7.3.4",
"p-retry": "^6.0.0",
"private-ip": "^3.0.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"rate-limiter-flexible": "^3.0.0",
"uint8arraylist": "^2.4.3",
Expand Down
Loading

0 comments on commit 61aa0ce

Please sign in to comment.