From aa8c83303283934ef332cf8210043b5d81a4c78b Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 11:32:40 +0300 Subject: [PATCH 1/8] * Fix a memory leak in 14.0.4 * Fix linter error in ddata.js --- lib/data/ddata.js | 8 +++++--- lib/server/cache.js | 18 +++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/data/ddata.js b/lib/data/ddata.js index 65120782fc0..9389f10d360 100644 --- a/lib/data/ddata.js +++ b/lib/data/ddata.js @@ -32,13 +32,15 @@ function init () { Object.keys(obj).forEach(key => { if (typeof obj[key] === 'object' && obj[key]) { - if (obj[key].hasOwnProperty('_id')) { + if (Object.prototype.hasOwnProperty.call(obj[key], '_id')) { obj[key]._id = obj[key]._id.toString(); } - if (obj[key].hasOwnProperty('created_at') && !obj[key].hasOwnProperty('mills')) { + if (Object.prototype.hasOwnProperty.call(obj[key], 'created_at') + && !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) { obj[key].mills = new Date(obj[key].created_at).getTime(); } - if (obj[key].hasOwnProperty('sysTime') && !obj[key].hasOwnProperty('mills')) { + if (Object.prototype.hasOwnProperty.call(obj[key], 'sysTime') + && !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) { obj[key].mills = new Date(obj[key].sysTime).getTime(); } } diff --git a/lib/server/cache.js b/lib/server/cache.js index fca93afafde..5b6f51fd1f8 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -29,21 +29,25 @@ function cache (env, ctx) { , data.entries ]; - function mergeCacheArrays (oldData, newData, ageLimit) { - var filtered = _.filter(newData, function hasId (object) { - const hasId = !_.isEmpty(object._id); - const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit); - return isFresh && hasId; - }); + var filteredOld = filterForAge(oldData, ageLimit); + var filteredNew = filterForAge(newData, ageLimit); - const merged = ctx.ddata.idMergePreferNew(oldData, filtered); + const merged = ctx.ddata.idMergePreferNew(filteredOld, filteredNew); return _.sortBy(merged, function(item) { return -item.mills; }); + function filterForAge(data, ageLimit) { + return _.filter(data, function hasId(object) { + const hasId = !_.isEmpty(object._id); + const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit); + return isFresh && hasId; + }); + } + } data.isEmpty = (datatype) => { From 0072bc9c02f2299d364d59cf4f79752ccd258b4a Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 12:01:00 +0300 Subject: [PATCH 2/8] Move data retention periods to caching --- lib/data/dataloader.js | 29 +++++++++-------------------- lib/server/cache.js | 24 +++++++++++++----------- 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index 0f1b3c7bd86..43e1b828d3d 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -2,7 +2,6 @@ const _ = require('lodash'); const async = require('async'); -const times = require('../times'); const fitTreatmentsToBGCurve = require('./treatmenttocurve'); const constants = require('../constants'); @@ -196,11 +195,8 @@ function loadEntries(ddata, ctx, callback) { if (!err && results) { - const ageFilter = ddata.lastUpdated - constants.TWO_DAYS; const r = ctx.ddata.processRawDataForRuntime(results); - ctx.cache.insertData('entries', r, ageFilter); - - const currentData = ctx.cache.getData('entries').reverse(); + const currentData = ctx.cache.insertData('entries', r).reverse(); const mbgs = []; const sgvs = []; @@ -324,12 +320,11 @@ function loadTreatments(ddata, ctx, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { - const ageFilter = ddata.lastUpdated - longLoad; - const r = ctx.ddata.processRawDataForRuntime(results); - // update cache - ctx.cache.insertData('treatments', r, ageFilter); - ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, ctx.cache.getData('treatments')); + // update cache and apply to runtime data + const r = ctx.ddata.processRawDataForRuntime(results); + const currentData = ctx.cache.insertData('treatments', r); + ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, currentData); } callback(); @@ -361,7 +356,6 @@ function loadProfileSwitchTreatments(ddata, ctx, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { ddata.treatments = mergeProcessSort(ddata.treatments, results); - //mergeToTreatments(ddata, results); } // Store last profile switch @@ -418,7 +412,6 @@ function loadLatestSingle(ddata, ctx, dataType, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { ddata.treatments = mergeProcessSort(ddata.treatments, results); - //mergeToTreatments(ddata, results); } callback(); }); @@ -473,16 +466,12 @@ function loadDeviceStatus(ddata, env, ctx, callback) { ctx.devicestatus.list(opts, function(err, results) { if (!err && results) { -// ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter); - const ageFilter = ddata.lastUpdated - longLoad; + // update cache and apply to runtime data const r = ctx.ddata.processRawDataForRuntime(results); - ctx.cache.insertData('devicestatus', r, ageFilter); - - const res = ctx.cache.getData('devicestatus'); + const currentData = ctx.cache.insertData('devicestatus', r); - const res2 = _.map(res, function eachStatus(result) { - //result.mills = new Date(result.created_at).getTime(); + const res2 = _.map(currentData, function eachStatus(result) { if ('uploaderBattery' in result) { result.uploader = { battery: result.uploaderBattery @@ -492,7 +481,7 @@ function loadDeviceStatus(ddata, env, ctx, callback) { return result; }); - ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2, ageFilter); + ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2); } else { ddata.devicestatus = []; } diff --git a/lib/server/cache.js b/lib/server/cache.js index 5b6f51fd1f8..9472fb8ccb7 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -23,13 +23,15 @@ function cache (env, ctx) { , entries: [] }; - const dataArray = [ - data.treatments - , data.devicestatus - , data.entries - ]; + const retentionPeriods = { + treatments: constants.ONE_HOUR * 60 + , devicestatus: constants.TWO_DAYS + , entries: constants.WO_DAYS + }; + + function mergeCacheArrays (oldData, newData, retentionPeriod) { - function mergeCacheArrays (oldData, newData, ageLimit) { + const ageLimit = Date.now() - retentionPeriod; var filteredOld = filterForAge(oldData, ageLimit); var filteredNew = filterForAge(newData, ageLimit); @@ -58,8 +60,9 @@ function cache (env, ctx) { return _.cloneDeep(data[datatype]); } - data.insertData = (datatype, newData, retentionPeriod) => { - data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriod); + data.insertData = (datatype, newData) => { + data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriods[datatype]); + return data.getData(datatype); } function dataChanged (operation) { @@ -80,9 +83,9 @@ function cache (env, ctx) { } } - if (operation.op == 'update') { + if (operation.op == 'update') { //console.log('Cache data update event'); - data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes); + data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[datatype]); } } @@ -100,7 +103,6 @@ function cache (env, ctx) { } return data; - } module.exports = cache; From 69edd36645fa345d3381d1623185ddeee4621a01 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 12:14:29 +0300 Subject: [PATCH 3/8] Fix typo --- lib/server/cache.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/server/cache.js b/lib/server/cache.js index 9472fb8ccb7..1927e479078 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -85,7 +85,7 @@ function cache (env, ctx) { if (operation.op == 'update') { //console.log('Cache data update event'); - data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[datatype]); + data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[operation.type]); } } From 0eefc69ee89b6cb12266190544a7b3fc7bea8327 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 12:40:31 +0300 Subject: [PATCH 4/8] Update _id of inserted entries and device status so merging to cache works correctly --- lib/server/cache.js | 5 -- lib/server/devicestatus.js | 62 +++++++++++---------- lib/server/entries.js | 110 ++++++++++++++++++++----------------- 3 files changed, 95 insertions(+), 82 deletions(-) diff --git a/lib/server/cache.js b/lib/server/cache.js index 1927e479078..661215a86b6 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -66,15 +66,11 @@ function cache (env, ctx) { } function dataChanged (operation) { - //console.log('Cache data operation requested', operation); - if (!data[operation.type]) return; if (operation.op == 'remove') { - //console.log('Cache data delete event'); // if multiple items were deleted, flush entire cache if (!operation.changes) { - //console.log('Multiple items delete from cache, flushing all') data.treatments = []; data.devicestatus = []; data.entries = []; @@ -84,7 +80,6 @@ function cache (env, ctx) { } if (operation.op == 'update') { - //console.log('Cache data update event'); data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[operation.type]); } } diff --git a/lib/server/devicestatus.js b/lib/server/devicestatus.js index f3515367428..e228f5b372c 100644 --- a/lib/server/devicestatus.js +++ b/lib/server/devicestatus.js @@ -5,33 +5,38 @@ var find_options = require('./query'); function storage (collection, ctx) { - function create(obj, fn) { - + function create (obj, fn) { + // Normalize all dates to UTC const d = moment(obj.created_at).isValid() ? moment.parseZone(obj.created_at) : moment(); obj.created_at = d.toISOString(); obj.utcOffset = d.utcOffset(); - - api().insert(obj, function (err, doc) { - if (err != null && err.message) { + + api().insertOne(obj, function(err, results) { + if (err !== null && err.message) { console.log('Error inserting the device status object', err.message); fn(err.message, null); return; } - ctx.bus.emit('data-update', { - type: 'devicestatus', - op: 'update', - changes: ctx.ddata.processRawDataForRuntime([doc]) - }); + if (!err) { + + if (!obj._id) obj._id = results.insertedIds[0]._id; - fn(null, doc.ops); + ctx.bus.emit('data-update', { + type: 'devicestatus' + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime([obj]) + }); + } + + fn(null, results.ops); ctx.bus.emit('data-received'); }); } - function last(fn) { - return list({count: 1}, function (err, entries) { + function last (fn) { + return list({ count: 1 }, function(err, entries) { if (entries && entries.length > 0) { fn(err, entries[0]); } else { @@ -44,18 +49,18 @@ function storage (collection, ctx) { return find_options(opts, storage.queryOpts); } - function list(opts, fn) { + function list (opts, fn) { // these functions, find, sort, and limit, are used to // dynamically configure the request, based on the options we've // been given // determine sort options - function sort ( ) { - return opts && opts.sort || {created_at: -1}; + function sort () { + return opts && opts.sort || { created_at: -1 }; } // configure the limit portion of the current query - function limit ( ) { + function limit () { if (opts && opts.count) { return this.limit(parseInt(opts.count)); } @@ -68,31 +73,31 @@ function storage (collection, ctx) { } // now just stitch them all together - limit.call(api( ) - .find(query_for(opts)) - .sort(sort( )) + limit.call(api() + .find(query_for(opts)) + .sort(sort()) ).toArray(toArray); } function remove (opts, fn) { - function removed(err, stat) { + function removed (err, stat) { ctx.bus.emit('data-update', { - type: 'devicestatus', - op: 'remove', - count: stat.result.n, - changes: opts.find._id + type: 'devicestatus' + , op: 'remove' + , count: stat.result.n + , changes: opts.find._id }); fn(err, stat); } - return api( ).remove( + return api().remove( query_for(opts), removed); } - function api() { + function api () { return ctx.store.collection(collection); } @@ -101,9 +106,10 @@ function storage (collection, ctx) { api.query_for = query_for; api.last = last; api.remove = remove; - api.aggregate = require('./aggregate')({ }, api); + api.aggregate = require('./aggregate')({}, api); api.indexedFields = [ 'created_at' + , 'NSCLIENT_ID' ]; return api; diff --git a/lib/server/entries.js b/lib/server/entries.js index f6b61024e7d..50c8e0cc41a 100644 --- a/lib/server/entries.js +++ b/lib/server/entries.js @@ -10,60 +10,60 @@ var moment = require('moment'); * Encapsulate persistent storage of sgv entries. \**********/ -function storage(env, ctx) { +function storage (env, ctx) { // TODO: Code is a little redundant. // query for entries from storage function list (opts, fn) { - // these functions, find, sort, and limit, are used to - // dynamically configure the request, based on the options we've - // been given + // these functions, find, sort, and limit, are used to + // dynamically configure the request, based on the options we've + // been given - // determine sort options - function sort ( ) { - return opts && opts.sort || {date: -1}; - } + // determine sort options + function sort () { + return opts && opts.sort || { date: -1 }; + } - // configure the limit portion of the current query - function limit ( ) { - if (opts && opts.count) { - return this.limit(parseInt(opts.count)); - } - return this; + // configure the limit portion of the current query + function limit () { + if (opts && opts.count) { + return this.limit(parseInt(opts.count)); } + return this; + } - // handle all the results - function toArray (err, entries) { - fn(err, entries); - } + // handle all the results + function toArray (err, entries) { + fn(err, entries); + } - // now just stitch them all together - limit.call(api( ) - .find(query_for(opts)) - .sort(sort( )) - ).toArray(toArray); + // now just stitch them all together + limit.call(api() + .find(query_for(opts)) + .sort(sort()) + ).toArray(toArray); } function remove (opts, fn) { - api( ).remove(query_for(opts), function (err, stat) { + api().remove(query_for(opts), function(err, stat) { ctx.bus.emit('data-update', { - type: 'entries', - op: 'remove', - count: stat.result.n, - changes: opts.find._id + type: 'entries' + , op: 'remove' + , count: stat.result.n + , changes: opts.find._id }); //TODO: this is triggering a read from Mongo, we can do better - ctx.bus.emit('data-received'); - fn(err, stat); - }); + ctx.bus.emit('data-received'); + fn(err, stat); + }); } // return writable stream to lint each sgv record passing through it // TODO: get rid of this? not doing anything now - function map ( ) { + function map () { return es.map(function iter (item, next) { return next(null, item); }); @@ -80,7 +80,7 @@ function storage(env, ctx) { create(result, fn); } // lint and store the entire list - return es.pipeline(map( ), es.writeArray(done)); + return es.pipeline(map(), es.writeArray(done)); } //TODO: implement @@ -91,9 +91,9 @@ function storage(env, ctx) { // store new documents using the storage mechanism function create (docs, fn) { // potentially a batch insert - var firstErr = null, - numDocs = docs.length, - totalCreated = 0; + var firstErr = null + , numDocs = docs.length + , totalCreated = 0; docs.forEach(function(doc) { @@ -106,15 +106,21 @@ function storage(env, ctx) { doc.sysTime = _sysTime.toISOString(); if (doc.dateString) doc.dateString = doc.sysTime; - var query = (doc.sysTime && doc.type) ? {sysTime: doc.sysTime, type: doc.type} : doc; - api( ).update(query, doc, {upsert: true}, function (err) { + var query = (doc.sysTime && doc.type) ? { sysTime: doc.sysTime, type: doc.type } : doc; + api().update(query, doc, { upsert: true }, function(err, updateResults) { firstErr = firstErr || err; - ctx.bus.emit('data-update', { - type: 'entries', - op: 'update', - changes: ctx.ddata.processRawDataForRuntime([doc]) - }); + if (!err) { + if (updateResults.result.upserted) { + doc._id = updateResults.result.upserted[0]._id + } + + ctx.bus.emit('data-update', { + type: 'entries' + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime([doc]) + }); + } if (++totalCreated === numDocs) { //TODO: this is triggering a read from Mongo, we can do better @@ -125,8 +131,8 @@ function storage(env, ctx) { }); } - function getEntry(id, fn) { - api( ).findOne({_id: ObjectID(id)}, function (err, entry) { + function getEntry (id, fn) { + api().findOne({ _id: ObjectID(id) }, function(err, entry) { if (err) { fn(err); } else { @@ -140,7 +146,7 @@ function storage(env, ctx) { } // closure to represent the API - function api ( ) { + function api () { // obtain handle usable for querying the collection associated // with these records return ctx.store.collection(env.entries_collection); @@ -154,15 +160,21 @@ function storage(env, ctx) { api.persist = persist; api.query_for = query_for; api.getEntry = getEntry; - api.aggregate = require('./aggregate')({ }, api); + api.aggregate = require('./aggregate')({}, api); api.indexedFields = [ 'date' + , 'type' + , 'sgv' + , 'mbg' + , 'sysTime' + , 'dateString' - , { 'type' : 1, 'date' : -1, 'dateString' : 1 } + + , { 'type': 1, 'date': -1, 'dateString': 1 } ]; return api; } @@ -176,7 +188,7 @@ storage.queryOpts = { , rssi: parseInt , noise: parseInt , mbg: parseInt - } + } , useEpoch: true }; From 46eb70f8de55df84c2e94592cb9409425165c3c2 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 13:06:20 +0300 Subject: [PATCH 5/8] Reset the data in ddata before updating --- lib/data/dataloader.js | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index 43e1b828d3d..97b6933b551 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -143,8 +143,11 @@ function init(env, ctx) { done(err, result); } - // clear treatments to the base set, we're going to merge from multiple queries - ddata.treatments = []; // ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : []; + // clear data we'll get from the cache + + ddata.treatments = []; + ddata.devicestatus = []; + ddata.entries = []; ddata.dbstats = {}; @@ -195,6 +198,8 @@ function loadEntries(ddata, ctx, callback) { if (!err && results) { + console.log('Loaded entries', results.length); + const r = ctx.ddata.processRawDataForRuntime(results); const currentData = ctx.cache.insertData('entries', r).reverse(); From cd8719bd8d031cbd566c3bbef6bff5bfb6f93289 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 13:18:39 +0300 Subject: [PATCH 6/8] Fix typo on entry cache retention period --- lib/data/dataloader.js | 2 -- lib/server/cache.js | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index 97b6933b551..7074b18e1fb 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -198,8 +198,6 @@ function loadEntries(ddata, ctx, callback) { if (!err && results) { - console.log('Loaded entries', results.length); - const r = ctx.ddata.processRawDataForRuntime(results); const currentData = ctx.cache.insertData('entries', r).reverse(); diff --git a/lib/server/cache.js b/lib/server/cache.js index 661215a86b6..e0ebdedd371 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -26,7 +26,7 @@ function cache (env, ctx) { const retentionPeriods = { treatments: constants.ONE_HOUR * 60 , devicestatus: constants.TWO_DAYS - , entries: constants.WO_DAYS + , entries: constants.TWO_DAYS }; function mergeCacheArrays (oldData, newData, retentionPeriod) { @@ -45,7 +45,7 @@ function cache (env, ctx) { function filterForAge(data, ageLimit) { return _.filter(data, function hasId(object) { const hasId = !_.isEmpty(object._id); - const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit); + const isFresh = object.mills >= ageLimit; return isFresh && hasId; }); } From fc1528eae97c843ecf1ed190dc2626c34619de48 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 13:24:35 +0300 Subject: [PATCH 7/8] Have device status cache retention period follow configuration --- lib/server/cache.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/server/cache.js b/lib/server/cache.js index e0ebdedd371..1b2576ce029 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -25,7 +25,7 @@ function cache (env, ctx) { const retentionPeriods = { treatments: constants.ONE_HOUR * 60 - , devicestatus: constants.TWO_DAYS + , devicestatus: env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2 ? constants.TWO_DAYS : constants.ONE_DAY , entries: constants.TWO_DAYS }; From be2ddcd825a23e6438d5d28c5059f4764d17bab2 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 14:49:02 +0300 Subject: [PATCH 8/8] Fix _id injection in treatments --- lib/server/bootevent.js | 2 +- lib/server/treatments.js | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/server/bootevent.js b/lib/server/bootevent.js index 93c6c7f2057..6247645b3d2 100644 --- a/lib/server/bootevent.js +++ b/lib/server/bootevent.js @@ -255,7 +255,7 @@ function boot (env, language) { }); ctx.bus.on('data-loaded', function updatePlugins ( ) { - console.info('reloading sandbox data'); + // console.info('reloading sandbox data'); var sbx = require('../sandbox')().serverInit(env, ctx); ctx.plugins.setProperties(sbx); ctx.notifications.initRequests(); diff --git a/lib/server/treatments.js b/lib/server/treatments.js index dad13b5b5d6..a9107ea99b2 100644 --- a/lib/server/treatments.js +++ b/lib/server/treatments.js @@ -53,9 +53,7 @@ function storage (env, ctx) { if (!err) { if (updateResults.result.upserted) { obj._id = updateResults.result.upserted[0]._id - //console.log('PERSISTENCE: treatment upserted', updateResults.result.upserted[0]); } - //console.log('Update result', updateResults.result); } // TODO document this feature @@ -72,7 +70,14 @@ function storage (env, ctx) { } query.created_at = pbTreat.created_at; - api( ).update(query, pbTreat, {upsert: true}, function pbComplete (err) { + api( ).update(query, pbTreat, {upsert: true}, function pbComplete (err, updateResults) { + + if (!err) { + if (updateResults.result.upserted) { + pbTreat._id = updateResults.result.upserted[0]._id + } + } + var treatments = _.compact([obj, pbTreat]); ctx.bus.emit('data-update', {