Skip to content

Commit

Permalink
fix!: update pubsub subscribe method to return subscription (#186)
Browse files Browse the repository at this point in the history
Subscribing to pubsub topics is a two step process - first we subscribe
then we recieve messages.

We may subscribe and not receive messages, so change the return type
to reflect that.

It now returns a promise, after which is resolved the node is subscribed,
and the response has a `messages` method that can be used to iterate over
any received messages, and a `cancel` method that will cancel the
subscription.
  • Loading branch information
achingbrain authored Feb 24, 2023
1 parent cc79162 commit 88e4bf5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
10 changes: 9 additions & 1 deletion packages/libp2p-daemon-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class Client implements DaemonClient {
async send (request: Request): Promise<StreamHandler> {
const maConn = await this.connectDaemon()

const subtype = request.pubsub?.type ?? request.dht?.type ?? request.peerStore?.type ?? ''
log('send', request.type, subtype)

const streamHandler = new StreamHandler({ stream: maConn })
streamHandler.write(Request.encode(request))
return streamHandler
Expand Down Expand Up @@ -291,9 +294,14 @@ export interface DHTClient {
getClosestPeers: (key: Uint8Array) => AsyncIterable<PeerInfo>
}

export interface Subscription {
messages: () => AsyncIterable<PSMessage>
cancel: () => Promise<void>
}

export interface PubSubClient {
publish: (topic: string, data: Uint8Array) => Promise<void>
subscribe: (topic: string) => AsyncIterable<PSMessage>
subscribe: (topic: string) => Promise<Subscription>
getTopics: () => Promise<string[]>
getSubscribers: (topic: string) => Promise<PeerId[]>
}
Expand Down
29 changes: 20 additions & 9 deletions packages/libp2p-daemon-client/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
PSRequest,
PSMessage
} from '@libp2p/daemon-protocol'
import type { DaemonClient } from './index.js'
import type { DaemonClient, Subscription } from './index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import { peerIdFromBytes } from '@libp2p/peer-id'

Expand Down Expand Up @@ -89,7 +89,7 @@ export class Pubsub {
/**
* Request to subscribe a certain topic
*/
async * subscribe (topic: string): AsyncGenerator<PSMessage, void, undefined> {
async subscribe (topic: string): Promise<Subscription> {
if (typeof topic !== 'string') {
throw new CodeError('invalid topic received', 'ERR_INVALID_TOPIC')
}
Expand All @@ -114,16 +114,27 @@ export class Pubsub {
throw new CodeError(response.error?.msg ?? 'Pubsub publish failed', 'ERR_PUBSUB_PUBLISH_FAILED')
}

// stream messages
while (true) {
message = await sh.read()
let subscribed = true

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}
const subscription: Subscription = {
async * messages () {
while (subscribed) { // eslint-disable-line no-unmodified-loop-condition
message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

yield PSMessage.decode(message)
yield PSMessage.decode(message)
}
},
async cancel () {
subscribed = false
await sh.close()
}
}

return subscription
}

async getSubscribers (topic: string): Promise<PeerId[]> {
Expand Down

0 comments on commit 88e4bf5

Please sign in to comment.