Commit

feat(verify): converted to Promises
BREAKING CHANGE: API is now promise-based
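
For callers, the breaking change looks roughly like this. A minimal before/after sketch — the verify(cache, opts) and verify.lastRun(cache) signatures come from the diff below, while the require path, cache path, and caller code are illustrative only:

    // Before this commit: callback-style API
    var verify = require('./lib/verify') // require path illustrative
    verify('/tmp/my-cache', { uid: 500, gid: 500 }, function (err, stats) {
      if (err) { return console.error(err) }
      console.log('total entries:', stats.totalEntries)
    })

    // After this commit: promise-based API
    verify('/tmp/my-cache', { uid: 500, gid: 500 }).then(stats => {
      console.log('total entries:', stats.totalEntries)
    }).catch(console.error)

    // lastRun() now returns a promise for a Date instead of taking a callback
    verify.lastRun('/tmp/my-cache').then(when => console.log('last verified:', when))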
zkat committed Feb 27, 2017
1 parent 697aa38 commit f0b3974
217 changes: 106 additions & 111 deletions lib/verify.js
@@ -1,173 +1,168 @@
 'use strict'
 
-var asyncMap = require('slide').asyncMap
+const Promise = require('bluebird')
+
 var checksumStream = require('checksum-stream')
 var fixOwner = require('./util/fix-owner')
 var fs = require('graceful-fs')
 var index = require('./entry-index')
-var lockfile = require('lockfile')
+var lockfile = Promise.promisifyAll(require('lockfile'))
 var path = require('path')
-var pipe = require('mississippi').pipe
-var rimraf = require('rimraf')
+var pipe = Promise.promisify(require('mississippi').pipe)
+var rimraf = Promise.promisify(require('rimraf'))
 
+Promise.promisifyAll(fs)
+
 module.exports = verify
-function verify (cache, opts, _cb) {
-  if (!_cb) {
-    _cb = opts
-    opts = null
-  }
+function verify (cache, opts) {
   opts = opts || {}
-  var lock = path.join(cache, 'verify.lock')
-  var cb = function (err, stats) {
-    lockfile.unlock(lock, function (er) {
-      _cb(er || err, stats)
+  opts.log && opts.log.verbose('verify', 'verifying content cache at', cache)
+  const startTime = +(new Date())
+  return fixOwner.mkdirfix(
+    cache, opts.uid, opts.gid
+  ).then(() => {
+    const lockPath = path.join(cache, 'verify.lock')
+    const lock = lockfile.lockAsync(lockPath).disposer(() => {
+      return lockfile.unlockAsync(lockPath)
     })
-  }
-  fixOwner.mkdirfix(cache, opts.uid, opts.gid, function (err) {
-    if (err) { return _cb(err) }
-    lockfile.lock(lock, function (err) {
-      if (err) { return _cb(err) }
-      garbageCollect(cache, opts, function (err, gcStats) {
-        if (err) { return cb(err) }
-        tidyIndex(cache, opts, function (err, tidyStats) {
-          if (err) { return cb(err) }
+    return Promise.using(lock, () => {
+      return garbageCollect(cache, opts).then(gcStats => {
+        return tidyIndex(cache, opts).then(tidyStats => {
           var stats = tidyStats
           Object.keys(gcStats).forEach(function (key) {
             stats[key] = gcStats[key]
           })
-          var verifile = path.join(cache, '_lastverified')
-          fs.writeFile(verifile, '' + (+(new Date())), function (err) {
-            if (err) { return cb(err) }
-            fixOwner.chownr(cache, opts.uid, opts.gid, function (err) {
-              if (err) { return cb(err) }
-              rimraf(path.join(cache, 'tmp'), function (err) {
-                if (err) { return cb(err) }
-                cb(null, stats)
-              })
-            })
-          })
+          return stats
         })
+      }).then(stats => {
+        var verifile = path.join(cache, '_lastverified')
+        opts.log && opts.log.verbose('verify', 'writing verifile to ' + verifile)
+        return fs.writeFileAsync(
+          verifile, '' + (+(new Date()))
+        ).then(() => {
+          opts.log && opts.log.verbose('verify', 'fixing cache ownership')
+          return fixOwner.chownr(cache, opts.uid, opts.gid)
+        }).then(() => {
+          opts.log && opts.log.verbose('verify', 'clearing out tmp')
+          return rimraf(path.join(cache, 'tmp'))
+        }).then(() => stats)
       })
     })
+  }).then(stats => {
+    stats.runTime = (+(new Date()) - startTime) / 1000
+    opts.log && opts.log.verbose('verify', 'final stats:', stats)
+    return stats
   })
 }
 
-function tidyIndex (cache, opts, cb) {
-  index.ls(cache, function (err, entries) {
-    if (err) { return cb(err) }
-    rimraf(path.join(cache, 'index'), function (err) {
-      if (err) { return cb(err) }
+function tidyIndex (cache, opts) {
+  opts.log && opts.log.verbose('verify', 'tidying index')
+  return index.ls(cache).then(entries => {
+    return rimraf(path.join(cache, 'index')).then(() => {
       var stats = {
         entriesRemoved: 0,
         digestMissing: 0,
         totalEntries: 0
       }
-      asyncMap(Object.keys(entries), function (key, cb) {
+      return Promise.reduce(Object.keys(entries), (stats, key) => {
        var entry = entries[key]
        if (!entry.digest) {
          stats.digestMissing++
-          return cb()
+          return stats
        }
        var content = path.join(cache, 'content', entries[key].digest)
-        fs.stat(content, function (err) {
-          if (err && err.code === 'ENOENT') {
-            stats.entriesRemoved++
-            return cb()
-          } else {
-            stats.totalEntries++
-            index.insert(cache, key, entry.digest, {
-              uid: opts.uid,
-              gid: opts.gid,
-              metadata: entry.metadata
-            }, cb)
+        return fs.statAsync(content).catch(err => {
+          if (err.code === 'ENOENT') {
+            stats.entriesRemoved++
+            return stats
          }
+        }).then(() => {
+          stats.totalEntries++
+          return index.insert(cache, key, entry.digest, {
+            uid: opts.uid,
+            gid: opts.gid,
+            metadata: entry.metadata
+          }).then(() => stats)
        })
-      }, function (err) {
-        if (err) { return cb(err) }
-        cb(null, stats)
-      })
+      }, stats)
    })
  })
 }
 
-function garbageCollect (cache, opts, cb) {
-  index.ls(cache, function (err, entries) {
+function garbageCollect (cache, opts) {
+  opts.log && opts.log.verbose('verify', 'garbage collecting content')
+  return index.ls(cache).then(entries => {
    var byDigest = {}
    Object.keys(entries).forEach(function (k) {
      byDigest[entries[k].digest] = entries[k]
    })
-    if (err) { return cb(err) }
-    var stats = {
-      verifiedContent: 0,
-      collectedCount: 0,
-      reclaimedSize: 0
-    }
    var contentDir = path.join(cache, 'content')
-    fs.readdir(contentDir, function (err, files) {
-      if (err && err.code === 'ENOENT') {
-        return cb(null, stats)
-      } else if (err) {
-        return cb(err)
+    return fs.readdirAsync(contentDir).catch(err => {
+      if (err.code === 'ENOENT') {
+        return
      } else {
-        asyncMap(files, function (f, cb) {
-          var fullPath = path.join(contentDir, f)
-          if (byDigest[f]) {
-            var algo = opts.hashAlgorithm || 'sha1'
-            verifyContent(fullPath, algo, function (err, collected) {
-              if (err) { return cb(err) }
-              if (collected != null) {
-                stats.collectedCount++
-                stats.reclaimedSize += collected
-              } else {
-                stats.verifiedContent++
-              }
-              cb()
-            })
-          } else {
-            stats.collectedCount++
-            fs.stat(fullPath, function (err, s) {
-              if (err) { return cb(err) }
-              stats.reclaimedSize += s.size
-              rimraf(path.join(contentDir, f), cb)
-            })
-          }
-        }, function (err) {
-          if (err) { return cb(err) }
-          cb(null, stats)
-        })
+        throw err
      }
+    }).then(files => {
+      var stats = {
+        verifiedContent: 0,
+        collectedCount: 0,
+        reclaimedSize: 0,
+        keptSize: 0
+      }
+      return Promise.reduce(files, (stats, f) => {
+        var fullPath = path.join(contentDir, f)
+        if (byDigest[f]) {
+          var algo = opts.hashAlgorithm || 'sha1'
+          return verifyContent(fullPath, algo).then(info => {
+            if (!info.valid) {
+              stats.collectedCount++
+              stats.reclaimedSize += info.size
+            } else {
+              stats.verifiedContent++
+              stats.keptSize += info.size
+            }
+            return stats
+          })
+        } else {
+          stats.collectedCount++
+          return fs.statAsync(fullPath).then(s => {
+            stats.reclaimedSize += s.size
+            return rimraf(path.join(contentDir, f)).then(() => stats)
+          })
+        }
+      }, stats)
    })
  })
 }
 
-function verifyContent (filepath, algo, cb) {
-  fs.stat(filepath, function (err, stat) {
-    if (err) { return cb(err) }
+function verifyContent (filepath, algo) {
+  return fs.statAsync(filepath).then(stat => {
    var reader = fs.createReadStream(filepath)
    var checksummer = checksumStream({
      digest: path.basename(filepath),
      algorithm: algo
    })
-    checksummer.on('data', function () {})
-    pipe(reader, checksummer, function (err) {
+    var contentInfo = {
+      size: stat.size,
+      valid: true
+    }
+    checksummer.on('data', () => {})
+    return pipe(reader, checksummer).catch(err => {
      if (err && err.code === 'EBADCHECKSUM') {
-        rimraf(filepath, function (err) {
-          if (err) { return cb(err) }
-          cb(null, stat.size)
+        return rimraf(filepath).then(() => {
+          contentInfo.valid = false
        })
-      } else if (err) {
-        return cb(err)
      } else {
-        cb(null, null)
+        throw err
      }
-    })
+    }).then(() => contentInfo)
  })
 }
 
 module.exports.lastRun = lastRun
-function lastRun (cache, cb) {
-  fs.readFile(path.join(cache, '_lastverified'), 'utf8', function (err, data) {
-    if (err) { return cb(err) }
-    cb(null, new Date(+data))
-  })
+function lastRun (cache) {
+  return fs.readFileAsync(
+    path.join(cache, '_lastverified'), 'utf8'
+  ).then(data => new Date(+data))
 }
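
The lock handling in the new verify() rests on bluebird's disposer pattern: lockfile.lockAsync(lockPath).disposer(...) ties acquiring the lockfile to a cleanup step, and Promise.using guarantees that cleanup runs whether the work in between resolves or rejects. A standalone sketch of the same pattern, assuming bluebird and lockfile as in the diff (withLock and the example paths are illustrative, not part of this commit):

    const Promise = require('bluebird')
    const lockfile = Promise.promisifyAll(require('lockfile'))

    function withLock (lockPath, work) {
      // disposer() attaches cleanup to the acquisition promise;
      // Promise.using invokes it once the work settles, success or failure.
      const lock = lockfile.lockAsync(lockPath).disposer(() => {
        return lockfile.unlockAsync(lockPath)
      })
      return Promise.using(lock, work)
    }

    withLock('/tmp/example.lock', () => {
      return Promise.delay(100).then(() => 'work done under lock')
    }).then(result => console.log(result))

The other recurring conversion is Promise.reduce in place of slide's asyncMap: rather than firing callbacks in parallel and joining them in a final callback, the new tidyIndex and garbageCollect walk their lists sequentially, threading the stats accumulator through each iteration and resolving to its final value.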
