feat(index): promisify entry index
BREAKING CHANGE: this changes the API to work off promises instead of callbacks
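For callers, the migration looks roughly like this sketch (hypothetical consumer code; the cache path, key, and digest values are placeholders, not part of this commit):

```js
const index = require('./lib/entry-index')

const cache = '/path/to/cache'       // placeholder cache directory
const key = 'my-key'                 // placeholder cache key
const digest = 'deadbeef'            // placeholder content digest

// Before: node-style callbacks, e.g.
//   index.find(cache, key, function (err, entry) { ... })

// After: every exported function returns a Promise.
index.insert(cache, key, digest, { metadata: { note: 'hi' } })
  .then(() => index.find(cache, key))
  .then(entry => {
    // `entry` is the formatted index entry, or null if the key is absent
    console.log(entry && entry.digest)
  })
  .then(() => index.ls(cache))
  .then(entries => console.log(Object.keys(entries)))
```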
zkat committed Feb 27, 2017
1 parent ae204bb commit cda3335
Showing 1 changed file with 95 additions and 90 deletions.
185 changes: 95 additions & 90 deletions lib/entry-index.js
@@ -1,104 +1,106 @@
'use strict'

var asyncMap = require('slide/lib/async-map')
var contentPath = require('./content/path')
var fixOwner = require('./util/fix-owner')
var fs = require('graceful-fs')
var lockfile = require('lockfile')
var path = require('path')
var pipe = require('mississippi').pipe
var split = require('split')
var through = require('mississippi').through
const asyncMap = require('slide/lib/async-map')
const contentPath = require('./content/path')
const fixOwner = require('./util/fix-owner')
const fs = require('graceful-fs')
const lockfile = require('lockfile')
const path = require('path')
const pipe = require('mississippi').pipe
const Promise = require('bluebird')
const split = require('split')
const through = require('mississippi').through

module.exports.insert = insert
function insert (cache, key, digest, opts, _cb) {
if (!_cb) {
_cb = opts
opts = null
}
function insert (cache, key, digest, opts) {
opts = opts || {}
var bucket = indexPath(cache, key)
var lock = bucket + '.lock'
var cb = function (err, entry) {
lockfile.unlock(lock, function (er) {
_cb(er || err, entry)
})
}
fixOwner.mkdirfix(path.dirname(bucket), opts.uid, opts.gid, function (err) {
if (err) { return _cb(err) }
lockfile.lock(lock, {
stale: 60000,
retries: 10,
wait: 10000
}, function (err) {
if (err) { return _cb(err) }
fs.stat(bucket, function (err, existing) {
if (err && err.code !== 'ENOENT' && err.code !== 'EPERM') { cb(err) }
var entry = {
key: key,
digest: digest,
time: +(new Date()),
metadata: opts.metadata
}
// Because of the way these entries work,
// the index is safe from fs.appendFile stopping
// mid-write so long as newlines are *prepended*
//
// That is, if a write fails, it will be ignored
// by `find`, and the next successful one will be
// used.
//
// This should be -very rare-, since `fs.appendFile`
// will often be atomic on most platforms unless
// very large metadata has been included, but caches
// like this one tend to last a long time. :)
// Most corrupted reads are likely to be from attempting
// to read the index while it's being written to --
// which is safe, but not guaranteed to be atomic.
var e = (existing ? '\n' : '') + JSON.stringify(entry)
fs.appendFile(bucket, e, function (err) {
if (err) { return cb(err) }
fixOwner.chownr(bucket, opts.uid, opts.gid, function (err) {
const bucket = indexPath(cache, key)
const lock = bucket + '.lock'
return fixOwner.mkdirfix(
path.dirname(bucket), opts.uid, opts.gid
).then(() => (
Promise.fromNode(_cb => {
const cb = (err, entry) => {
lockfile.unlock(lock, er => {
_cb(err || er, entry)
})
}
lockfile.lock(lock, {
stale: 60000,
retries: 10,
wait: 10000
}, function (err) {
if (err) { return _cb(err) }
fs.stat(bucket, function (err, existing) {
if (err && err.code !== 'ENOENT' && err.code !== 'EPERM') {
return cb(err)
}
const entry = {
key: key,
digest: digest,
time: +(new Date()),
metadata: opts.metadata
}
// Because of the way these entries work,
// the index is safe from fs.appendFile stopping
// mid-write so long as newlines are *prepended*
//
// That is, if a write fails, it will be ignored
// by `find`, and the next successful one will be
// used.
//
// This should be -very rare-, since `fs.appendFile`
// will often be atomic on most platforms unless
// very large metadata has been included, but caches
// like this one tend to last a long time. :)
// Most corrupted reads are likely to be from attempting
// to read the index while it's being written to --
// which is safe, but not guaranteed to be atomic.
const e = (existing ? '\n' : '') + JSON.stringify(entry)
fs.appendFile(bucket, e, function (err) {
cb(err, entry)
})
})
})
})
})
)).then(() => fixOwner.chownr(bucket, opts.uid, opts.gid))
}

module.exports.find = find
function find (cache, key, cb) {
var bucket = indexPath(cache, key)
var stream = fs.createReadStream(bucket)
var ret
pipe(stream, split('\n', null, {trailing: true}).on('data', function (l) {
try {
var obj = JSON.parse(l)
} catch (e) {
return
}
if (obj && (obj.key === key)) {
ret = formatEntry(cache, obj)
}
}), function (err) {
if (err && err.code === 'ENOENT') {
cb(null, null)
} else {
cb(err, ret)
}
function find (cache, key) {
const bucket = indexPath(cache, key)
const stream = fs.createReadStream(bucket)
let ret
return Promise.fromNode(cb => {
pipe(stream, split('\n', null, {trailing: true}).on('data', function (l) {
let obj
try {
obj = JSON.parse(l)
} catch (e) {
return
}
if (obj && (obj.key === key)) {
ret = formatEntry(cache, obj)
}
}), function (err) {
if (err && err.code === 'ENOENT') {
cb(null, null)
} else {
cb(err, ret)
}
})
})
}

module.exports.delete = del
function del (cache, key, cb) {
insert(cache, key, null, cb)
function del (cache, key) {
return insert(cache, key, null)
}

module.exports.lsStream = lsStream
function lsStream (cache) {
var indexPath = path.join(cache, 'index')
var stream = through.obj()
const indexPath = path.join(cache, 'index')
const stream = through.obj()
fs.readdir(indexPath, function (err, files) {
if (err && err.code === 'ENOENT') {
return stream.end()
@@ -108,10 +110,11 @@ function lsStream (cache) {
asyncMap(files, function (f, cb) {
fs.readFile(path.join(indexPath, f), 'utf8', function (err, data) {
if (err) { return cb(err) }
var entries = {}
const entries = {}
data.split('\n').forEach(function (entry) {
let parsed
try {
var parsed = JSON.parse(entry)
parsed = JSON.parse(entry)
} catch (e) {
}
// NOTE - it's possible for an entry to be
@@ -136,18 +139,20 @@ }
}

module.exports.ls = ls
function ls (cache, cb) {
var entries = {}
lsStream(cache).on('finish', function () {
cb(null, entries)
}).on('data', function (d) {
entries[d.key] = d
}).on('error', cb)
function ls (cache) {
const entries = {}
return Promise.fromNode(cb => {
lsStream(cache).on('finish', function () {
cb(null, entries)
}).on('data', function (d) {
entries[d.key] = d
}).on('error', cb)
})
}

module.exports.notFoundError = notFoundError
function notFoundError (cache, key) {
var err = new Error('content not found')
const err = new Error('content not found')
err.code = 'ENOENT'
err.cache = cache
err.key = key
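The prepended-newline comment in `insert` above is the heart of the append-only design. A minimal standalone sketch of why a torn `fs.appendFile` cannot corrupt its neighbors (the file path and entries are made up):

```js
const fs = require('fs')

const bucket = '/tmp/index-bucket-demo'

// A good entry, then an append that died mid-record:
fs.writeFileSync(bucket, [
  JSON.stringify({ key: 'a', digest: '111' }),
  '{"key":"b","dige' // torn write -- fs.appendFile stopped partway
].join('\n'))

// The next successful append *prepends* its newline, so it starts a
// fresh line instead of being glued onto the torn junk:
fs.appendFileSync(bucket, '\n' + JSON.stringify({ key: 'c', digest: '333' }))

// The tolerant line-wise parse that `find` and `lsStream` use: lines
// that fail JSON.parse are simply skipped.
const keys = fs.readFileSync(bucket, 'utf8').split('\n').map(line => {
  try { return JSON.parse(line) } catch (e) { return null }
}).filter(Boolean).map(entry => entry.key)

console.log(keys) // => [ 'a', 'c' ] -- the torn record is ignored
```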

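`Promise.fromNode` is bluebird's bridge out of callback-land: it hands your function a node-style callback and settles the promise with whatever that callback receives. A stripped-down sketch of the pattern `find` and `ls` use above (`readIndexFile` is a hypothetical helper, not part of this commit):

```js
const Promise = require('bluebird')
const fs = require('graceful-fs')

// Wrap the callback-style call once, at the boundary, and expose a
// Promise to callers.
function readIndexFile (file) {
  return Promise.fromNode(cb => {
    fs.readFile(file, 'utf8', cb)
  })
}

readIndexFile('/tmp/index-bucket-demo')
  .then(data => console.log(data.split('\n').length, 'lines'))
  .catch(err => console.error('failed:', err.code))
```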