Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak and cache update issues in 14.0.4 #6133

Merged
merged 10 commits into from
Sep 27, 2020
36 changes: 14 additions & 22 deletions lib/data/dataloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const _ = require('lodash');
const async = require('async');
const times = require('../times');
const fitTreatmentsToBGCurve = require('./treatmenttocurve');
const constants = require('../constants');

Expand Down Expand Up @@ -144,8 +143,11 @@ function init(env, ctx) {
done(err, result);
}

// clear treatments to the base set, we're going to merge from multiple queries
ddata.treatments = []; // ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : [];
// clear data we'll get from the cache

ddata.treatments = [];
ddata.devicestatus = [];
ddata.entries = [];

ddata.dbstats = {};

Expand Down Expand Up @@ -196,11 +198,8 @@ function loadEntries(ddata, ctx, callback) {

if (!err && results) {

const ageFilter = ddata.lastUpdated - constants.TWO_DAYS;
const r = ctx.ddata.processRawDataForRuntime(results);
ctx.cache.insertData('entries', r, ageFilter);

const currentData = ctx.cache.getData('entries').reverse();
const currentData = ctx.cache.insertData('entries', r).reverse();

const mbgs = [];
const sgvs = [];
Expand Down Expand Up @@ -324,12 +323,11 @@ function loadTreatments(ddata, ctx, callback) {

ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
const ageFilter = ddata.lastUpdated - longLoad;
const r = ctx.ddata.processRawDataForRuntime(results);

// update cache
ctx.cache.insertData('treatments', r, ageFilter);
ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, ctx.cache.getData('treatments'));
// update cache and apply to runtime data
const r = ctx.ddata.processRawDataForRuntime(results);
const currentData = ctx.cache.insertData('treatments', r);
ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, currentData);
}

callback();
Expand Down Expand Up @@ -361,7 +359,6 @@ function loadProfileSwitchTreatments(ddata, ctx, callback) {
ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
ddata.treatments = mergeProcessSort(ddata.treatments, results);
//mergeToTreatments(ddata, results);
}

