From 4a2cc2f7fa037594a4fde2dc39064ac693ffc031 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 10 Dec 2018 12:12:15 +0000 Subject: [PATCH 1/5] refactor: modularise files regular This will make adding `addFrom*` methods a whole bunch easier. License: MIT Signed-off-by: Alan Shaw --- .../files-regular/add-pull-stream.js | 156 ++++++++++++++++++ .../files-regular/add-readable-stream.js | 53 ++++++ src/core/components/files-regular/add.js | 62 +++++++ .../files-regular/cat-pull-stream.js | 49 ++++++ .../files-regular/cat-readable-stream.js | 7 + src/core/components/files-regular/cat.js | 21 +++ .../files-regular/get-pull-stream.js | 26 +++ .../files-regular/get-readable-stream.js | 24 +++ src/core/components/files-regular/get.js | 34 ++++ src/core/components/files-regular/index.js | 14 ++ .../files-regular/ls-pull-stream.js | 36 ++++ .../files-regular/ls-readable-stream.js | 9 + src/core/components/files-regular/ls.js | 20 +++ src/core/components/files-regular/utils.js | 97 +++++++++++ src/core/utils.js | 81 --------- 15 files changed, 608 insertions(+), 81 deletions(-) create mode 100644 src/core/components/files-regular/add-pull-stream.js create mode 100644 src/core/components/files-regular/add-readable-stream.js create mode 100644 src/core/components/files-regular/add.js create mode 100644 src/core/components/files-regular/cat-pull-stream.js create mode 100644 src/core/components/files-regular/cat-readable-stream.js create mode 100644 src/core/components/files-regular/cat.js create mode 100644 src/core/components/files-regular/get-pull-stream.js create mode 100644 src/core/components/files-regular/get-readable-stream.js create mode 100644 src/core/components/files-regular/get.js create mode 100644 src/core/components/files-regular/index.js create mode 100644 src/core/components/files-regular/ls-pull-stream.js create mode 100644 src/core/components/files-regular/ls-readable-stream.js create mode 100644 src/core/components/files-regular/ls.js create mode 100644 src/core/components/files-regular/utils.js diff --git a/src/core/components/files-regular/add-pull-stream.js b/src/core/components/files-regular/add-pull-stream.js new file mode 100644 index 0000000000..f32bff090b --- /dev/null +++ b/src/core/components/files-regular/add-pull-stream.js @@ -0,0 +1,156 @@ +'use strict' + +const { importer } = require('ipfs-unixfs-engine') +const pull = require('pull-stream') +const toPull = require('stream-to-pull-stream') +const waterfall = require('async/waterfall') +const isStream = require('is-stream') +const isSource = require('is-pull-stream').isSource +const CID = require('cids') +const { parseChunkerString } = require('./utils') + +const WRAPPER = 'wrapper/' + +function noop () {} + +function prepareFile (self, opts, file, callback) { + opts = opts || {} + + let cid = new CID(file.multihash) + + if (opts.cidVersion === 1) { + cid = cid.toV1() + } + + waterfall([ + (cb) => opts.onlyHash + ? cb(null, file) + : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb), + (node, cb) => { + const b58Hash = cid.toBaseEncodedString() + + let size = node.size + + if (Buffer.isBuffer(node)) { + size = node.length + } + + cb(null, { + path: opts.wrapWithDirectory + ? file.path.substring(WRAPPER.length) + : (file.path || b58Hash), + hash: b58Hash, + size + }) + } + ], callback) +} + +function normalizeContent (opts, content) { + if (!Array.isArray(content)) { + content = [content] + } + + return content.map((data) => { + // Buffer input + if (Buffer.isBuffer(data)) { + data = { path: '', content: pull.values([data]) } + } + + // Readable stream input + if (isStream.readable(data)) { + data = { path: '', content: toPull.source(data) } + } + + if (isSource(data)) { + data = { path: '', content: data } + } + + if (data && data.content && typeof data.content !== 'function') { + if (Buffer.isBuffer(data.content)) { + data.content = pull.values([data.content]) + } + + if (isStream.readable(data.content)) { + data.content = toPull.source(data.content) + } + } + + if (opts.wrapWithDirectory && !data.path) { + throw new Error('Must provide a path when wrapping with a directory') + } + + if (opts.wrapWithDirectory) { + data.path = WRAPPER + data.path + } + + return data + }) +} + +function preloadFile (self, opts, file) { + const isRootFile = opts.wrapWithDirectory + ? file.path === '' + : !file.path.includes('/') + + const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false + + if (shouldPreload) { + self._preload(file.hash) + } + + return file +} + +function pinFile (self, opts, file, cb) { + // Pin a file if it is the root dir of a recursive add or the single file + // of a direct add. + const pin = 'pin' in opts ? opts.pin : true + const isRootDir = !file.path.includes('/') + const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg + if (shouldPin) { + return self.pin.add(file.hash, { preload: false }, err => cb(err, file)) + } else { + cb(null, file) + } +} + +module.exports = function (self) { + // Internal add func that gets used by all add funcs + return function addPullStream (options = {}) { + let chunkerOptions + try { + chunkerOptions = parseChunkerString(options.chunker) + } catch (err) { + return pull.map(() => { throw err }) + } + const opts = Object.assign({}, { + shardSplitThreshold: self._options.EXPERIMENTAL.sharding + ? 1000 + : Infinity + }, options, chunkerOptions) + + // CID v0 is for multihashes encoded with sha2-256 + if (opts.hashAlg && opts.cidVersion !== 1) { + opts.cidVersion = 1 + } + + let total = 0 + + const prog = opts.progress || noop + const progress = (bytes) => { + total += bytes + prog(total) + } + + opts.progress = progress + return pull( + pull.map(normalizeContent.bind(null, opts)), + pull.flatten(), + importer(self._ipld, opts), + pull.asyncMap(prepareFile.bind(null, self, opts)), + pull.map(preloadFile.bind(null, self, opts)), + pull.asyncMap(pinFile.bind(null, self, opts)) + ) + } +} diff --git a/src/core/components/files-regular/add-readable-stream.js b/src/core/components/files-regular/add-readable-stream.js new file mode 100644 index 0000000000..0611aa9338 --- /dev/null +++ b/src/core/components/files-regular/add-readable-stream.js @@ -0,0 +1,53 @@ +'use strict' + +const pull = require('pull-stream') +const pushable = require('pull-pushable') +const Duplex = require('readable-stream').Duplex + +class AddHelper extends Duplex { + constructor (pullStream, push, options) { + super(Object.assign({ objectMode: true }, options)) + this._pullStream = pullStream + this._pushable = push + this._waitingPullFlush = [] + } + + _read () { + this._pullStream(null, (end, data) => { + while (this._waitingPullFlush.length) { + const cb = this._waitingPullFlush.shift() + cb() + } + if (end) { + if (end instanceof Error) { + this.emit('error', end) + } + } else { + this.push(data) + } + }) + } + + _write (chunk, encoding, callback) { + this._waitingPullFlush.push(callback) + this._pushable.push(chunk) + } +} + +module.exports = function (self) { + return (options) => { + options = options || {} + + const p = pushable() + const s = pull( + p, + self.addPullStream(options) + ) + + const retStream = new AddHelper(s, p) + + retStream.once('finish', () => p.end()) + + return retStream + } +} diff --git a/src/core/components/files-regular/add.js b/src/core/components/files-regular/add.js new file mode 100644 index 0000000000..1c36b7d20a --- /dev/null +++ b/src/core/components/files-regular/add.js @@ -0,0 +1,62 @@ +'use strict' + +const promisify = require('promisify-es6') +const pull = require('pull-stream') +const sort = require('pull-sort') +const isStream = require('is-stream') +const isSource = require('is-pull-stream').isSource +const isString = require('lodash/isString') + +module.exports = function (self) { + const add = promisify((data, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + options = options || {} + + // Buffer, pull stream or Node.js stream + const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj) + // An object like { content?, path? }, where content isBufferOrStream and path isString + const isContentObject = obj => { + if (typeof obj !== 'object') return false + // path is optional if content is present + if (obj.content) return isBufferOrStream(obj.content) + // path must be a non-empty string if no content + return Boolean(obj.path) && isString(obj.path) + } + // An input atom: a buffer, stream or content object + const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) + // All is ok if data isInput or data is an array of isInput + const ok = isInput(data) || (Array.isArray(data) && data.every(isInput)) + + if (!ok) { + return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) + } + + pull( + pull.values([data]), + self.addPullStream(options), + sort((a, b) => { + if (a.path < b.path) return 1 + if (a.path > b.path) return -1 + return 0 + }), + pull.collect(callback) + ) + }) + + return function () { + const args = Array.from(arguments) + + // If we .add(), then promisify thinks the pull stream + // is a callback! Add an empty options object in this case so that a + // promise is returned. + if (args.length === 1 && isSource(args[0])) { + args.push({}) + } + + return add.apply(null, args) + } +} diff --git a/src/core/components/files-regular/cat-pull-stream.js b/src/core/components/files-regular/cat-pull-stream.js new file mode 100644 index 0000000000..feea643d86 --- /dev/null +++ b/src/core/components/files-regular/cat-pull-stream.js @@ -0,0 +1,49 @@ +'use strict' + +const { exporter } = require('ipfs-unixfs-engine') +const pull = require('pull-stream') +const deferred = require('pull-defer') +const { normalizePath } = require('./utils') + +module.exports = function (self) { + return function catPullStream (ipfsPath, options) { + if (typeof ipfsPath === 'function') { + throw new Error('You must supply an ipfsPath') + } + + options = options || {} + + ipfsPath = normalizePath(ipfsPath) + const pathComponents = ipfsPath.split('/') + const restPath = normalizePath(pathComponents.slice(1).join('/')) + const filterFile = (file) => (restPath && file.path === restPath) || (file.path === ipfsPath) + + if (options.preload !== false) { + self._preload(pathComponents[0]) + } + + const d = deferred.source() + + pull( + exporter(ipfsPath, self._ipld, options), + pull.collect((err, files) => { + if (err) { return d.abort(err) } + if (files && files.length > 1) { + files = files.filter(filterFile) + } + if (!files || !files.length) { + return d.abort(new Error('No such file')) + } + + const file = files[0] + const content = file.content + if (!content && file.type === 'dir') { + return d.abort(new Error('this dag node is a directory')) + } + d.resolve(content) + }) + ) + + return d + } +} diff --git a/src/core/components/files-regular/cat-readable-stream.js b/src/core/components/files-regular/cat-readable-stream.js new file mode 100644 index 0000000000..b0c8c89029 --- /dev/null +++ b/src/core/components/files-regular/cat-readable-stream.js @@ -0,0 +1,7 @@ +'use strict' + +const toStream = require('pull-stream-to-stream') + +module.exports = function (self) { + return (ipfsPath, options) => toStream.source(self.catPullStream(ipfsPath, options)) +} diff --git a/src/core/components/files-regular/cat.js b/src/core/components/files-regular/cat.js new file mode 100644 index 0000000000..6740f62d9f --- /dev/null +++ b/src/core/components/files-regular/cat.js @@ -0,0 +1,21 @@ +'use strict' + +const promisify = require('promisify-es6') +const pull = require('pull-stream') + +module.exports = function (self) { + return promisify((ipfsPath, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + pull( + self.catPullStream(ipfsPath, options), + pull.collect((err, buffers) => { + if (err) { return callback(err) } + callback(null, Buffer.concat(buffers)) + }) + ) + }) +} diff --git a/src/core/components/files-regular/get-pull-stream.js b/src/core/components/files-regular/get-pull-stream.js new file mode 100644 index 0000000000..a8bdef7b4e --- /dev/null +++ b/src/core/components/files-regular/get-pull-stream.js @@ -0,0 +1,26 @@ +'use strict' + +const { exporter } = require('ipfs-unixfs-engine') +const pull = require('pull-stream') +const errCode = require('err-code') +const { normalizePath } = require('./utils') + +module.exports = function (self) { + return (ipfsPath, options) => { + options = options || {} + + if (options.preload !== false) { + let pathComponents + + try { + pathComponents = normalizePath(ipfsPath).split('/') + } catch (err) { + return pull.error(errCode(err, 'ERR_INVALID_PATH')) + } + + self._preload(pathComponents[0]) + } + + return exporter(ipfsPath, self._ipld, options) + } +} diff --git a/src/core/components/files-regular/get-readable-stream.js b/src/core/components/files-regular/get-readable-stream.js new file mode 100644 index 0000000000..a8a53150ea --- /dev/null +++ b/src/core/components/files-regular/get-readable-stream.js @@ -0,0 +1,24 @@ +'use strict' + +const pull = require('pull-stream') +const toStream = require('pull-stream-to-stream') + +module.exports = function (self) { + return (ipfsPath, options) => { + options = options || {} + + return toStream.source( + pull( + self.getPullStream(ipfsPath, options), + pull.map((file) => { + if (file.content) { + file.content = toStream.source(file.content) + file.content.pause() + } + + return file + }) + ) + ) + } +} diff --git a/src/core/components/files-regular/get.js b/src/core/components/files-regular/get.js new file mode 100644 index 0000000000..a64577c222 --- /dev/null +++ b/src/core/components/files-regular/get.js @@ -0,0 +1,34 @@ +'use strict' + +const promisify = require('promisify-es6') +const pull = require('pull-stream') + +module.exports = function (self) { + return promisify((ipfsPath, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + options = options || {} + + pull( + self.getPullStream(ipfsPath, options), + pull.asyncMap((file, cb) => { + if (file.content) { + pull( + file.content, + pull.collect((err, buffers) => { + if (err) { return cb(err) } + file.content = Buffer.concat(buffers) + cb(null, file) + }) + ) + } else { + cb(null, file) + } + }), + pull.collect(callback) + ) + }) +} diff --git a/src/core/components/files-regular/index.js b/src/core/components/files-regular/index.js new file mode 100644 index 0000000000..8cfeb80ffa --- /dev/null +++ b/src/core/components/files-regular/index.js @@ -0,0 +1,14 @@ +module.exports = self => ({ + add: require('./add')(self), + addPullStream: require('./add-pull-stream')(self), + addReadableStream: require('./add-readable-stream')(self), + cat: require('./cat')(self), + catPullStream: require('./cat-pull-stream')(self), + catReadableStream: require('./cat-readable-stream')(self), + get: require('./get')(self), + getPullStream: require('./get-pull-stream')(self), + getReadableStream: require('./get-readable-stream')(self), + ls: require('./ls')(self), + lsPullStream: require('./ls-pull-stream')(self), + lsReadableStream: require('./ls-readable-stream')(self) +}) diff --git a/src/core/components/files-regular/ls-pull-stream.js b/src/core/components/files-regular/ls-pull-stream.js new file mode 100644 index 0000000000..25d1e27a0c --- /dev/null +++ b/src/core/components/files-regular/ls-pull-stream.js @@ -0,0 +1,36 @@ +'use strict' + +const { exporter } = require('ipfs-unixfs-engine') +const pull = require('pull-stream') +const CID = require('cids') +const { normalizePath } = require('./utils') + +module.exports = function (self) { + return function (ipfsPath, options) { + options = options || {} + + const path = normalizePath(ipfsPath) + const recursive = options.recursive + const pathComponents = path.split('/') + const pathDepth = pathComponents.length + const maxDepth = recursive ? global.Infinity : pathDepth + options.maxDepth = options.maxDepth || maxDepth + + if (options.preload !== false) { + self._preload(pathComponents[0]) + } + + return pull( + exporter(ipfsPath, self._ipld, options), + pull.filter(node => + recursive ? node.depth >= pathDepth : node.depth === pathDepth + ), + pull.map(node => { + const cid = new CID(node.hash) + node = Object.assign({}, node, { hash: cid.toBaseEncodedString() }) + delete node.content + return node + }) + ) + } +} diff --git a/src/core/components/files-regular/ls-readable-stream.js b/src/core/components/files-regular/ls-readable-stream.js new file mode 100644 index 0000000000..1b02eef0bc --- /dev/null +++ b/src/core/components/files-regular/ls-readable-stream.js @@ -0,0 +1,9 @@ +'use strict' + +const toStream = require('pull-stream-to-stream') + +module.exports = function (self) { + return (ipfsPath, options) => { + return toStream.source(self.lsPullStream(ipfsPath, options)) + } +} diff --git a/src/core/components/files-regular/ls.js b/src/core/components/files-regular/ls.js new file mode 100644 index 0000000000..89bd7f1aaf --- /dev/null +++ b/src/core/components/files-regular/ls.js @@ -0,0 +1,20 @@ +'use strict' + +const promisify = require('promisify-es6') +const pull = require('pull-stream') + +module.exports = function (self) { + return promisify((ipfsPath, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + options = options || {} + + pull( + self.lsPullStream(ipfsPath, options), + pull.collect(callback) + ) + }) +} diff --git a/src/core/components/files-regular/utils.js b/src/core/components/files-regular/utils.js new file mode 100644 index 0000000000..732b555f0d --- /dev/null +++ b/src/core/components/files-regular/utils.js @@ -0,0 +1,97 @@ +const CID = require('cids') + +exports.normalizePath = (path) => { + if (Buffer.isBuffer(path)) { + return new CID(path).toString() + } + if (CID.isCID(path)) { + return path.toString() + } + if (path.indexOf('/ipfs/') === 0) { + path = path.substring('/ipfs/'.length) + } + if (path.charAt(path.length - 1) === '/') { + path = path.substring(0, path.length - 1) + } + return path +} + +/** + * Parses chunker string into options used by DAGBuilder in ipfs-unixfs-engine + * + * + * @param {String} chunker Chunker algorithm supported formats: + * "size-{size}" + * "rabin" + * "rabin-{avg}" + * "rabin-{min}-{avg}-{max}" + * + * @return {Object} Chunker options for DAGBuilder + */ +exports.parseChunkerString = (chunker) => { + if (!chunker) { + return { + chunker: 'fixed' + } + } else if (chunker.startsWith('size-')) { + const sizeStr = chunker.split('-')[1] + const size = parseInt(sizeStr) + if (isNaN(size)) { + throw new Error('Chunker parameter size must be an integer') + } + return { + chunker: 'fixed', + chunkerOptions: { + maxChunkSize: size + } + } + } else if (chunker.startsWith('rabin')) { + return { + chunker: 'rabin', + chunkerOptions: parseRabinString(chunker) + } + } else { + throw new Error(`Unrecognized chunker option: ${chunker}`) + } +} + +/** + * Parses rabin chunker string + * + * @param {String} chunker Chunker algorithm supported formats: + * "rabin" + * "rabin-{avg}" + * "rabin-{min}-{avg}-{max}" + * + * @return {Object} rabin chunker options + */ +function parseRabinString (chunker) { + const options = {} + const parts = chunker.split('-') + switch (parts.length) { + case 1: + options.avgChunkSize = 262144 + break + case 2: + options.avgChunkSize = parseChunkSize(parts[1], 'avg') + break + case 4: + options.minChunkSize = parseChunkSize(parts[1], 'min') + options.avgChunkSize = parseChunkSize(parts[2], 'avg') + options.maxChunkSize = parseChunkSize(parts[3], 'max') + break + default: + throw new Error('Incorrect chunker format (expected "rabin" "rabin-[avg]" or "rabin-[min]-[avg]-[max]"') + } + + return options +} + +function parseChunkSize (str, name) { + let size = parseInt(str) + if (isNaN(size)) { + throw new Error(`Chunker parameter ${name} must be an integer`) + } + + return size +} diff --git a/src/core/utils.js b/src/core/utils.js index 24fcd68edb..96db3a9c87 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -136,87 +136,6 @@ const resolvePath = promisify(function (objectAPI, ipfsPaths, callback) { }, callback) }) -/** - * Parses chunker string into options used by DAGBuilder in ipfs-unixfs-engine - * - * - * @param {String} chunker Chunker algorithm supported formats: - * "size-{size}" - * "rabin" - * "rabin-{avg}" - * "rabin-{min}-{avg}-{max}" - * - * @return {Object} Chunker options for DAGBuilder - */ -function parseChunkerString (chunker) { - if (!chunker) { - return { - chunker: 'fixed' - } - } else if (chunker.startsWith('size-')) { - const sizeStr = chunker.split('-')[1] - const size = parseInt(sizeStr) - if (isNaN(size)) { - throw new Error('Chunker parameter size must be an integer') - } - return { - chunker: 'fixed', - chunkerOptions: { - maxChunkSize: size - } - } - } else if (chunker.startsWith('rabin')) { - return { - chunker: 'rabin', - chunkerOptions: parseRabinString(chunker) - } - } else { - throw new Error(`Unrecognized chunker option: ${chunker}`) - } -} - -/** - * Parses rabin chunker string - * - * @param {String} chunker Chunker algorithm supported formats: - * "rabin" - * "rabin-{avg}" - * "rabin-{min}-{avg}-{max}" - * - * @return {Object} rabin chunker options - */ -function parseRabinString (chunker) { - const options = {} - const parts = chunker.split('-') - switch (parts.length) { - case 1: - options.avgChunkSize = 262144 - break - case 2: - options.avgChunkSize = parseChunkSize(parts[1], 'avg') - break - case 4: - options.minChunkSize = parseChunkSize(parts[1], 'min') - options.avgChunkSize = parseChunkSize(parts[2], 'avg') - options.maxChunkSize = parseChunkSize(parts[3], 'max') - break - default: - throw new Error('Incorrect chunker format (expected "rabin" "rabin-[avg]" or "rabin-[min]-[avg]-[max]"') - } - - return options -} - -function parseChunkSize (str, name) { - let size = parseInt(str) - if (isNaN(size)) { - throw new Error(`Chunker parameter ${name} must be an integer`) - } - - return size -} - exports.normalizePath = normalizePath exports.parseIpfsPath = parseIpfsPath exports.resolvePath = resolvePath -exports.parseChunkerString = parseChunkerString From cb59a7cd79406820b4c219d65f28317f484b6efa Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 10 Dec 2018 12:30:53 +0000 Subject: [PATCH 2/5] fix: remove the old License: MIT Signed-off-by: Alan Shaw --- src/core/components/files-regular.js | 511 ------------------ .../files-regular/cat-pull-stream.js | 21 +- 2 files changed, 14 insertions(+), 518 deletions(-) delete mode 100644 src/core/components/files-regular.js diff --git a/src/core/components/files-regular.js b/src/core/components/files-regular.js deleted file mode 100644 index ed41c1eaaa..0000000000 --- a/src/core/components/files-regular.js +++ /dev/null @@ -1,511 +0,0 @@ -'use strict' - -const unixfsEngine = require('ipfs-unixfs-engine') -const importer = unixfsEngine.importer -const exporter = unixfsEngine.exporter -const promisify = require('promisify-es6') -const pull = require('pull-stream') -const sort = require('pull-sort') -const pushable = require('pull-pushable') -const toStream = require('pull-stream-to-stream') -const toPull = require('stream-to-pull-stream') -const deferred = require('pull-defer') -const waterfall = require('async/waterfall') -const isStream = require('is-stream') -const isSource = require('is-pull-stream').isSource -const Duplex = require('readable-stream').Duplex -const isString = require('lodash/isString') -const CID = require('cids') -const toB58String = require('multihashes').toB58String -const errCode = require('err-code') -const parseChunkerString = require('../utils').parseChunkerString - -const WRAPPER = 'wrapper/' - -function noop () {} - -function normalizePath (path) { - if (Buffer.isBuffer(path)) { - path = toB58String(path) - } - if (CID.isCID(path)) { - path = path.toBaseEncodedString() - } - if (path.indexOf('/ipfs/') === 0) { - path = path.substring('/ipfs/'.length) - } - if (path.charAt(path.length - 1) === '/') { - path = path.substring(0, path.length - 1) - } - - return path -} - -function prepareFile (self, opts, file, callback) { - opts = opts || {} - - let cid = new CID(file.multihash) - - if (opts.cidVersion === 1) { - cid = cid.toV1() - } - - waterfall([ - (cb) => opts.onlyHash - ? cb(null, file) - : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb), - (node, cb) => { - const b58Hash = cid.toBaseEncodedString() - - let size = node.size - - if (Buffer.isBuffer(node)) { - size = node.length - } - - cb(null, { - path: opts.wrapWithDirectory - ? file.path.substring(WRAPPER.length) - : (file.path || b58Hash), - hash: b58Hash, - size - }) - } - ], callback) -} - -function normalizeContent (opts, content) { - if (!Array.isArray(content)) { - content = [content] - } - - return content.map((data) => { - // Buffer input - if (Buffer.isBuffer(data)) { - data = { path: '', content: pull.values([data]) } - } - - // Readable stream input - if (isStream.readable(data)) { - data = { path: '', content: toPull.source(data) } - } - - if (isSource(data)) { - data = { path: '', content: data } - } - - if (data && data.content && typeof data.content !== 'function') { - if (Buffer.isBuffer(data.content)) { - data.content = pull.values([data.content]) - } - - if (isStream.readable(data.content)) { - data.content = toPull.source(data.content) - } - } - - if (opts.wrapWithDirectory && !data.path) { - throw new Error('Must provide a path when wrapping with a directory') - } - - if (opts.wrapWithDirectory) { - data.path = WRAPPER + data.path - } - - return data - }) -} - -function preloadFile (self, opts, file) { - const isRootFile = opts.wrapWithDirectory - ? file.path === '' - : !file.path.includes('/') - - const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false - - if (shouldPreload) { - self._preload(file.hash) - } - - return file -} - -function pinFile (self, opts, file, cb) { - // Pin a file if it is the root dir of a recursive add or the single file - // of a direct add. - const pin = 'pin' in opts ? opts.pin : true - const isRootDir = !file.path.includes('/') - const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg - if (shouldPin) { - return self.pin.add(file.hash, { preload: false }, err => cb(err, file)) - } else { - cb(null, file) - } -} - -class AddHelper extends Duplex { - constructor (pullStream, push, options) { - super(Object.assign({ objectMode: true }, options)) - this._pullStream = pullStream - this._pushable = push - this._waitingPullFlush = [] - } - - _read () { - this._pullStream(null, (end, data) => { - while (this._waitingPullFlush.length) { - const cb = this._waitingPullFlush.shift() - cb() - } - if (end) { - if (end instanceof Error) { - this.emit('error', end) - } - } else { - this.push(data) - } - }) - } - - _write (chunk, encoding, callback) { - this._waitingPullFlush.push(callback) - this._pushable.push(chunk) - } -} - -module.exports = function (self) { - // Internal add func that gets used by all add funcs - function _addPullStream (options = {}) { - let chunkerOptions - try { - chunkerOptions = parseChunkerString(options.chunker) - } catch (err) { - return pull.map(() => { throw err }) - } - const opts = Object.assign({}, { - shardSplitThreshold: self._options.EXPERIMENTAL.sharding - ? 1000 - : Infinity - }, options, chunkerOptions) - - if (opts.hashAlg && opts.cidVersion !== 1) { - opts.cidVersion = 1 - } - - let total = 0 - - const prog = opts.progress || noop - const progress = (bytes) => { - total += bytes - prog(total) - } - - opts.progress = progress - return pull( - pull.map(normalizeContent.bind(null, opts)), - pull.flatten(), - importer(self._ipld, opts), - pull.asyncMap(prepareFile.bind(null, self, opts)), - pull.map(preloadFile.bind(null, self, opts)), - pull.asyncMap(pinFile.bind(null, self, opts)) - ) - } - - // Internal cat func that gets used by all cat funcs - function _catPullStream (ipfsPath, options) { - if (typeof ipfsPath === 'function') { - throw new Error('You must supply an ipfsPath') - } - - options = options || {} - - ipfsPath = normalizePath(ipfsPath) - const pathComponents = ipfsPath.split('/') - const restPath = normalizePath(pathComponents.slice(1).join('/')) - const filterFile = (file) => (restPath && file.path === restPath) || (file.path === ipfsPath) - - if (options.preload !== false) { - self._preload(pathComponents[0]) - } - - const d = deferred.source() - - pull( - exporter(ipfsPath, self._ipld, options), - pull.filter(filterFile), - pull.take(1), - pull.collect((err, files) => { - if (err) { - return d.abort(err) - } - - if (!files.length) { - return d.abort(new Error('No such file')) - } - - const file = files[0] - - if (!file.content && file.type === 'dir') { - return d.abort(new Error('this dag node is a directory')) - } - - if (!file.content) { - return d.abort(new Error('this dag node has no content')) - } - - d.resolve(file.content) - }) - ) - - return d - } - - // Internal ls func that gets used by all ls funcs - function _lsPullStream (ipfsPath, options) { - options = options || {} - - const path = normalizePath(ipfsPath) - const recursive = options.recursive - const pathComponents = path.split('/') - const pathDepth = pathComponents.length - const maxDepth = recursive ? global.Infinity : pathDepth - options.maxDepth = options.maxDepth || maxDepth - - if (options.preload !== false) { - self._preload(pathComponents[0]) - } - - return pull( - exporter(ipfsPath, self._ipld, options), - pull.filter(node => - recursive ? node.depth >= pathDepth : node.depth === pathDepth - ), - pull.map(node => { - const cid = new CID(node.hash) - node = Object.assign({}, node, { hash: cid.toBaseEncodedString() }) - delete node.content - return node - }) - ) - } - - return { - add: (() => { - const add = promisify((data, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - // Buffer, pull stream or Node.js stream - const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj) - // An object like { content?, path? }, where content isBufferOrStream and path isString - const isContentObject = obj => { - if (typeof obj !== 'object') return false - // path is optional if content is present - if (obj.content) return isBufferOrStream(obj.content) - // path must be a non-empty string if no content - return Boolean(obj.path) && isString(obj.path) - } - // An input atom: a buffer, stream or content object - const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) - // All is ok if data isInput or data is an array of isInput - const ok = isInput(data) || (Array.isArray(data) && data.every(isInput)) - - if (!ok) { - return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) - } - - // CID v0 is for multihashes encoded with sha2-256 - if (options.hashAlg && options.cidVersion !== 1) { - options.cidVersion = 1 - } - - pull( - pull.values([data]), - _addPullStream(options), - sort((a, b) => { - if (a.path < b.path) return 1 - if (a.path > b.path) return -1 - return 0 - }), - pull.collect(callback) - ) - }) - - return function () { - const args = Array.from(arguments) - - // If we .add(), then promisify thinks the pull stream - // is a callback! Add an empty options object in this case so that a - // promise is returned. - if (args.length === 1 && isSource(args[0])) { - args.push({}) - } - - return add.apply(null, args) - } - })(), - - addReadableStream: (options) => { - options = options || {} - - const p = pushable() - const s = pull( - p, - _addPullStream(options) - ) - - const retStream = new AddHelper(s, p) - - retStream.once('finish', () => p.end()) - - return retStream - }, - - addPullStream: _addPullStream, - - cat: promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - if (typeof callback !== 'function') { - throw new Error('Please supply a callback to ipfs.cat') - } - - pull( - _catPullStream(ipfsPath, options), - pull.collect((err, buffers) => { - if (err) { return callback(err) } - callback(null, Buffer.concat(buffers)) - }) - ) - }), - - catReadableStream: (ipfsPath, options) => toStream.source(_catPullStream(ipfsPath, options)), - - catPullStream: (ipfsPath, options) => _catPullStream(ipfsPath, options), - - get: promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - if (options.preload !== false) { - let pathComponents - - try { - pathComponents = normalizePath(ipfsPath).split('/') - } catch (err) { - return setImmediate(() => callback(errCode(err, 'ERR_INVALID_PATH'))) - } - - self._preload(pathComponents[0]) - } - - pull( - exporter(ipfsPath, self._ipld, options), - pull.asyncMap((file, cb) => { - if (file.content) { - pull( - file.content, - pull.collect((err, buffers) => { - if (err) { return cb(err) } - file.content = Buffer.concat(buffers) - cb(null, file) - }) - ) - } else { - cb(null, file) - } - }), - pull.collect(callback) - ) - }), - - getReadableStream: (ipfsPath, options) => { - options = options || {} - - if (options.preload !== false) { - let pathComponents - - try { - pathComponents = normalizePath(ipfsPath).split('/') - } catch (err) { - return toStream.source(pull.error(errCode(err, 'ERR_INVALID_PATH'))) - } - - self._preload(pathComponents[0]) - } - - return toStream.source( - pull( - exporter(ipfsPath, self._ipld, options), - pull.map((file) => { - if (file.content) { - file.content = toStream.source(file.content) - file.content.pause() - } - - return file - }) - ) - ) - }, - - getPullStream: (ipfsPath, options) => { - options = options || {} - - if (options.preload !== false) { - let pathComponents - - try { - pathComponents = normalizePath(ipfsPath).split('/') - } catch (err) { - return pull.error(errCode(err, 'ERR_INVALID_PATH')) - } - - self._preload(pathComponents[0]) - } - - return exporter(ipfsPath, self._ipld, options) - }, - - ls: promisify((ipfsPath, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - options = options || {} - - pull( - _lsPullStream(ipfsPath, options), - pull.collect((err, values) => { - if (err) { - callback(err) - return - } - callback(null, values) - }) - ) - }), - - lsReadableStream: (ipfsPath, options) => { - return toStream.source(_lsPullStream(ipfsPath, options)) - }, - - lsPullStream: _lsPullStream - - // TODO create addFromFs - // TODO create addFromStream - // TODO create addFromUrl - } -} diff --git a/src/core/components/files-regular/cat-pull-stream.js b/src/core/components/files-regular/cat-pull-stream.js index feea643d86..41c126e8d3 100644 --- a/src/core/components/files-regular/cat-pull-stream.js +++ b/src/core/components/files-regular/cat-pull-stream.js @@ -26,21 +26,28 @@ module.exports = function (self) { pull( exporter(ipfsPath, self._ipld, options), + pull.filter(filterFile), + pull.take(1), pull.collect((err, files) => { - if (err) { return d.abort(err) } - if (files && files.length > 1) { - files = files.filter(filterFile) + if (err) { + return d.abort(err) } - if (!files || !files.length) { + + if (!files.length) { return d.abort(new Error('No such file')) } const file = files[0] - const content = file.content - if (!content && file.type === 'dir') { + + if (!file.content && file.type === 'dir') { return d.abort(new Error('this dag node is a directory')) } - d.resolve(content) + + if (!file.content) { + return d.abort(new Error('this dag node has no content')) + } + + d.resolve(file.content) }) ) From d4cdbd9c16563b8da66d040ed4aa5350a4dc024b Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 10 Dec 2018 14:54:29 +0000 Subject: [PATCH 3/5] fix: move chunker utils tests License: MIT Signed-off-by: Alan Shaw --- test/core/files-regular-utils.js | 78 ++++++++++++++++++++++++++++++++ test/core/node.js | 1 + test/core/utils.js | 67 --------------------------- 3 files changed, 79 insertions(+), 67 deletions(-) create mode 100644 test/core/files-regular-utils.js diff --git a/test/core/files-regular-utils.js b/test/core/files-regular-utils.js new file mode 100644 index 0000000000..3b86020d3f --- /dev/null +++ b/test/core/files-regular-utils.js @@ -0,0 +1,78 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const utils = require('../../src/core/components/files-regular/utils') + +describe('files-regular/utils', () => { + describe('parseChunkerString', () => { + it('handles an empty string', () => { + const options = utils.parseChunkerString('') + expect(options).to.have.property('chunker').to.equal('fixed') + }) + + it('handles a null chunker string', () => { + const options = utils.parseChunkerString(null) + expect(options).to.have.property('chunker').to.equal('fixed') + }) + + it('parses a fixed size string', () => { + const options = utils.parseChunkerString('size-512') + expect(options).to.have.property('chunker').to.equal('fixed') + expect(options) + .to.have.property('chunkerOptions') + .to.have.property('maxChunkSize') + .to.equal(512) + }) + + it('parses a rabin string without size', () => { + const options = utils.parseChunkerString('rabin') + expect(options).to.have.property('chunker').to.equal('rabin') + expect(options) + .to.have.property('chunkerOptions') + .to.have.property('avgChunkSize') + }) + + it('parses a rabin string with only avg size', () => { + const options = utils.parseChunkerString('rabin-512') + expect(options).to.have.property('chunker').to.equal('rabin') + expect(options) + .to.have.property('chunkerOptions') + .to.have.property('avgChunkSize') + .to.equal(512) + }) + + it('parses a rabin string with min, avg, and max', () => { + const options = utils.parseChunkerString('rabin-42-92-184') + expect(options).to.have.property('chunker').to.equal('rabin') + expect(options).to.have.property('chunkerOptions') + expect(options.chunkerOptions).to.have.property('minChunkSize').to.equal(42) + expect(options.chunkerOptions).to.have.property('avgChunkSize').to.equal(92) + expect(options.chunkerOptions).to.have.property('maxChunkSize').to.equal(184) + }) + + it('throws an error for unsupported chunker type', () => { + const fn = () => utils.parseChunkerString('fake-512') + expect(fn).to.throw(Error) + }) + + it('throws an error for incorrect format string', () => { + const fn = () => utils.parseChunkerString('fixed-abc') + expect(fn).to.throw(Error) + }) + + it('throws an error for incorrect rabin format string', () => { + let fn = () => utils.parseChunkerString('rabin-1-2-3-4') + expect(fn).to.throw(Error) + }) + + it('throws an error for non integer rabin parameters', () => { + const fn = () => utils.parseChunkerString('rabin-abc') + expect(fn).to.throw(Error) + }) + }) +}) diff --git a/test/core/node.js b/test/core/node.js index 3ddfaef962..3aa939e776 100644 --- a/test/core/node.js +++ b/test/core/node.js @@ -1,6 +1,7 @@ 'use strict' require('./circuit-relay') +require('./files-regular-utils') require('./name') require('./key-exchange') require('./pin') diff --git a/test/core/utils.js b/test/core/utils.js index 1ad815e926..7700e90955 100644 --- a/test/core/utils.js +++ b/test/core/utils.js @@ -185,71 +185,4 @@ describe('utils', () => { }) }) }) - - describe('parseChunkerString', () => { - it('handles an empty string', () => { - const options = utils.parseChunkerString('') - expect(options).to.have.property('chunker').to.equal('fixed') - }) - - it('handles a null chunker string', () => { - const options = utils.parseChunkerString(null) - expect(options).to.have.property('chunker').to.equal('fixed') - }) - - it('parses a fixed size string', () => { - const options = utils.parseChunkerString('size-512') - expect(options).to.have.property('chunker').to.equal('fixed') - expect(options) - .to.have.property('chunkerOptions') - .to.have.property('maxChunkSize') - .to.equal(512) - }) - - it('parses a rabin string without size', () => { - const options = utils.parseChunkerString('rabin') - expect(options).to.have.property('chunker').to.equal('rabin') - expect(options) - .to.have.property('chunkerOptions') - .to.have.property('avgChunkSize') - }) - - it('parses a rabin string with only avg size', () => { - const options = utils.parseChunkerString('rabin-512') - expect(options).to.have.property('chunker').to.equal('rabin') - expect(options) - .to.have.property('chunkerOptions') - .to.have.property('avgChunkSize') - .to.equal(512) - }) - - it('parses a rabin string with min, avg, and max', () => { - const options = utils.parseChunkerString('rabin-42-92-184') - expect(options).to.have.property('chunker').to.equal('rabin') - expect(options).to.have.property('chunkerOptions') - expect(options.chunkerOptions).to.have.property('minChunkSize').to.equal(42) - expect(options.chunkerOptions).to.have.property('avgChunkSize').to.equal(92) - expect(options.chunkerOptions).to.have.property('maxChunkSize').to.equal(184) - }) - - it('throws an error for unsupported chunker type', () => { - const fn = () => utils.parseChunkerString('fake-512') - expect(fn).to.throw(Error) - }) - - it('throws an error for incorrect format string', () => { - const fn = () => utils.parseChunkerString('fixed-abc') - expect(fn).to.throw(Error) - }) - - it('throws an error for incorrect rabin format string', () => { - let fn = () => utils.parseChunkerString('rabin-1-2-3-4') - expect(fn).to.throw(Error) - }) - - it('throws an error for non integer rabin parameters', () => { - const fn = () => utils.parseChunkerString('rabin-abc') - expect(fn).to.throw(Error) - }) - }) }) From 58648cd6f106b5b047f663957418ed8a6f6e006e Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 10 Dec 2018 15:33:29 +0000 Subject: [PATCH 4/5] fix: do not pass value on error License: MIT Signed-off-by: Alan Shaw --- src/core/components/files-regular/ls.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/core/components/files-regular/ls.js b/src/core/components/files-regular/ls.js index 89bd7f1aaf..42e6ff04c9 100644 --- a/src/core/components/files-regular/ls.js +++ b/src/core/components/files-regular/ls.js @@ -14,7 +14,12 @@ module.exports = function (self) { pull( self.lsPullStream(ipfsPath, options), - pull.collect(callback) + pull.collect((err, values) => { + if (err) { + return callback(err) + } + callback(null, values) + }) ) }) } From 3b007041c4c3272b542512a9499b284c709e8bf5 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 10 Dec 2018 21:02:30 +0000 Subject: [PATCH 5/5] chore: appease linter License: MIT Signed-off-by: Alan Shaw --- src/core/components/files-regular/index.js | 2 ++ src/core/components/files-regular/utils.js | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/core/components/files-regular/index.js b/src/core/components/files-regular/index.js index 8cfeb80ffa..8823d69995 100644 --- a/src/core/components/files-regular/index.js +++ b/src/core/components/files-regular/index.js @@ -1,3 +1,5 @@ +'use strict' + module.exports = self => ({ add: require('./add')(self), addPullStream: require('./add-pull-stream')(self), diff --git a/src/core/components/files-regular/utils.js b/src/core/components/files-regular/utils.js index 732b555f0d..8b1241ae76 100644 --- a/src/core/components/files-regular/utils.js +++ b/src/core/components/files-regular/utils.js @@ -1,3 +1,5 @@ +'use strict' + const CID = require('cids') exports.normalizePath = (path) => {