Skip to content

Commit

Permalink
feat(put): added memoization support to put
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Mar 2, 2017
1 parent 4205cf0 commit b613a70
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions put.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,21 @@
const Promise = require('bluebird')

const index = require('./lib/entry-index')
const memo = require('./lib/memoization')
const pipe = Promise.promisify(require('mississippi').pipe)
const putContent = require('./lib/content/put-stream')
const through = require('mississippi').through
const to = require('mississippi').to

module.exports = putData
function putData (cache, key, data, opts, cb) {
if (!cb) {
cb = opts
opts = null
}
function putData (cache, key, data, opts) {
opts = opts || {}
const src = through()
let meta
let digest
const dest = putStream(cache, key, opts)
dest.on('metadata', function (m) { meta = m })
const ret = pipe(src, dest).then(() => meta)
src.write(data, function () {
src.end()
})
dest.on('digest', d => { digest = d })
const ret = pipe(src, dest).then(() => digest)
src.write(data, () => src.end())
return ret
}

Expand All @@ -33,26 +28,37 @@ function putStream (cache, key, opts) {
const contentStream = putContent(cache, opts).on('digest', function (d) {
digest = d
})
let errored = false
const stream = to(function (chunk, enc, cb) {
contentStream.write(chunk, enc, cb)
}, function (cb) {
contentStream.end(function () {
let memoData
let memoTotal = 0
const stream = to((chunk, enc, cb) => {
contentStream.write(chunk, enc, () => {
if (opts.memoize) {
if (!memoData) { memoData = [] }
memoData.push(chunk)
memoTotal += chunk.length
}
cb()
})
}, cb => {
contentStream.end(() => {
index.insert(cache, key, digest, opts).then(entry => {
if (opts.memoize) {
memo.put(cache, entry, Buffer.concat(memoData, memoTotal))
}
stream.emit('digest', digest)
stream.emit('metadata', entry)
cb()
})
})
})
stream.on('error', function (err) {
if (errored) { return }
errored = true
let erred = false
stream.once('error', err => {
if (erred) { return }
erred = true
contentStream.emit('error', err)
})
contentStream.on('error', function (err) {
if (errored) { return }
errored = true
contentStream.once('error', err => {
if (erred) { return }
erred = true
stream.emit('error', err)
})
return stream
Expand Down

0 comments on commit b613a70

Please sign in to comment.