// Store last profile switch
Expand Down Expand Up @@ -418,7 +415,6 @@ function loadLatestSingle(ddata, ctx, dataType, callback) {
ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
ddata.treatments = mergeProcessSort(ddata.treatments, results);
//mergeToTreatments(ddata, results);
}
callback();
});
Expand Down Expand Up @@ -473,16 +469,12 @@ function loadDeviceStatus(ddata, env, ctx, callback) {

ctx.devicestatus.list(opts, function(err, results) {
if (!err && results) {
// ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter);

const ageFilter = ddata.lastUpdated - longLoad;
// update cache and apply to runtime data
const r = ctx.ddata.processRawDataForRuntime(results);
ctx.cache.insertData('devicestatus', r, ageFilter);

const res = ctx.cache.getData('devicestatus');
const currentData = ctx.cache.insertData('devicestatus', r);

const res2 = _.map(res, function eachStatus(result) {
//result.mills = new Date(result.created_at).getTime();
const res2 = _.map(currentData, function eachStatus(result) {
if ('uploaderBattery' in result) {
result.uploader = {
battery: result.uploaderBattery
Expand All @@ -492,7 +484,7 @@ function loadDeviceStatus(ddata, env, ctx, callback) {
return result;
});

ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2, ageFilter);
ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2);
} else {
ddata.devicestatus = [];
}
Expand Down
8 changes: 5 additions & 3 deletions lib/data/ddata.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ function init () {

Object.keys(obj).forEach(key => {
if (typeof obj[key] === 'object' && obj[key]) {
if (obj[key].hasOwnProperty('_id')) {
if (Object.prototype.hasOwnProperty.call(obj[key], '_id')) {
obj[key]._id = obj[key]._id.toString();
}
if (obj[key].hasOwnProperty('created_at') && !obj[key].hasOwnProperty('mills')) {
if (Object.prototype.hasOwnProperty.call(obj[key], 'created_at')
&& !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) {
obj[key].mills = new Date(obj[key].created_at).getTime();
}
if (obj[key].hasOwnProperty('sysTime') && !obj[key].hasOwnProperty('mills')) {
if (Object.prototype.hasOwnProperty.call(obj[key], 'sysTime')
&& !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) {
obj[key].mills = new Date(obj[key].sysTime).getTime();
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/server/bootevent.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ function boot (env, language) {
});

ctx.bus.on('data-loaded', function updatePlugins ( ) {
console.info('reloading sandbox data');
// console.info('reloading sandbox data');
var sbx = require('../sandbox')().serverInit(env, ctx);
ctx.plugins.setProperties(sbx);
ctx.notifications.initRequests();
Expand Down
45 changes: 23 additions & 22 deletions lib/server/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,33 @@ function cache (env, ctx) {
, entries: []
};

const dataArray = [
data.treatments
, data.devicestatus
, data.entries
];
const retentionPeriods = {
treatments: constants.ONE_HOUR * 60
, devicestatus: env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2 ? constants.TWO_DAYS : constants.ONE_DAY
, entries: constants.TWO_DAYS
};

function mergeCacheArrays (oldData, newData, retentionPeriod) {

function mergeCacheArrays (oldData, newData, ageLimit) {
const ageLimit = Date.now() - retentionPeriod;

var filtered = _.filter(newData, function hasId (object) {
const hasId = !_.isEmpty(object._id);
const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit);
return isFresh && hasId;
});
var filteredOld = filterForAge(oldData, ageLimit);
var filteredNew = filterForAge(newData, ageLimit);

const merged = ctx.ddata.idMergePreferNew(oldData, filtered);
const merged = ctx.ddata.idMergePreferNew(filteredOld, filteredNew);

return _.sortBy(merged, function(item) {
return -item.mills;
});

function filterForAge(data, ageLimit) {
return _.filter(data, function hasId(object) {
const hasId = !_.isEmpty(object._id);
const isFresh = object.mills >= ageLimit;
return isFresh && hasId;
});
}

}

data.isEmpty = (datatype) => {
Expand All @@ -54,20 +60,17 @@ function cache (env, ctx) {
return _.cloneDeep(data[datatype]);
}

data.insertData = (datatype, newData, retentionPeriod) => {
data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriod);
data.insertData = (datatype, newData) => {
data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriods[datatype]);
return data.getData(datatype);
}

function dataChanged (operation) {
//console.log('Cache data operation requested', operation);

if (!data[operation.type]) return;

if (operation.op == 'remove') {
//console.log('Cache data delete event');
// if multiple items were deleted, flush entire cache
if (!operation.changes) {
//console.log('Multiple items delete from cache, flushing all')
data.treatments = [];
data.devicestatus = [];
data.entries = [];
Expand All @@ -76,9 +79,8 @@ function cache (env, ctx) {
}
}

if (operation.op == 'update') {
//console.log('Cache data update event');
data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes);
if (operation.op == 'update') {
data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[operation.type]);
}
}

Expand All @@ -96,7 +98,6 @@ function cache (env, ctx) {
}

return data;

}

module.exports = cache;
62 changes: 34 additions & 28 deletions lib/server/devicestatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,38 @@ var find_options = require('./query');

function storage (collection, ctx) {

function create(obj, fn) {
function create (obj, fn) {

// Normalize all dates to UTC
const d = moment(obj.created_at).isValid() ? moment.parseZone(obj.created_at) : moment();
obj.created_at = d.toISOString();
obj.utcOffset = d.utcOffset();
api().insert(obj, function (err, doc) {
if (err != null && err.message) {

api().insertOne(obj, function(err, results) {
if (err !== null && err.message) {
console.log('Error inserting the device status object', err.message);
fn(err.message, null);
return;
}

ctx.bus.emit('data-update', {
type: 'devicestatus',
op: 'update',
changes: ctx.ddata.processRawDataForRuntime([doc])
});
if (!err) {

if (!obj._id) obj._id = results.insertedIds[0]._id;

fn(null, doc.ops);
ctx.bus.emit('data-update', {
type: 'devicestatus'
, op: 'update'
, changes: ctx.ddata.processRawDataForRuntime([obj])
});
}

fn(null, results.ops);
ctx.bus.emit('data-received');
});
}

function last(fn) {
return list({count: 1}, function (err, entries) {
function last (fn) {
return list({ count: 1 }, function(err, entries) {
if (entries && entries.length > 0) {
fn(err, entries[0]);
} else {
Expand All @@ -44,18 +49,18 @@ function storage (collection, ctx) {
return find_options(opts, storage.queryOpts);
}

function list(opts, fn) {
function list (opts, fn) {
// these functions, find, sort, and limit, are used to
// dynamically configure the request, based on the options we've
// been given

// determine sort options
function sort ( ) {
return opts && opts.sort || {created_at: -1};
function sort () {
return opts && opts.sort || { created_at: -1 };
}

// configure the limit portion of the current query
function limit ( ) {
function limit () {
if (opts && opts.count) {
return this.limit(parseInt(opts.count));
}
Expand All @@ -68,31 +73,31 @@ function storage (collection, ctx) {
}

// now just stitch them all together
limit.call(api( )
.find(query_for(opts))
.sort(sort( ))
limit.call(api()
.find(query_for(opts))
.sort(sort())
).toArray(toArray);
}

function remove (opts, fn) {

function removed(err, stat) {
function removed (err, stat) {

ctx.bus.emit('data-update', {
type: 'devicestatus',
op: 'remove',
count: stat.result.n,
changes: opts.find._id
type: 'devicestatus'
, op: 'remove'
, count: stat.result.n
, changes: opts.find._id
});

fn(err, stat);
}

return api( ).remove(
return api().remove(
query_for(opts), removed);
}

function api() {
function api () {
return ctx.store.collection(collection);
}

Expand All @@ -101,9 +106,10 @@ function storage (collection, ctx) {
api.query_for = query_for;
api.last = last;
api.remove = remove;
api.aggregate = require('./aggregate')({ }, api);
api.aggregate = require('./aggregate')({}, api);
api.indexedFields = [
'created_at'

, 'NSCLIENT_ID'
];
return api;
Expand Down
Loading