Skip to content

Commit

Permalink
fix(pubsub): multibase in pubsub http rpc (#3922)
Browse files Browse the repository at this point in the history
This PR  aims to restore interop with go-ipfs by applying the same changes as in ipfs/kubo#8183 

TLDR is that we clean up and unify the API. 

BREAKING CHANGE: We had to make breaking changes to `pubsub` commands sent over HTTP RPC  to fix data corruption caused by topic names and payload bytes that included `\n`. More details in ipfs/kubo#7939 and ipfs/kubo#8183
  • Loading branch information
lidel authored Dec 15, 2021
1 parent d266bdc commit 0b7326a
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 13 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"devDependencies": {
"aegir": "^36.0.1",
"delay": "^5.0.0",
"go-ipfs": "0.10.0",
"go-ipfs": "0.11.0",
"ipfsd-ctl": "^10.0.4",
"it-all": "^1.0.4",
"it-first": "^1.0.4",
Expand Down
11 changes: 10 additions & 1 deletion src/files/rm.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import HTTP from 'ipfs-utils/src/http.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -20,7 +21,15 @@ export const createRm = configure(api => {
headers: options.headers
})

await res.text()
const body = await res.text()
// we don't expect text body to be ever present
// (if so, it means an error such as https://github.com/ipfs/go-ipfs/issues/8606)
if (body !== '') {
/** @type {Error} */
const error = new HTTP.HTTPError(res)
error.message = body
throw error
}
}
return rm
})
41 changes: 41 additions & 0 deletions src/lib/http-rpc-wire-format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { base64url } from 'multiformats/bases/base64'

/* HTTP RPC:
* - wraps binary data in multibase. base64url is used to avoid issues
* when a binary data is passed as search param in URL.
* Historical context: https://github.com/ipfs/go-ipfs/issues/7939
* Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183
*/

/**
* @param {Array<string>} strings
* @returns {Array<string>} strings
*/
const rpcArrayToTextArray = strings => {
if (Array.isArray(strings)) {
return strings.map(rpcToText)
}
return strings
}

/**
* @param {string} mb
* @returns {string}
*/
const rpcToText = mb => uint8ArrayToString(rpcToBytes(mb))

/**
* @param {string} mb
* @returns {Uint8Array}
*/
const rpcToBytes = mb => base64url.decode(mb)

/**
* @param {string} text
* @returns {string}
*/
const textToUrlSafeRpc = text => base64url.encode(uint8ArrayFromString(text))

export { rpcArrayToTextArray, rpcToText, rpcToBytes, textToUrlSafeRpc }
3 changes: 2 additions & 1 deletion src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { rpcArrayToTextArray } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -17,7 +18,7 @@ export const createLs = configure(api => {
headers: options.headers
})).json()

return Strings || []
return rpcArrayToTextArray(Strings) || []
}
return ls
})
3 changes: 2 additions & 1 deletion src/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -14,7 +15,7 @@ export const createPeers = configure(api => {
const res = await api.post('pubsub/peers', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down
3 changes: 2 additions & 1 deletion src/pubsub/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { multipartRequest } from 'ipfs-core-utils/multipart-request'
import { abortSignal } from '../lib/abort-signal.js'
import { textToUrlSafeRpc } from '../lib/http-rpc-wire-format.js'
import { AbortController } from 'native-abort-controller'

/**
Expand All @@ -15,7 +16,7 @@ export const createPublish = configure(api => {
*/
async function publish (topic, data, options = {}) {
const searchParams = toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
})

Expand Down
13 changes: 6 additions & 7 deletions src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import debug from 'debug'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { textToUrlSafeRpc, rpcArrayToTextArray, rpcToBytes } from '../lib/http-rpc-wire-format.js'
const log = debug('ipfs-http-client:pubsub:subscribe')

/**
Expand Down Expand Up @@ -43,7 +42,7 @@ export const createSubscribe = (options, subsTracker) => {
api.post('pubsub/sub', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: textToUrlSafeRpc(topic),
...options
}),
headers: options.headers
Expand Down Expand Up @@ -95,10 +94,10 @@ async function readMessages (response, { onMessage, onEnd, onError }) {
}

onMessage({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'),
data: uint8ArrayFromString(msg.data, 'base64pad'),
seqno: uint8ArrayFromString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from,
data: rpcToBytes(msg.data),
seqno: rpcToBytes(msg.seqno),
topicIDs: rpcArrayToTextArray(msg.topicIDs)
})
} catch (/** @type {any} */ err) {
err.message = `Failed to parse pubsub message: ${err.message}`
Expand Down
2 changes: 1 addition & 1 deletion test/utils/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const commonOptions = {

const commonOverrides = {
go: {
ipfsBin: isNode ? path() : undefined
ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined
}
}

Expand Down

0 comments on commit 0b7326a

Please sign in to comment.