Skip to content

Commit

Permalink
fix(perf): use bulk file reads for index reads
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Mar 5, 2017
1 parent 1af0141 commit 79a8891
Showing 1 changed file with 54 additions and 52 deletions.
106 changes: 54 additions & 52 deletions lib/entry-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ const crypto = require('crypto')
const fixOwner = require('./util/fix-owner')
const fs = require('graceful-fs')
const path = require('path')
const pipe = require('mississippi').pipe
const Promise = require('bluebird')
const split = require('split')
const through = require('mississippi').through

const indexV = require('../package.json')['cache-version'].index

const appendFileAsync = Promise.promisify(fs.appendFile)
const readFileAsync = Promise.promisify(fs.readFile)

module.exports.insert = insert
function insert (cache, key, digest, opts) {
Expand Down Expand Up @@ -50,32 +49,20 @@ function insert (cache, key, digest, opts) {
module.exports.find = find
function find (cache, key) {
const bucket = bucketPath(cache, key)
const stream = fs.createReadStream(bucket)
let ret
return Promise.fromNode(cb => {
pipe(stream, split('\n', null, {trailing: true}).on('data', function (l) {
const pieces = l.split('\t')
if (!pieces[1] || pieces[1].length !== parseInt(pieces[0], 10)) {
// Length is no good! Corruption ahoy!
return
}
let obj
try {
obj = JSON.parse(pieces[1])
} catch (e) {
// Entry is corrupted!
return
}
if (obj && (obj.key === key)) {
ret = formatEntry(cache, obj)
}
}), function (err) {
if (err && err.code === 'ENOENT') {
cb(null, null)
return bucketEntries(cache, bucket).then(entries => {
return entries.reduce((latest, next) => {
if (next && next.key === key) {
return formatEntry(cache, next)
} else {
cb(err, ret)
return latest
}
})
}, null)
}).catch(err => {
if (err.code === 'ENOENT') {
return null
} else {
throw err
}
})
}

Expand All @@ -102,38 +89,27 @@ function lsStream (cache) {
return cb(err)
} else {
asyncMap(files, function (f, cb) {
fs.readFile(path.join(indexDir, bucket, f), 'utf8', function (err, data) {
if (err) { return cb(err) }
const entries = {}
data.split('\n').slice(1).forEach(function (entry) {
const pieces = entry.split('\t')
if (pieces[1].length !== parseInt(pieces[0], 10)) {
// Length is no good! Corruption ahoy!
return
}
let parsed
try {
parsed = JSON.parse(pieces[1])
} catch (e) {
}
// NOTE - it's possible for an entry to be
// incomplete/corrupt. So we just skip it.
// See comment on `insert()` for deets.
if (parsed) {
entries[parsed.key] = formatEntry(cache, parsed)
}
})
const bpath = path.join(indexDir, bucket, f)
bucketEntries(cache, bpath).then(_entries => {
const entries = _entries.reduce((acc, entry) => {
acc[entry.key] = entry
return acc
}, {})
Object.keys(entries).forEach(function (k) {
stream.write(entries[k])
stream.write(formatEntry(cache, entries[k]))
})
cb()
}, err => {
if (err.code === 'ENOENT') {
cb()
} else {
cb(err)
}
})
}, function (err) {
cb(err)
})
}, cb)
}
})
}, err => {
}, function (err) {
if (err) { stream.emit('error') }
stream.end()
})
Expand Down Expand Up @@ -163,6 +139,32 @@ function notFoundError (cache, key) {
return err
}

// Reads an index bucket file and returns a Promise for the array of valid
// entry objects it contains.
//
// Each line of a bucket file is `<length>\t<json>`; a line whose JSON payload
// does not match its recorded length, or whose payload fails to parse, is
// treated as corrupt/incomplete and silently skipped (partial writes are
// expected — see the note on `insert()`).
//
// @param {string} cache - cache root (unused here; kept for call-site symmetry)
// @param {string} bucket - absolute path to the bucket file to read
// @param {function} [filter] - optional predicate; when given, only entries
//   for which `filter(entry)` is truthy are included. Omitting it keeps the
//   previous keep-everything behavior.
// @returns {Promise<Object[]>} parsed entries, in file order
function bucketEntries (cache, bucket, filter) {
  return readFileAsync(
    bucket, 'utf8'
  ).then(data => {
    const entries = []
    data.split('\n').forEach(entry => {
      const pieces = entry.split('\t')
      if (!pieces[1] || pieces[1].length !== parseInt(pieces[0], 10)) {
        // Length is no good! Corruption ahoy!
        return
      }
      let obj
      try {
        obj = JSON.parse(pieces[1])
      } catch (e) {
        // Entry is corrupted!
        return
      }
      // Previously `filter` was accepted but ignored; apply it when provided.
      if (obj && (!filter || filter(obj))) {
        entries.push(obj)
      }
    })
    return entries
  })
}

// Returns the path of the versioned index directory inside `cache`.
// The directory name embeds the index format version so that incompatible
// index layouts never share a directory.
function bucketDir (cache) {
  const dirname = `index-v${indexV}`
  return path.join(cache, dirname)
}
Expand Down

0 comments on commit 79a8891

Please sign in to comment.