feat(verify): tested, working cache verifier/gc (#68)
Fixes: #3
zkat authored Mar 12, 2017
1 parent d5d25ba commit 45ad77a
Showing 6 changed files with 336 additions and 127 deletions.
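
For orientation, here is a minimal, hedged sketch of how the verifier added in this commit might be invoked, based on the README changes below; the cache path and the filter predicate are made up for illustration:

```javascript
const cacache = require('cacache')

// Hypothetical cache location and filter; see the README diff below for the
// documented options (`opts.filter`, `opts.uid`, `opts.gid`, `opts.hashAlgorithm`).
cacache.verify('/path/to/my-cache', {
  // Return false to drop an index entry. Per the README note, the filter
  // might be called more than once on the same entry.
  filter (entry) {
    return !entry.key.startsWith('stale-prefix:') // made-up criterion
  }
}).then(stats => {
  // These stats fields appear in the lib/verify.js changes in this commit.
  console.log('verified content entries:', stats.verifiedContent)
  console.log('reclaimed bytes:', stats.reclaimedSize)
})
```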
5 changes: 3 additions & 2 deletions README.md
@@ -422,6 +422,7 @@ Completely resets the in-memory entry cache.
Checks out and fixes up your cache:

* Cleans up corrupted or invalid index entries.
* Custom entry filtering options.
* Garbage collects any content entries not referenced by the index.
* Checks digests for all content entries and removes invalid content.
* Fixes cache ownership.
@@ -440,8 +441,8 @@ reading/writing on the cache.

* `opts.uid` - uid to assign to cache and its contents
* `opts.gid` - gid to assign to cache and its contents
* `opts.hashAlgorithm` - defaults to `'sha512'`. Hash to use for content checks.

* `opts.filter` - receives a formatted entry. Return false to remove it.
  Note: might be called more than once on the same entry.

##### Example

10 changes: 7 additions & 3 deletions lib/content/path.js
@@ -13,8 +13,12 @@ function contentPath (cache, address, hashAlgorithm) {
address = address && address.toLowerCase()
hashAlgorithm = hashAlgorithm ? hashAlgorithm.toLowerCase() : 'sha512'
return path.join.apply(path, [
cache,
`content-v${contentVer}`,
hashAlgorithm,
contentDir(cache),
hashAlgorithm
].concat(hashToSegments(address)))
}

module.exports._contentDir = contentDir
function contentDir (cache) {
return path.join(cache, `content-v${contentVer}`)
}
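
To make the path change above concrete: content now lives under a per-cache `content-v<N>` directory, split first by hash algorithm and then by digest segments. The snippet below is a hedged illustration only; the segmenting rule shown for `hashToSegments` is an assumption based on its name, and the digest and version number are made up:

```javascript
const path = require('path')

// Assumption: hashToSegments breaks a digest into a few short directory
// segments so no single directory accumulates millions of files.
function hashToSegments (digest) {
  return [digest.slice(0, 2), digest.slice(2, 4), digest.slice(4)]
}

const cache = '/path/to/my-cache'
const contentVer = 1                // placeholder version for illustration
const digest = 'deadbeef0123456789' // made-up digest

const p = path.join.apply(path, [
  cache,
  `content-v${contentVer}`,
  'sha512'
].concat(hashToSegments(digest)))
// => /path/to/my-cache/content-v1/sha512/de/ad/beef0123456789
```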
1 change: 1 addition & 0 deletions lib/entry-index.js
@@ -160,6 +160,7 @@ function bucketEntries (cache, bucket, filter) {
})
}

module.exports._bucketDir = bucketDir
function bucketDir (cache) {
return path.join(cache, `index-v${indexV}`)
}
287 changes: 166 additions & 121 deletions lib/verify.js
@@ -2,121 +2,111 @@

const Promise = require('bluebird')

var checksumStream = require('checksum-stream')
var fixOwner = require('./util/fix-owner')
var fs = require('graceful-fs')
var index = require('./entry-index')
var lockfile = Promise.promisifyAll(require('lockfile'))
var path = require('path')
var pipe = Promise.promisify(require('mississippi').pipe)
var rimraf = Promise.promisify(require('rimraf'))
const checksumStream = require('checksum-stream')
const contentPath = require('./content/path')
const finished = Promise.promisify(require('mississippi').finished)
const fixOwner = require('./util/fix-owner')
const fs = require('graceful-fs')
const glob = Promise.promisify(require('glob'))
const index = require('./entry-index')
const path = require('path')
const pipe = Promise.promisify(require('mississippi').pipe)
const rimraf = Promise.promisify(require('rimraf'))

Promise.promisifyAll(fs)

module.exports.lastRun = lastRun
function lastRun (cache) {
return fs.readFileAsync(
path.join(cache, '_lastverified'), 'utf8'
).then(data => new Date(+data))
}

module.exports = verify
function verify (cache, opts) {
opts = opts || {}
opts.log && opts.log.verbose('verify', 'verifying content cache at', cache)
const startTime = +(new Date())
return fixOwner.mkdirfix(
cache, opts.uid, opts.gid
).then(() => {
const lockPath = path.join(cache, 'verify.lock')
const lock = lockfile.lockAsync(lockPath).disposer(() => {
return lockfile.unlock(lockPath)
})
return Promise.using(lock, () => {
return garbageCollect(cache, opts).then(gcStats => {
return tidyIndex(cache, opts).then(tidyStats => {
var stats = tidyStats
Object.keys(gcStats).forEach(function (key) {
stats[key] = gcStats[key]
})
return stats
})
}).then(stats => {
var verifile = path.join(cache, '_lastverified')
opts.log && opts.log.verbose('verify', 'writing verifile to ' + verifile)
return fs.writeFileAsync(
verifile, '' + (+(new Date()))
).then(() => {
opts.log && opts.log.verbose('verify', 'fixing cache ownership')
return fixOwner.chownr(cache, opts.uid, opts.gid)
}).then(() => {
opts.log && opts.log.verbose('verify', 'clearing out tmp')
return rimraf(path.join(cache, 'tmp'))
}).then(() => stats)
opts.log && opts.log.silly('verify', 'verifying cache at', cache)
return Promise.reduce([
markStartTime,
fixPerms,
garbageCollect,
rebuildIndex,
cleanTmp,
writeVerifile,
markEndTime
], (stats, step, i) => {
const label = step.name || `step #${i}`
const start = new Date()
return Promise.resolve(step(cache, opts)).then(s => {
s && Object.keys(s).forEach(k => {
stats[k] = s[k]
})
const end = new Date()
if (!stats.runTime) { stats.runTime = {} }
stats.runTime[label] = end - start
return stats
})
}).then(stats => {
stats.runTime = (+(new Date()) - startTime) / 1000
opts.log && opts.log.verbose('verify', 'final stats:', stats)
return stats
}, {}).tap(stats => {
stats.runTime.total = stats.endTime - stats.startTime
opts.log && opts.log.silly('verify', 'verification finished for', cache, 'in', `${stats.runTime.total}ms`)
})
}

function tidyIndex (cache, opts) {
opts.log && opts.log.verbose('verify', 'tidying index')
return index.ls(cache).then(entries => {
return rimraf(path.join(cache, 'index')).then(() => {
var stats = {
entriesRemoved: 0,
digestMissing: 0,
totalEntries: 0
}
return Promise.reduce(Object.keys(entries), (stats, key) => {
var entry = entries[key]
if (!entry.digest) {
stats.digestMissing++
return stats
}
var content = path.join(cache, 'content', entries[key].digest)
return fs.statAsync(content).catch(err => {
if (err.code === 'ENOENT') {
stats.entriesRemoved++
return stats
}
}).then(() => {
stats.totalEntries++
return index.insert(cache, key, entry.digest, {
uid: opts.uid,
gid: opts.gid,
metadata: entry.metadata
}).then(() => stats)
})
}, stats)
})
})
function markStartTime (cache, opts) {
return { startTime: new Date() }
}

function markEndTime (cache, opts) {
return { endTime: new Date() }
}

function fixPerms (cache, opts) {
opts.log && opts.log.silly('verify', 'fixing cache permissions')
return fixOwner.mkdirfix(cache, opts.uid, opts.gid).then(() => {
// TODO - fix file permissions too
fixOwner.chownr(cache, opts.uid, opts.gid)
}).then(() => null)
}

// Implements a naive mark-and-sweep tracing garbage collector.
//
// The algorithm is basically as follows:
// 1. Read (and filter) all index entries ("pointers")
// 2. Mark each algo/digest combo as "live"
// 3. Read entire filesystem tree in `content-vX/` dir
// 4. If content is live, verify its checksum and delete it if it fails
// 5. If content is not marked as live, rimraf it.
//
function garbageCollect (cache, opts) {
opts.log && opts.log.verbose('verify', 'garbage collecting content')
return index.ls(cache).then(entries => {
var byDigest = {}
Object.keys(entries).forEach(function (k) {
byDigest[entries[k].digest] = entries[k]
})
var contentDir = path.join(cache, 'content')
return fs.readdirAsync(contentDir).catch(err => {
if (err.code === 'ENOENT') {
return
} else {
throw err
}
opts.log && opts.log.silly('verify', 'garbage collecting content')
const indexStream = index.lsStream(cache)
const liveContent = new Set()
indexStream.on('data', entry => {
if (opts && opts.filter && !opts.filter(entry)) { return }
liveContent.add(`${entry.hashAlgorithm}-${entry.digest}`)
})
return finished(indexStream).then(() => {
const contentDir = contentPath._contentDir(cache)
return glob(path.join(contentDir, '**'), {
follow: false,
nodir: true,
nosort: true
}).then(files => {
var stats = {
return Promise.resolve({
verifiedContent: 0,
collectedCount: 0,
reclaimedCount: 0,
reclaimedSize: 0,
badContentCount: 0,
keptSize: 0
}
return Promise.reduce(files, (stats, f) => {
var fullPath = path.join(contentDir, f)
if (byDigest[f]) {
var algo = opts.hashAlgorithm || 'sha512'
return verifyContent(fullPath, algo).then(info => {
}).tap((stats) => Promise.map(files, (f) => {
const split = f.split(/[/\\]/)
const digest = split.slice(split.length - 3).join('')
const algo = split[split.length - 4]
if (liveContent.has(`${algo}-${digest}`)) {
return verifyContent(f, digest, algo).then(info => {
if (!info.valid) {
stats.collectedCount++
stats.reclaimedCount++
stats.badContentCount++
stats.reclaimedSize += info.size
} else {
stats.verifiedContent++
@@ -125,44 +115,99 @@ function garbageCollect (cache, opts) {
return stats
})
} else {
stats.collectedCount++
return fs.statAsync(fullPath).then(s => {
stats.reclaimedSize += s.size
return rimraf(path.join(contentDir, f)).then(() => stats)
// No entries refer to this content. We can delete.
stats.reclaimedCount++
return fs.statAsync(f).then(s => {
return rimraf(f).then(() => {
stats.reclaimedSize += s.size
return stats
})
})
}
}, stats)
}, {concurrency: opts.concurrency || 20}))
})
})
}

function verifyContent (filepath, algo) {
function verifyContent (filepath, digest, algorithm) {
return fs.statAsync(filepath).then(stat => {
var reader = fs.createReadStream(filepath)
var checksummer = checksumStream({
digest: path.basename(filepath),
algorithm: algo
})
var contentInfo = {
const reader = fs.createReadStream(filepath)
const checksummer = checksumStream({digest, algorithm})
const contentInfo = {
size: stat.size,
valid: true
}
checksummer.on('data', () => {})
return pipe(reader, checksummer).catch(err => {
if (err && err.code === 'EBADCHECKSUM') {
return rimraf(filepath).then(() => {
contentInfo.valid = false
})
} else {
throw err
}
return pipe(reader, checksummer).catch({code: 'EBADCHECKSUM'}, () => {
return rimraf(filepath).then(() => {
contentInfo.valid = false
})
}).then(() => contentInfo)
}).catch({code: 'ENOENT'}, () => ({size: 0, valid: false}))
}

function rebuildIndex (cache, opts) {
opts.log && opts.log.silly('verify', 'rebuilding index')
return index.ls(cache).then(entries => {
const stats = {
missingContent: 0,
rejectedEntries: 0,
totalEntries: 0
}
const buckets = {}
for (let k in entries) {
if (entries.hasOwnProperty(k)) {
const hashed = index._hashKey(k)
const entry = entries[k]
const excluded = opts && opts.filter && !opts.filter(entry)
excluded && stats.rejectedEntries++
if (buckets[hashed] && !excluded) {
buckets[hashed].push(entry)
} else if (buckets[hashed] && excluded) {
// skip
} else if (excluded) {
buckets[hashed] = []
buckets[hashed]._path = index._bucketPath(cache, k)
} else {
buckets[hashed] = [entry]
buckets[hashed]._path = index._bucketPath(cache, k)
}
}
}
return Promise.map(Object.keys(buckets), key => {
return rebuildBucket(cache, buckets[key], stats, opts)
}, {concurrency: opts.concurrency || 20}).then(() => stats)
})
}

module.exports.lastRun = lastRun
function lastRun (cache) {
return fs.readFileAsync(
path.join(cache, '_lastverified'), 'utf8'
).then(data => new Date(+data))
function rebuildBucket (cache, bucket, stats, opts) {
return fs.truncateAsync(bucket._path).then(() => {
// This needs to be serialized because cacache explicitly
// lets very racy bucket conflicts clobber each other.
return Promise.mapSeries(bucket, entry => {
const content = contentPath(cache, entry.digest, entry.hashAlgorithm)
return fs.statAsync(content).then(() => {
return index.insert(cache, entry.key, entry.digest, {
uid: opts.uid,
gid: opts.gid,
hashAlgorithm: entry.hashAlgorithm,
metadata: entry.metadata
}).then(() => { stats.totalEntries++ })
}).catch({code: 'ENOENT'}, () => {
stats.rejectedEntries++
stats.missingContent++
})
})
})
}

function cleanTmp (cache, opts) {
opts.log && opts.log.silly('verify', 'cleaning tmp directory')
return rimraf(path.join(cache, 'tmp'))
}

function writeVerifile (cache, opts) {
const verifile = path.join(cache, '_lastverified')
opts.log && opts.log.silly('verify', 'writing verifile to ' + verifile)
return fs.writeFileAsync(verifile, '' + (+(new Date())))
}
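
The `garbageCollect` function above is documented as a naive mark-and-sweep collector. To restate the idea outside the cacache internals, here is a small self-contained sketch that works on in-memory lists rather than the real index and content tree; the data shapes (and the `valid` flag standing in for a checksum pass) are assumptions for illustration only:

```javascript
// entries: parsed index entries, e.g. [{ hashAlgorithm, digest }]
// files:   content files found on disk, e.g. [{ path, hashAlgorithm, digest, valid }]
//          where `valid` stands in for a real checksum verification.
// Returns the paths a mark-and-sweep pass would delete.
function sweep (entries, files, filter) {
  // 1 & 2. Mark: record every algorithm/digest combo referenced by a kept entry.
  const live = new Set()
  for (const entry of entries) {
    if (filter && !filter(entry)) continue
    live.add(`${entry.hashAlgorithm}-${entry.digest}`)
  }
  // 3-5. Sweep: unreferenced content goes, and referenced-but-corrupt content goes too.
  const doomed = []
  for (const f of files) {
    if (!live.has(`${f.hashAlgorithm}-${f.digest}`)) {
      doomed.push(f.path) // nothing points at it: reclaim
    } else if (!f.valid) {
      doomed.push(f.path) // referenced, but the checksum failed: reclaim
    }
  }
  return doomed
}

// Example: 'aaa' is live and valid, 'bbb' is unreferenced, 'ccc' is live but corrupt.
sweep(
  [
    { hashAlgorithm: 'sha512', digest: 'aaa' },
    { hashAlgorithm: 'sha512', digest: 'ccc' }
  ],
  [
    { path: 'content-v1/sha512/aa/a', hashAlgorithm: 'sha512', digest: 'aaa', valid: true },
    { path: 'content-v1/sha512/bb/b', hashAlgorithm: 'sha512', digest: 'bbb', valid: true },
    { path: 'content-v1/sha512/cc/c', hashAlgorithm: 'sha512', digest: 'ccc', valid: false }
  ]
)
// => ['content-v1/sha512/bb/b', 'content-v1/sha512/cc/c']
```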
1 change: 0 additions & 1 deletion package.json
@@ -52,7 +52,6 @@
"chownr": "^1.0.1",
"dezalgo": "^1.0.3",
"graceful-fs": "^4.1.10",
"lockfile": "^1.0.2",
"mississippi": "^1.2.0",
"mkdirp": "^0.5.1",
"once": "^1.4.0",