Skip to content
This repository has been archived by the owner on Jul 31, 2020. It is now read-only.

Compaction API #350

Merged
merged 5 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/constants/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ const messages = {
* browser sends this to delete all records in a category.
*/
DELETE_SYNC_CATEGORY: _, /* @param {string} categoryName */
/**
* browser -> webview
* browser sends this to compact records in a category
*/
COMPACT_SYNC_CATEGORY: _, /* @param {string} categoryName */
/**
* webview -> browser
* webview sends this to delete all site settings.
Expand Down
76 changes: 72 additions & 4 deletions client/requestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const proto = require('./constants/proto')
const {limitConcurrency} = require('../lib/promiseHelper')
const s3Helper = require('../lib/s3Helper')
const serializer = require('../lib/serializer')
const LRUCache = require('lru-cache')

const CONFIG = require('./config')

Expand Down Expand Up @@ -63,6 +64,9 @@ const RequestUtil = function (opts = {}) {
this.saveAWSCredentials(credentials)
}
this.SQSUrlByCat = []
this.missingObjectsCache = new LRUCache(50)
// This is used to keep the most recent records for each object id
this.latestRecordsCache = new LRUCache(100)
}

/**
Expand Down Expand Up @@ -178,9 +182,12 @@ RequestUtil.prototype.parseAWSResponse = function (bytes) {
* @param {string} category - the category ID
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340)
* @param {number=} maxRecords Limit response to a given number of records. By default the Sync lib will fetch all matching records, which might take a long time. If falsey, fetch all records.
* @param {{
* compaction {boolean} // compact records while listing objects from S3
* }} opts
* @returns {Promise(Array.<Object>)}
*/
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken) {
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, opts = {}) {
const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: maxRecords || 1000,
Expand All @@ -192,8 +199,26 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
}
if (startAt) { options.StartAfter = `${prefix}/${startAt}` }
return this.withRetry(() => {
if (this.shouldListObject(startAt, category)) {
return s3Helper.listObjects(this.s3, options, !!maxRecords)
if (this.shouldListObject(startAt, category) || opts.compaction) {
const s3ObjectsPromise = s3Helper.listObjects(this.s3, options, !!maxRecords)
if (!opts.compaction) {
return s3ObjectsPromise
}
return new Promise((resolve, reject) => {
s3ObjectsPromise.then((s3Objects) => {
setTimeout(() => {
this.compactObjects(s3Objects.contents)
if (s3Objects.isTruncated) {
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, opts)
}
return new Promise((resolve, reject) => {
// compaction is done
resolve()
})
}, 15000)
})
resolve()
})
}

if (!this.SQSUrlByCat[category]) {
Expand Down Expand Up @@ -354,6 +379,11 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
const radix64 = require('../lib/radix64')
const output = []
const partBuffer = {}
const objectMap = {}
// restore the partBuffer from last round
this.missingObjectsCache.forEach((value, key, cache) => {
partBuffer[key] = value
})
for (let s3Object of s3Objects) {
const parsedKey = s3Helper.parseS3Key(s3Object.Key)
const fullCrc = parsedKey.recordCrc
Expand All @@ -362,23 +392,33 @@ RequestUtil.prototype.s3ObjectsToRecords = function (s3Objects) {
partBuffer[fullCrc] = partBuffer[fullCrc].concat(data)
data = partBuffer[fullCrc]
}
if (objectMap[fullCrc]) {
objectMap[fullCrc].push(s3Object.Key)
} else {
objectMap[fullCrc] = [s3Object.Key]
}
const dataBytes = s3Helper.s3StringToByteArray(data)
const dataCrc = radix64.fromNumber(crc.crc32.unsigned(dataBytes.buffer))
if (dataCrc === fullCrc) {
let decrypted = {}
try {
decrypted = this.decrypt(dataBytes)
decrypted.syncTimestamp = parsedKey.timestamp
output.push(decrypted)
output.push(
{ record: decrypted,
objects: objectMap[fullCrc]
})
} catch (e) {
console.log(`Record with CRC ${crc} can't be decrypted: ${e}`)
}
if (partBuffer[fullCrc]) { delete partBuffer[fullCrc] }
if (this.missingObjectsCache.has(fullCrc)) { this.missingObjectsCache.del(fullCrc) }
} else {
partBuffer[fullCrc] = data
}
}
for (let crc in partBuffer) {
this.missingObjectsCache.set(crc, partBuffer[crc])
console.log(`Record with CRC ${crc} is missing parts or corrupt.`)
}
return output
Expand Down Expand Up @@ -420,6 +460,34 @@ RequestUtil.prototype.put = function (category, record) {
})
}

/**
 * Compact a batch of listed S3 objects.
 *
 * The objects are first turned into records via this.s3ObjectsToRecords. For
 * every object id only the record with the greatest syncTimestamp is kept;
 * the S3 keys backing an older duplicate are collected and handed to
 * s3Helper.deleteObjects in a single call.
 *
 * The newest record seen so far for each object id is remembered in
 * this.latestRecordsCache, so duplicates are also detected across successive
 * calls on the same RequestUtil instance.
 *
 * @param {Array.<Object>} s3Objects - S3 objects to compact; passed unchanged
 *   to this.s3ObjectsToRecords
 */
RequestUtil.prototype.compactObjects = function (s3Objects) {
  const s3ObjectsToDelete = []
  const recordObjects = this.s3ObjectsToRecords(s3Objects)
  recordObjects.forEach((recordObject) => {
    const record = recordObject.record
    const id = JSON.stringify(record.objectId)
    if (!this.latestRecordsCache.has(id)) {
      // First time this object id is seen: nothing to compare against yet.
      this.latestRecordsCache.set(id, recordObject)
      return
    }
    const cacheRecordObject = this.latestRecordsCache.get(id)
    const isNewer = record.syncTimestamp > cacheRecordObject.record.syncTimestamp
    const kept = isNewer ? recordObject : cacheRecordObject
    const stale = isNewer ? cacheRecordObject : recordObject
    if (isNewer) {
      this.latestRecordsCache.set(id, recordObject)
    }
    // Never delete a key that also backs the record being kept. Without this
    // guard, seeing the cached record again (the cache outlives a single
    // call), or two records sharing the same `objects` array, would delete
    // the only surviving copy.
    const keptKeys = new Set(kept.objects)
    const staleKeys = stale.objects.filter((key) => !keptKeys.has(key))
    if (staleKeys.length === 0) {
      return
    }
    // NOTE(review): the second log prints a decrypted record; consider
    // removing it before shipping.
    console.log('compaction deletes')
    console.log(stale.record)
    s3ObjectsToDelete.push(...staleKeys)
  })
  s3Helper.deleteObjects(this.s3, this.bucket, s3ObjectsToDelete)
}

RequestUtil.prototype.s3PostFormData = function (objectKey) {
let formData = new FormData() // eslint-disable-line
formData.append('key', objectKey)
Expand Down
20 changes: 15 additions & 5 deletions client/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ const maybeSetDeviceId = (requester) => {
}
return requester.list(proto.categories.PREFERENCES)
.then(s3Objects => requester.s3ObjectsToRecords(s3Objects.contents))
.then((records) => {
.then((recordObjects) => {
let maxId = -1
if (records && records.length) {
records.forEach((record) => {
if (recordObjects && recordObjects.length) {
recordObjects.forEach((recordObject) => {
const record = recordObject.record
const device = record.device
if (device && record.deviceId[0] > maxId) {
maxId = record.deviceId[0]
Expand All @@ -86,9 +87,10 @@ const startSync = (requester) => {
* @returns {Array.<Object>}
*/
const getJSRecords = (s3Objects, filterFunction) => {
const records = requester.s3ObjectsToRecords(s3Objects)
const recordObjects = requester.s3ObjectsToRecords(s3Objects)
let jsRecords = []
for (let record of records) {
for (let recordObject of recordObjects) {
const record = recordObject.record
const jsRecord = recordUtil.syncRecordAsJS(record)
// Useful but stored in the S3 key.
jsRecord.syncTimestamp = record.syncTimestamp
Expand Down Expand Up @@ -184,6 +186,14 @@ const startSync = (requester) => {
requester.purgeUserQueue()
})
})
// browser -> webview: compact the records of one sync category.
// Throws for a category name that is not a key of proto.categories.
ipc.on(messages.COMPACT_SYNC_CATEGORY, (e, category) => {
  if (!proto.categories[category]) {
    throw new Error(`Unsupported sync category: ${category}`)
  }
  // List from the beginning (startAt 0) in pages of 1000 with compaction on.
  requester.list(proto.categories[category], 0, 1000, '', {compaction: true})
    .then(() => {
      logSync(`Compacting category: ${category}`)
    })
    .catch((err) => {
      // Surface the failure instead of leaving an unhandled rejection.
      logSync(`Compacting category ${category} failed: ${err}`)
    })
})
ipc.on(messages.DELETE_SYNC_SITE_SETTINGS, (e) => {
logSync(`Deleting siteSettings`)
requester.deleteSiteSettings().then(() => {
Expand Down
25 changes: 25 additions & 0 deletions lib/s3Helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,28 @@ module.exports.deletePrefix = function (s3, bucket, prefix, deleteIf) {
listAndDelete()
})
}

/**
 * Delete objects in s3.
 *
 * The keys are sent in batches because a single S3 DeleteObjects request
 * accepts at most 1000 keys. The calls are fire-and-forget: failures are
 * logged with console.error and nothing is returned to the caller.
 *
 * @param {AwsSdk.S3} s3
 * @param {string} bucket
 * @param {Array.<string>} objects - keys of the objects to delete; an empty
 *   or missing list is a no-op
 */
module.exports.deleteObjects = function (s3, bucket, objects) {
  const MAX_KEYS_PER_REQUEST = 1000
  if (!objects || objects.length === 0) {
    return
  }
  for (let start = 0; start < objects.length; start += MAX_KEYS_PER_REQUEST) {
    const batch = objects.slice(start, start + MAX_KEYS_PER_REQUEST)
    s3.deleteObjects({
      Bucket: bucket,
      Delete: {
        Objects: batch.map((object) => ({ Key: object.toString() }))
      }
    }, function (err, data) {
      if (err) {
        console.error(err, err.stack)
        return
      }
      // A successful response can still report per-key failures.
      if (data && data.Errors && data.Errors.length > 0) {
        console.error(data.Errors)
      }
    })
  }
}
49 changes: 41 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"deep-equal": "^1.0.1",
"express": "^4.14.0",
"lodash.merge": "^4.6.2",
"lru-cache": "^5.1.1",
"protobufjs": "^6.8.0",
"raven": "^0.12.3",
"redis": "^2.8.0",
Expand Down
Loading