From 0c7e9bf4568c6c5b73d5c69b6b1b5eb26b63dda1 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sat, 19 Sep 2020 18:17:20 +0300 Subject: [PATCH 1/6] * Require created_at for Treatments on insert * Refactoring the cache: moved to another file and support flushing from bus events --- lib/api/devicestatus/index.js | 2 +- lib/api/entries/index.js | 8 +-- lib/api/treatments/index.js | 24 ++++++-- lib/client/careportal.js | 2 + lib/constants.json | 1 + lib/data/dataloader.js | 26 ++++---- lib/data/ddata.js | 1 - lib/server/bootevent.js | 1 + lib/server/cache.js | 67 ++++++++++++++++++++ lib/server/treatments.js | 54 +++++++++++++++- npm-shrinkwrap.json | 112 +++++++++++++++++----------------- package.json | 1 + tests/api.treatments.test.js | 29 +++++++-- 13 files changed, 245 insertions(+), 83 deletions(-) create mode 100644 lib/server/cache.js diff --git a/lib/api/devicestatus/index.js b/lib/api/devicestatus/index.js index bdddfae9212..94dfb1c9385 100644 --- a/lib/api/devicestatus/index.js +++ b/lib/api/devicestatus/index.js @@ -47,7 +47,7 @@ function configure (app, wares, ctx, env) { q.count = 10; } - const inMemoryData = ctx.ddata.shadow.devicestatus ? ctx.ddata.shadow.devicestatus : []; + const inMemoryData = ctx.cache.devicestatus ? ctx.cache.devicestatus : []; const canServeFromMemory = inMemoryData.length >= q.count && Object.keys(q).length == 1 ? true : false; if (canServeFromMemory) { diff --git a/lib/api/entries/index.js b/lib/api/entries/index.js index 088bb86d3af..8cbf99ff30f 100644 --- a/lib/api/entries/index.js +++ b/lib/api/entries/index.js @@ -456,11 +456,11 @@ function configure (app, wares, ctx, env) { let inMemoryCollection; if (typeQuery) { - if (typeQuery == 'sgv') inMemoryCollection = ctx.ddata.shadow.sgvs; - if (typeQuery == 'mbg') inMemoryCollection = ctx.ddata.shadow.mbgs; - if (typeQuery == 'cal') inMemoryCollection = ctx.ddata.shadow.cals; + if (typeQuery == 'sgv') inMemoryCollection = ctx.cache.sgvs; + if (typeQuery == 'mbg') inMemoryCollection = ctx.cache.mbgs; + if (typeQuery == 'cal') inMemoryCollection = ctx.cache.cals; } else { - const merged = _.unionWith(ctx.ddata.shadow.sgvs, ctx.ddata.shadow.mbgs, ctx.ddata.shadow.cals, function(a, b) { + const merged = _.unionWith(ctx.cache.sgvs, ctx.cache.mbgs, ctx.cache.cals, function(a, b) { return a._id == b._id; }); inMemoryCollection = _.sortBy(merged, function(item) { diff --git a/lib/api/treatments/index.js b/lib/api/treatments/index.js index 0d318301257..df3cc6d7503 100644 --- a/lib/api/treatments/index.js +++ b/lib/api/treatments/index.js @@ -91,7 +91,7 @@ function configure (app, wares, ctx, env) { query.count = query.find ? 1000 : 100; } - const inMemoryData = ctx.ddata.shadow.treatments; + const inMemoryData = ctx.cache.treatments; const canServeFromMemory = inMemoryData && inMemoryData.length >= query.count && Object.keys(query).length == 1 ? true : false; if (canServeFromMemory) { @@ -112,12 +112,27 @@ function configure (app, wares, ctx, env) { treatments = [treatments]; } + for (let i = 0; i < treatments.length; i++) { + const t = treatments[i]; + if (!t.created_at) { + console.log('Trying to create treatment without created_at field', t); + res.sendJSONStatus(res, constants.HTTP_VALIDATION_ERROR, 'Treatments must contain created_at'); + return; + } + const d = moment(t.created_at); + if (!d.isValid()) { + console.log('Trying to insert date with invalid created_at', t); + res.sendJSONStatus(res, constants.HTTP_VALIDATION_ERROR, 'Treatments created_at must be an ISO-8601 date'); + return; + } + } + ctx.treatments.create(treatments, function(err, created) { if (err) { console.log('Error adding treatment', err); res.sendJSONStatus(res, constants.HTTP_INTERNAL_ERROR, 'Mongo Error', err); } else { - console.log('Treatment created'); + console.log('REST API treatment created', created); res.json(created); } }); @@ -146,11 +161,12 @@ function configure (app, wares, ctx, env) { console.log('treatments delete error: ', err); return next(err); } + + console.log('treatments records deleted', query); + // yield some information about success of operation res.json(stat); - console.log('treatments records deleted'); - return next(); }); } diff --git a/lib/client/careportal.js b/lib/client/careportal.js index e2cbe1c1208..a6edfe3ec22 100644 --- a/lib/client/careportal.js +++ b/lib/client/careportal.js @@ -283,6 +283,8 @@ function init (client, $) { data.eventTime = mergeDateAndTime().toDate(); } + data.created_at = data.eventTime ? data.eventTime.toISOString() : new Date().toISOString(); + if (!inputMatrix[data.eventType].profile) { delete data.profile; } diff --git a/lib/constants.json b/lib/constants.json index 2f0375c49c6..38e36040d42 100644 --- a/lib/constants.json +++ b/lib/constants.json @@ -4,6 +4,7 @@ "HTTP_UNAUTHORIZED" : 401, "HTTP_VALIDATION_ERROR" : 422, "HTTP_INTERNAL_ERROR" : 500, + "HTTP_BAD_REQUEST": 400, "ENTRIES_DEFAULT_COUNT" : 10, "PROFILES_DEFAULT_COUNT" : 10, "MMOL_TO_MGDL": 18, diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index 511a844c147..cf3ae46d840 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -145,7 +145,7 @@ function init(env, ctx) { } // clear treatments to the base set, we're going to merge from multiple queries - ddata.treatments = ddata.shadow.treatments ? _.cloneDeep(ddata.shadow.treatments) : []; + ddata.treatments = ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : []; ddata.dbstats = {}; @@ -248,9 +248,9 @@ function loadEntries(ddata, ctx, callback) { ddata.sgvs = mergeProcessSort(ddata.sgvs, sgvs, ageLimit); ddata.mbgs = mergeProcessSort(ddata.mbgs, uniqBasedOnMills(mbgs), ageLimit); ddata.cals = mergeProcessSort(ddata.cals, uniqBasedOnMills(cals), ageLimit); - ddata.shadow.sgvs = mergeProcessSort(ddata.shadow.sgvs, shadowSgvs, ageLimit).reverse(); - ddata.shadow.mbgs = mergeProcessSort(ddata.shadow.mbgs, uniqBasedOnMills(shadowMbgs), ageLimit).reverse(); - ddata.shadow.cals = mergeProcessSort(ddata.shadow.cals, uniqBasedOnMills(shadowCals), ageLimit).reverse(); + ctx.cache.sgvs = mergeProcessSort(ctx.cache.sgvs, shadowSgvs, ageLimit).reverse(); + ctx.cache.mbgs = mergeProcessSort(ctx.cache.mbgs, uniqBasedOnMills(shadowMbgs), ageLimit).reverse(); + ctx.cache.cals = mergeProcessSort(ctx.cache.cals, uniqBasedOnMills(shadowCals), ageLimit).reverse(); } callback(); }); @@ -309,9 +309,13 @@ function loadTreatments(ddata, ctx, callback) { // Load 2.5 days to cover last 48 hours including overlapping temp boluses or temp targets for first load // Subsequently load at least 15 minutes of data, but if latest entry is older than 15 minutes, load until that entry - const loadPeriod = ddata.treatments && ddata.treatments.length > 0 && !withFrame ? constants.SIX_HOURS : longLoad; - const latestEntry = ddata.treatments && ddata.treatments.length > 0 ? findLatestMills(ddata.treatments) : ddata.lastUpdated; - const loadTime = Math.min(ddata.lastUpdated - loadPeriod, latestEntry); + let loadTime = ddata.lastUpdated - longLoad; + + if (ctx.cache.treatments.length > 0) { + const loadPeriod = ddata.treatments && ddata.treatments.length > 0 && !withFrame ? constants.SIX_HOURS : longLoad; + const latestEntry = ddata.treatments && ddata.treatments.length > 0 ? findLatestMills(ddata.treatments) : ddata.lastUpdated; + loadTime = Math.min(ddata.lastUpdated - loadPeriod, latestEntry); + } var dateRange = { $gte: new Date(loadTime).toISOString() @@ -331,9 +335,9 @@ function loadTreatments(ddata, ctx, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { const ageFilter = ddata.lastUpdated - longLoad; - ddata.treatments = mergeProcessSort(ddata.treatments, results, ageFilter); - ddata.shadow.treatments = mergeProcessSort(ddata.shadow.treatments, results, ageFilter).reverse(); //.reverse(); - //mergeToTreatments(ddata, results); + // update cache + ctx.cache.treatments = mergeProcessSort(ctx.cache.treatments, results, ageFilter).reverse(); + ddata.treatments = mergeProcessSort(ddata.treatments, _.cloneDeep(ctx.cache.treatments), ageFilter); } callback(); @@ -487,7 +491,7 @@ function loadDeviceStatus(ddata, env, ctx, callback) { ctx.devicestatus.list(opts, function(err, results) { if (!err && results) { const ageFilter = ddata.lastUpdated - constants.TWO_DAYS; - ddata.shadow.devicestatus = mergeProcessSort(ddata.shadow.devicestatus, results, ageFilter); + ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter); const r = _.map(results, function eachStatus(result) { //result.mills = new Date(result.created_at).getTime(); diff --git a/lib/data/ddata.js b/lib/data/ddata.js index 265bca5789e..ed0830b270a 100644 --- a/lib/data/ddata.js +++ b/lib/data/ddata.js @@ -19,7 +19,6 @@ function init () { , activity: [] , dbstats: {} , lastUpdated: 0 - , shadow: {} }; ddata.clone = function clone () { diff --git a/lib/server/bootevent.js b/lib/server/bootevent.js index f5702efc4fe..17f0319b360 100644 --- a/lib/server/bootevent.js +++ b/lib/server/bootevent.js @@ -176,6 +176,7 @@ function boot (env, language) { ctx.properties = require('../api/properties')(env, ctx); ctx.bus = require('../bus')(env.settings, ctx); ctx.ddata = require('../data/ddata')(); + ctx.cache = require('./cache')(env,ctx); ctx.dataloader = require('../data/dataloader')(env, ctx); ctx.notifications = require('../notifications')(env, ctx); diff --git a/lib/server/cache.js b/lib/server/cache.js new file mode 100644 index 00000000000..cf6ec7b8f6b --- /dev/null +++ b/lib/server/cache.js @@ -0,0 +1,67 @@ +'use strict'; + +var _ = require('lodash'); + +function cache (env, ctx) { + + const data = { + treatments : [], + devicestatus : [], + sgvs : [], + cals : [], + mbgs : [] + }; + + const dataArray = [ + data.treatments, + data.devicestatus, + data.sgvs, + data.cals, + data.mbgs + ]; + + function match(o1, o2) { + return o1._id == o2._id; + } + + function dataChanged(operation) { + console.log('Cache data update event', data); + + if (operation.op == 'remove') { + console.log('Cache data delete event'); + // if multiple items were deleted, flush entire cache + if (!operation.changes) { + console.log('Multiple items delete from cache, flushing all') + data[operation.op.type] = []; + } else { + removeFromArray(data[operation.type], operation.changes); + } + } + + if (operation.op == 'update') { + console.log('Cache data update event'); + } + } + + ctx.bus.on('data-update', dataChanged); + + function removeFromArray(array, id) { + for (let i = 0; i < array.length; i++) { + const o = array[i]; + if (o._id == id) { + console.log('Deleting object from cache', id); + array.splice(i,1); + break; + } + } + } + + cache.updateObject = (o) => { + + } + + return data; + +} + +module.exports = cache; diff --git a/lib/server/treatments.js b/lib/server/treatments.js index 3edd00a155e..c4ae785767d 100644 --- a/lib/server/treatments.js +++ b/lib/server/treatments.js @@ -3,7 +3,6 @@ var _ = require('lodash'); var async = require('async'); var moment = require('moment'); - var find_options = require('./query'); function storage (env, ctx) { @@ -48,12 +47,18 @@ function storage (env, ctx) { }; api( ).update(query, obj, {upsert: true}, function complete (err, updateResults) { + + if (err) console.error('Problem upserting treatment', err); + if (!err) { if (updateResults.result.upserted) { obj._id = updateResults.result.upserted[0]._id + console.log('PERSISTENCE: treatment upserted', updateResults.result.upserted[0]); } + console.log('Update result', updateResults.result); } + // TODO document this feature if (!err && obj.preBolus) { //create a new object to insert copying only the needed fields var pbTreat = { @@ -69,9 +74,23 @@ function storage (env, ctx) { query.created_at = pbTreat.created_at; api( ).update(query, pbTreat, {upsert: true}, function pbComplete (err) { var treatments = _.compact([obj, pbTreat]); + + ctx.bus.emit('data-update', { + type: 'treatments', + op: 'update', + changes: treatments + }); + fn(err, treatments); }); } else { + + ctx.bus.emit('data-update', { + type: 'treatments', + op: 'update', + changes: [obj] + }); + fn(err, [obj]); } @@ -99,7 +118,16 @@ function storage (env, ctx) { function remove (opts, fn) { return api( ).remove(query_for(opts), function (err, stat) { - //TODO: this is triggering a read from Mongo, we can do better + //TODO: this is triggering a read from Mongo, we can do better + console.log('Treatment removed', opts); // , stat); + + ctx.bus.emit('data-update', { + type: 'treatments', + op: 'remove', + count: stat.result.n, + changes: opts.find._id + }); + ctx.bus.emit('data-received'); fn(err, stat); }); @@ -108,6 +136,23 @@ function storage (env, ctx) { function save (obj, fn) { obj._id = new ObjectID(obj._id); prepareData(obj); + + function saved (err, created) { + if (!err) { +// console.log('Treatment updated', created); + + ctx.bus.emit('data-update', { + type: 'treatments', + op: 'update', + changes: [obj] + }); + + } + if (err) console.error('Problem saving treating', err); + + fn(err, created); + } + api().save(obj, fn); ctx.bus.emit('data-received'); @@ -147,6 +192,7 @@ function prepareData(obj) { // Convert all dates to UTC dates + // TODO remove this -> must not create new date if missing const d = moment(obj.created_at).isValid() ? moment.parseZone(obj.created_at) : moment(); obj.created_at = d.toISOString(); @@ -170,7 +216,7 @@ function prepareData(obj) { obj.relative = Number(obj.relative); obj.preBolus = Number(obj.preBolus); - //NOTE: the eventTime is sent by the client, but deleted, we only store created_at right now + //NOTE: the eventTime is sent by the client, but deleted, we only store created_at var eventTime; if (obj.eventTime) { eventTime = new Date(obj.eventTime).toISOString(); @@ -220,6 +266,8 @@ function prepareData(obj) { delete obj.units; } + console.log('Preparing treatment for insertion, obj', obj, 'results', results); + return results; } diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 0c16f367bd4..106d5676bac 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -1551,30 +1551,6 @@ "resolved": "https://registry.npmjs.org/aws4/-/aws4-1.10.1.tgz", "integrity": "sha512-zg7Hz2k5lI8kb7U32998pRRFin7zJlkfezGJjUc2heaD4Pw2wObakCDVzkKztTm/Ln7eiVvYsjqak0Ed4LkMDA==" }, - "axios": { - "version": "0.19.2", - "resolved": "https://registry.npmjs.org/axios/-/axios-0.19.2.tgz", - "integrity": "sha512-fjgm5MvRHLhx+osE2xoekY70AhARk3a6hkN+3Io1jc00jtquGvxYlKlsFUhmUET0V5te6CcZI7lcv2Ym61mjHA==", - "requires": { - "follow-redirects": "1.5.10" - } - }, - "axios-cookiejar-support": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/axios-cookiejar-support/-/axios-cookiejar-support-1.0.0.tgz", - "integrity": "sha512-9pBlIU5jfrGZTnUQlt8symShviSTOSlOKGtryHx76lJPnKIXDqUT3JDAjJ1ywOQLyfiWrthIt4iJiVP2L2S4jA==", - "requires": { - "is-redirect": "^1.0.0", - "pify": "^5.0.0" - }, - "dependencies": { - "pify": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/pify/-/pify-5.0.0.tgz", - "integrity": "sha512-eW/gHNMlxdSP6dmG6uJip6FXN0EQBwm2clYYd8Wul42Cwu/DK8HEftzsapcNdYe2MfLiIwZqsDk2RDEsTE79hA==" - } - } - }, "babel-code-frame": { "version": "6.26.0", "resolved": "https://registry.npmjs.org/babel-code-frame/-/babel-code-frame-6.26.0.tgz", @@ -4227,24 +4203,6 @@ "readable-stream": "^2.3.6" } }, - "follow-redirects": { - "version": "1.5.10", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.5.10.tgz", - "integrity": "sha512-0V5l4Cizzvqt5D44aTXbFZz+FtyXV1vrDN6qrelxtfYQKW0KO0W2T/hkE8xvGa/540LkZlkaUjO4ailYTFtHVQ==", - "requires": { - "debug": "=3.1.0" - }, - "dependencies": { - "debug": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz", - "integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==", - "requires": { - "ms": "2.0.0" - } - } - } - }, "for-in": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz", @@ -5179,7 +5137,8 @@ "is-redirect": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/is-redirect/-/is-redirect-1.0.0.tgz", - "integrity": "sha1-HQPd7VO9jbDzDCbk+V02/HyH3CQ=" + "integrity": "sha1-HQPd7VO9jbDzDCbk+V02/HyH3CQ=", + "dev": true }, "is-regex": { "version": "1.1.1", @@ -6399,6 +6358,23 @@ "resolved": "https://registry.npmjs.org/aws4/-/aws4-1.8.0.tgz", "integrity": "sha512-ReZxvNHIOv88FlT7rxcXIIC0fPt4KZqZbOlivyWtXLt8ESx84zd3kMC6iK5jVeS2qt+g7ftS7ye4fi06X5rtRQ==" }, + "axios": { + "version": "0.19.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-0.19.2.tgz", + "integrity": "sha512-fjgm5MvRHLhx+osE2xoekY70AhARk3a6hkN+3Io1jc00jtquGvxYlKlsFUhmUET0V5te6CcZI7lcv2Ym61mjHA==", + "requires": { + "follow-redirects": "1.5.10" + } + }, + "axios-cookiejar-support": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/axios-cookiejar-support/-/axios-cookiejar-support-1.0.0.tgz", + "integrity": "sha512-9pBlIU5jfrGZTnUQlt8symShviSTOSlOKGtryHx76lJPnKIXDqUT3JDAjJ1ywOQLyfiWrthIt4iJiVP2L2S4jA==", + "requires": { + "is-redirect": "^1.0.0", + "pify": "^5.0.0" + } + }, "balanced-match": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", @@ -6668,6 +6644,29 @@ "is-buffer": "~2.0.3" } }, + "follow-redirects": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.5.10.tgz", + "integrity": "sha512-0V5l4Cizzvqt5D44aTXbFZz+FtyXV1vrDN6qrelxtfYQKW0KO0W2T/hkE8xvGa/540LkZlkaUjO4ailYTFtHVQ==", + "requires": { + "debug": "=3.1.0" + }, + "dependencies": { + "debug": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz", + "integrity": "sha512-OX8XqP7/1a9cqkxYw2yXss15f26NKWBpDXQd0/uK/KPqdQhxbPa994hnzjcE2VqQpDslf55723cKPUOGSmMY3g==", + "requires": { + "ms": "2.0.0" + } + }, + "ms": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", + "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" + } + } + }, "forever-agent": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/forever-agent/-/forever-agent-0.6.1.tgz", @@ -6818,6 +6817,11 @@ "resolved": "https://registry.npmjs.org/is-fullwidth-code-point/-/is-fullwidth-code-point-2.0.0.tgz", "integrity": "sha1-o7MKXE8ZkYMWeqq5O+764937ZU8=" }, + "is-redirect": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/is-redirect/-/is-redirect-1.0.0.tgz", + "integrity": "sha1-HQPd7VO9jbDzDCbk+V02/HyH3CQ=" + }, "is-regex": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/is-regex/-/is-regex-1.0.4.tgz", @@ -7147,6 +7151,11 @@ "resolved": "https://registry.npmjs.org/performance-now/-/performance-now-2.1.0.tgz", "integrity": "sha1-Ywn04OX6kT7BxpMHrjZLSzd8nns=" }, + "pify": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/pify/-/pify-5.0.0.tgz", + "integrity": "sha512-eW/gHNMlxdSP6dmG6uJip6FXN0EQBwm2clYYd8Wul42Cwu/DK8HEftzsapcNdYe2MfLiIwZqsDk2RDEsTE79hA==" + }, "psl": { "version": "1.8.0", "resolved": "https://registry.npmjs.org/psl/-/psl-1.8.0.tgz", @@ -7330,13 +7339,6 @@ "psl": "^1.1.33", "punycode": "^2.1.1", "universalify": "^0.1.2" - }, - "dependencies": { - "punycode": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.1.1.tgz", - "integrity": "sha512-XRsRjdf+j5ml+y/6GKHPZbrF/8p2Yga0JPtdqTIY2Xe5ohJPD9saDJJLPvp9+NSBprVvevdXZybnj2cv8OEd0A==" - } } }, "tunnel-agent": { @@ -7352,6 +7354,11 @@ "resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-0.14.5.tgz", "integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=" }, + "universalify": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz", + "integrity": "sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg==" + }, "uri-js": { "version": "4.2.2", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.2.2.tgz", @@ -11067,11 +11074,6 @@ "crypto-random-string": "^1.0.0" } }, - "universalify": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz", - "integrity": "sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg==" - }, "unpipe": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz", diff --git a/package.json b/package.json index 24c65e97f88..2a764808302 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "scripts": { "start": "node server.js", "test": "env-cmd ./my.test.env mocha --exit tests/*.test.js", + "test-single": "env-cmd ./my.test.env mocha --exit tests/$TEST.test.js", "test-ci": "env-cmd ./ci.test.env nyc --reporter=lcov --reporter=text-summary mocha --exit tests/*.test.js", "env": "env", "postinstall": "webpack --mode production --config webpack.config.js && npm run-script update-buster", diff --git a/tests/api.treatments.test.js b/tests/api.treatments.test.js index 4ba3739f4c0..0c78267971d 100644 --- a/tests/api.treatments.test.js +++ b/tests/api.treatments.test.js @@ -34,10 +34,11 @@ describe('Treatment API', function ( ) { it('post single treatments', function (done) { self.ctx.treatments().remove({ }, function ( ) { + var now = (new Date()).toISOString(); request(self.app) .post('/api/treatments/') .set('api-secret', self.env.api_secret || '') - .send({eventType: 'Meal Bolus', carbs: '30', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'}) + .send({eventType: 'Meal Bolus', created_at: now, carbs: '30', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'}) .expect(200) .end(function (err) { if (err) { @@ -61,6 +62,24 @@ describe('Treatment API', function ( ) { }); }); + it('saving entry without created_at should fail', function (done) { + + self.ctx.treatments().remove({ }, function ( ) { + request(self.app) + .post('/api/treatments/') + .set('api-secret', self.env.api_secret || '') + .send({eventType: 'Meal Bolus', carbs: '30', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'}) + .expect(422) + .end(function (err) { + if (err) { + done(err); + } else { + done(); + } + }); + }); + }); + it('post single treatments in zoned time format', function (done) { @@ -101,12 +120,13 @@ describe('Treatment API', function ( ) { it('post a treatment array', function (done) { self.ctx.treatments().remove({ }, function ( ) { + var now = (new Date()).toISOString(); request(self.app) .post('/api/treatments/') .set('api-secret', self.env.api_secret || '') .send([ - {eventType: 'BG Check', glucose: 100, preBolus: '0', glucoseType: 'Finger', units: 'mg/dl', notes: ''} - , {eventType: 'Meal Bolus', carbs: '30', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'} + {eventType: 'BG Check', created_at: now, glucose: 100, preBolus: '0', glucoseType: 'Finger', units: 'mg/dl', notes: ''} + , {eventType: 'Meal Bolus', created_at: now, carbs: '30', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'} ]) .expect(200) .end(function (err) { @@ -168,10 +188,11 @@ describe('Treatment API', function ( ) { it('post a treatment, query, delete, verify gone', function (done) { // insert a treatment - needs to be unique from example data console.log('Inserting treatment entry'); + var now = (new Date()).toISOString(); request(self.app) .post('/api/treatments/') .set('api-secret', self.env.api_secret || '') - .send({eventType: 'Meal Bolus', carbs: '99', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'}) + .send({eventType: 'Meal Bolus', created_at: now, carbs: '99', insulin: '2.00', preBolus: '15', glucose: 100, glucoseType: 'Finger', units: 'mg/dl'}) .expect(200) .end(function (err) { if (err) { From f878ce115b6db45665004374035b3f3c265e9703 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 20 Sep 2020 18:36:19 +0300 Subject: [PATCH 2/6] Add support for CGM data and device statuses. Refactor dataloader to use the new model and reduce queries back down --- lib/api/entries/index.js | 21 ++++++---- lib/data/dataloader.js | 83 ++++++++++++++++--------------------- lib/data/ddata.js | 56 ++++++++++++++++++++++++- lib/server/cache.js | 85 +++++++++++++++++++++++++++----------- lib/server/devicestatus.js | 23 ++++++++++- lib/server/entries.js | 15 +++++++ lib/server/treatments.js | 10 +++-- 7 files changed, 205 insertions(+), 88 deletions(-) diff --git a/lib/api/entries/index.js b/lib/api/entries/index.js index 8cbf99ff30f..3f37811279b 100644 --- a/lib/api/entries/index.js +++ b/lib/api/entries/index.js @@ -315,8 +315,9 @@ function configure (app, wares, ctx, env) { function prepReqModel (req, model) { var type = model || 'sgv'; if (!req.query.find) { + req.query.find = { - type: type + type: { "$eq" : type } }; } else { req.query.find.type = type; @@ -447,6 +448,7 @@ function configure (app, wares, ctx, env) { Object.keys(query.find).forEach(function(key) { if (key == 'type') { typeQuery = query.find[key]["$eq"]; + if (!typeQuery) typeQuery = query.find.type; } else { inMemoryPossible = false; } @@ -456,14 +458,16 @@ function configure (app, wares, ctx, env) { let inMemoryCollection; if (typeQuery) { - if (typeQuery == 'sgv') inMemoryCollection = ctx.cache.sgvs; - if (typeQuery == 'mbg') inMemoryCollection = ctx.cache.mbgs; - if (typeQuery == 'cal') inMemoryCollection = ctx.cache.cals; - } else { - const merged = _.unionWith(ctx.cache.sgvs, ctx.cache.mbgs, ctx.cache.cals, function(a, b) { - return a._id == b._id; + inMemoryCollection= _.filter(ctx.cache.entries, function checkType (object) { + if (typeQuery == 'sgv') return 'sgv' in object; + if (typeQuery == 'mbg') return 'mbg' in object; + if (typeQuery == 'cal') return object.type === 'cal'; + return false; }); - inMemoryCollection = _.sortBy(merged, function(item) { + } else { + inMemoryCollection = ctx.cache.getData('entries'); + + inMemoryCollection = _.sortBy(inMemoryCollection, function(item) { return item.mills; }).reverse(); } @@ -475,7 +479,6 @@ function configure (app, wares, ctx, env) { } // If we get this far, query the database - // bias to entries, but allow expressing a preference var storage = req.storage || ctx.entries; // perform the query diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index cf3ae46d840..2ded134bf66 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -145,7 +145,7 @@ function init(env, ctx) { } // clear treatments to the base set, we're going to merge from multiple queries - ddata.treatments = ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : []; + ddata.treatments = []; // ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : []; ddata.dbstats = {}; @@ -170,11 +170,11 @@ function init(env, ctx) { function loadEntries(ddata, ctx, callback) { const withFrame = ddata.page && ddata.page.frame; - const loadPeriod = ddata.sgvs && ddata.sgvs.length > 0 && !withFrame ? constants.ONE_HOUR : constants.TWO_DAYS; - const latestEntry = ddata.sgvs && ddata.sgvs.length > 0 ? findLatestMills(ddata.sgvs) : ddata.lastUpdated; + const longLoad = Math.round(constants.TWO_DAYS); + const loadTime = ctx.cache.isEmpty('entries') || withFrame ? longLoad : constants.FIFTEEN_MINUTES; var dateRange = { - $gte: Math.min(ddata.lastUpdated - loadPeriod, latestEntry) + $gte: ddata.lastUpdated - loadTime }; if (withFrame) { dateRange['$lte'] = ddata.lastUpdated; @@ -195,14 +195,18 @@ function loadEntries(ddata, ctx, callback) { } if (!err && results) { + + const ageFilter = ddata.lastUpdated - constants.TWO_DAYS; + const r = ctx.ddata.processRawDataForRuntime(results); + ctx.cache.insertData('entries', r, ageFilter); + + const currentData = ctx.cache.getData('entries'); + const mbgs = []; const sgvs = []; const cals = []; - const shadowMbgs = []; - const shadowSgvs = []; - const shadowCals = []; - results.forEach(function(element) { + currentData.forEach(function(element) { if (element) { if (!element.mills) element.mills = element.date; if (element.mbg) { @@ -213,7 +217,6 @@ function loadEntries(ddata, ctx, callback) { device: element.device, type: 'mbg' }); - shadowMbgs.push(element); } else if (element.sgv) { sgvs.push({ _id: element._id, @@ -227,7 +230,6 @@ function loadEntries(ddata, ctx, callback) { rssi: element.rssi, type: 'sgv' }); - shadowSgvs.push(element); } else if (element.type === 'cal') { cals.push({ _id: element._id, @@ -237,20 +239,14 @@ function loadEntries(ddata, ctx, callback) { slope: element.slope, type: 'cal' }); - shadowCals.push(element); } } }); const ageLimit = ddata.lastUpdated - constants.TWO_DAYS; - - //stop using uniq for SGVs since we use buckets, also enables more detailed monitoring - ddata.sgvs = mergeProcessSort(ddata.sgvs, sgvs, ageLimit); - ddata.mbgs = mergeProcessSort(ddata.mbgs, uniqBasedOnMills(mbgs), ageLimit); - ddata.cals = mergeProcessSort(ddata.cals, uniqBasedOnMills(cals), ageLimit); - ctx.cache.sgvs = mergeProcessSort(ctx.cache.sgvs, shadowSgvs, ageLimit).reverse(); - ctx.cache.mbgs = mergeProcessSort(ctx.cache.mbgs, uniqBasedOnMills(shadowMbgs), ageLimit).reverse(); - ctx.cache.cals = mergeProcessSort(ctx.cache.cals, uniqBasedOnMills(shadowCals), ageLimit).reverse(); + ddata.sgvs = sgvs; + ddata.mbgs = mbgs; + ddata.cals = cals; } callback(); }); @@ -307,18 +303,12 @@ function loadTreatments(ddata, ctx, callback) { const longLoad = Math.round(constants.ONE_DAY * 2.5); //ONE_DAY * 2.5; // Load 2.5 days to cover last 48 hours including overlapping temp boluses or temp targets for first load - // Subsequently load at least 15 minutes of data, but if latest entry is older than 15 minutes, load until that entry + // Subsequently load at least 15 minutes of data - let loadTime = ddata.lastUpdated - longLoad; + const loadTime = ctx.cache.isEmpty('treatments') || withFrame ? longLoad : constants.FIFTEEN_MINUTES; - if (ctx.cache.treatments.length > 0) { - const loadPeriod = ddata.treatments && ddata.treatments.length > 0 && !withFrame ? constants.SIX_HOURS : longLoad; - const latestEntry = ddata.treatments && ddata.treatments.length > 0 ? findLatestMills(ddata.treatments) : ddata.lastUpdated; - loadTime = Math.min(ddata.lastUpdated - loadPeriod, latestEntry); - } - var dateRange = { - $gte: new Date(loadTime).toISOString() + $gte: new Date(ddata.lastUpdated - loadTime).toISOString() }; if (withFrame) { dateRange['$lte'] = new Date(ddata.lastUpdated).toISOString(); @@ -335,9 +325,11 @@ function loadTreatments(ddata, ctx, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { const ageFilter = ddata.lastUpdated - longLoad; + const r = ctx.ddata.processRawDataForRuntime(results); + // update cache - ctx.cache.treatments = mergeProcessSort(ctx.cache.treatments, results, ageFilter).reverse(); - ddata.treatments = mergeProcessSort(ddata.treatments, _.cloneDeep(ctx.cache.treatments), ageFilter); + ctx.cache.insertData('treatments', r, ageFilter); + ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, ctx.cache.getData('treatments')); } callback(); @@ -458,21 +450,12 @@ function loadFood(ddata, ctx, callback) { function loadDeviceStatus(ddata, env, ctx, callback) { - let loadPeriod = constants.ONE_DAY; - if(env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2) loadPeriod = constants.TWO_DAYS; - - const withFrame = ddata.page && ddata.page.frame ? true : false; - - if (!withFrame && ddata.devicestatus && ddata.devicestatus.length > 0) { - loadPeriod = constants.FIFTEEN_MINUTES; - } - - let latestEntry = ddata.devicestatus && ddata.devicestatus.length > 0 ? findLatestMills(ddata.devicestatus) : ddata.lastUpdated; - if (!latestEntry) latestEntry = ddata.lastUpdated; // TODO find out why report test fails withtout this - const loadTime = Math.min(ddata.lastUpdated - loadPeriod, latestEntry); + const withFrame = ddata.page && ddata.page.frame; + const longLoad = env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2 ? constants.TWO_DAYS : constants.ONE_DAY; + const loadTime = ctx.cache.isEmpty('devicestatus') || withFrame ? longLoad : constants.FIFTEEN_MINUTES; var dateRange = { - $gte: new Date( loadTime ).toISOString() + $gte: new Date( ddata.lastUpdated - loadTime ).toISOString() }; if (withFrame) { @@ -490,10 +473,15 @@ function loadDeviceStatus(ddata, env, ctx, callback) { ctx.devicestatus.list(opts, function(err, results) { if (!err && results) { - const ageFilter = ddata.lastUpdated - constants.TWO_DAYS; - ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter); +// ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter); - const r = _.map(results, function eachStatus(result) { + const ageFilter = ddata.lastUpdated - longLoad; + const r = ctx.ddata.processRawDataForRuntime(results); + ctx.cache.insertData('devicestatus', r, ageFilter); + + const res = ctx.cache.getData('devicestatus'); + + const res2 = _.map(res, function eachStatus(result) { //result.mills = new Date(result.created_at).getTime(); if ('uploaderBattery' in result) { result.uploader = { @@ -503,7 +491,8 @@ function loadDeviceStatus(ddata, env, ctx, callback) { } return result; }); - ddata.devicestatus = mergeProcessSort(ddata.devicestatus, r, ageFilter); + + ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2, ageFilter); } else { ddata.devicestatus = []; } diff --git a/lib/data/ddata.js b/lib/data/ddata.js index ed0830b270a..65120782fc0 100644 --- a/lib/data/ddata.js +++ b/lib/data/ddata.js @@ -21,6 +21,59 @@ function init () { , lastUpdated: 0 }; + /** + * Convert Mongo ids to strings and ensure all objects have the mills property for + * significantly faster processing than constant date parsing, plus simplified + * logic + */ + ddata.processRawDataForRuntime = (data) => { + + let obj = _.cloneDeep(data); + + Object.keys(obj).forEach(key => { + if (typeof obj[key] === 'object' && obj[key]) { + if (obj[key].hasOwnProperty('_id')) { + obj[key]._id = obj[key]._id.toString(); + } + if (obj[key].hasOwnProperty('created_at') && !obj[key].hasOwnProperty('mills')) { + obj[key].mills = new Date(obj[key].created_at).getTime(); + } + if (obj[key].hasOwnProperty('sysTime') && !obj[key].hasOwnProperty('mills')) { + obj[key].mills = new Date(obj[key].sysTime).getTime(); + } + } + }); + + return obj; + }; + + /** + * Merge two arrays based on _id string, preferring new objects when a collision is found + * @param {array} oldData + * @param {array} newData + */ + ddata.idMergePreferNew = (oldData, newData) => { + + if (!newData && oldData) return oldData; + if (!oldData && newData) return newData; + + const merged = _.cloneDeep(newData); + + for (let i = 0; i < oldData.length; i++) { + const oldElement = oldData[i]; + let found = false; + for (let j = 0; j < newData.length; j++) { + if (oldElement._id == newData[j]._id) { + found = true; + break; + } + } + if (!found) merged.push(oldElement); // Merge old object in, if it wasn't found in the new data + } + + return merged; + }; + ddata.clone = function clone () { return _.clone(ddata, function(value) { //special handling of mongo ObjectID's @@ -34,7 +87,7 @@ function init () { }); }; - ddata.dataWithRecentStatuses = function dataWithRecentStatuses() { + ddata.dataWithRecentStatuses = function dataWithRecentStatuses () { var results = {}; results.devicestatus = ddata.recentDeviceStatus(Date.now()); results.sgvs = ddata.sgvs; @@ -55,7 +108,6 @@ function init () { results.dbstats = ddata.dbstats; return results; - } ddata.recentDeviceStatus = function recentDeviceStatus (time) { diff --git a/lib/server/cache.js b/lib/server/cache.js index cf6ec7b8f6b..208abaca1c9 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -1,67 +1,102 @@ 'use strict'; -var _ = require('lodash'); +/* This is a simple cache intended to reduce the amount of load + * Nightscout puts on MongoDB. The cache is based on identifying + * elements based on the MongoDB _id field and implements simple + * semantics for adding data to the cache in the runtime, intended + * to be accessed by the persistence layer as data is inserted, updated + * or deleted, as well as the periodic dataloader, which polls Mongo + * for new inserts. + * + * Longer term, the cache is planned to allow skipping the Mongo polls + * altogether. + */ + +const _ = require('lodash'); +const constants = require('../constants'); function cache (env, ctx) { const data = { - treatments : [], - devicestatus : [], - sgvs : [], - cals : [], - mbgs : [] + treatments: [] + , devicestatus: [] + , entries: [] }; const dataArray = [ - data.treatments, - data.devicestatus, - data.sgvs, - data.cals, - data.mbgs + data.treatments + , data.devicestatus + , data.entries ]; - function match(o1, o2) { - return o1._id == o2._id; + + function mergeCacheArrays (oldData, newData, ageLimit) { + + var filtered = _.filter(newData, function hasId (object) { + const hasId = !_.isEmpty(object._id); + const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit); + return isFresh && hasId; + }); + + const merged = ctx.ddata.idMergePreferNew(oldData, filtered); + + return _.sortBy(merged, function(item) { + return item.mills; + }); + + } + + data.isEmpty = (datatype) => { + return data[datatype].length == 0; } - function dataChanged(operation) { - console.log('Cache data update event', data); + data.getData = (datatype) => { + return _.cloneDeep(data[datatype]); + } + + data.insertData = (datatype, newData, retentionPeriod) => { + console.log('inserting to cache', data[datatype].length); + data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriod); + console.log('post inserting to cache', data[datatype].length); + } + + function dataChanged (operation) { + console.log('Cache data operation requested', operation); if (operation.op == 'remove') { console.log('Cache data delete event'); // if multiple items were deleted, flush entire cache if (!operation.changes) { console.log('Multiple items delete from cache, flushing all') - data[operation.op.type] = []; + data.treatments = []; + data.devicestatus = []; + data.entries = []; } else { removeFromArray(data[operation.type], operation.changes); } } - if (operation.op == 'update') { + if (operation.op == 'update') { console.log('Cache data update event'); - } + data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes); + } } ctx.bus.on('data-update', dataChanged); - function removeFromArray(array, id) { + function removeFromArray (array, id) { for (let i = 0; i < array.length; i++) { const o = array[i]; if (o._id == id) { console.log('Deleting object from cache', id); - array.splice(i,1); + array.splice(i, 1); break; } } } - cache.updateObject = (o) => { - - } - return data; - + } module.exports = cache; diff --git a/lib/server/devicestatus.js b/lib/server/devicestatus.js index d35c6be87cb..f3515367428 100644 --- a/lib/server/devicestatus.js +++ b/lib/server/devicestatus.js @@ -18,6 +18,13 @@ function storage (collection, ctx) { fn(err.message, null); return; } + + ctx.bus.emit('data-update', { + type: 'devicestatus', + op: 'update', + changes: ctx.ddata.processRawDataForRuntime([doc]) + }); + fn(null, doc.ops); ctx.bus.emit('data-received'); }); @@ -68,7 +75,21 @@ function storage (collection, ctx) { } function remove (opts, fn) { - return api( ).remove(query_for(opts), fn); + + function removed(err, stat) { + + ctx.bus.emit('data-update', { + type: 'devicestatus', + op: 'remove', + count: stat.result.n, + changes: opts.find._id + }); + + fn(err, stat); + } + + return api( ).remove( + query_for(opts), removed); } function api() { diff --git a/lib/server/entries.js b/lib/server/entries.js index 02dd115bb95..f6b61024e7d 100644 --- a/lib/server/entries.js +++ b/lib/server/entries.js @@ -47,6 +47,14 @@ function storage(env, ctx) { function remove (opts, fn) { api( ).remove(query_for(opts), function (err, stat) { + + ctx.bus.emit('data-update', { + type: 'entries', + op: 'remove', + count: stat.result.n, + changes: opts.find._id + }); + //TODO: this is triggering a read from Mongo, we can do better ctx.bus.emit('data-received'); fn(err, stat); @@ -101,6 +109,13 @@ function storage(env, ctx) { var query = (doc.sysTime && doc.type) ? {sysTime: doc.sysTime, type: doc.type} : doc; api( ).update(query, doc, {upsert: true}, function (err) { firstErr = firstErr || err; + + ctx.bus.emit('data-update', { + type: 'entries', + op: 'update', + changes: ctx.ddata.processRawDataForRuntime([doc]) + }); + if (++totalCreated === numDocs) { //TODO: this is triggering a read from Mongo, we can do better ctx.bus.emit('data-received'); diff --git a/lib/server/treatments.js b/lib/server/treatments.js index c4ae785767d..013201e028c 100644 --- a/lib/server/treatments.js +++ b/lib/server/treatments.js @@ -78,7 +78,7 @@ function storage (env, ctx) { ctx.bus.emit('data-update', { type: 'treatments', op: 'update', - changes: treatments + changes: ctx.ddata.processRawDataForRuntime(treatments) }); fn(err, treatments); @@ -88,7 +88,7 @@ function storage (env, ctx) { ctx.bus.emit('data-update', { type: 'treatments', op: 'update', - changes: [obj] + changes: ctx.ddata.processRawDataForRuntime([obj]) }); fn(err, [obj]); @@ -141,10 +141,12 @@ function storage (env, ctx) { if (!err) { // console.log('Treatment updated', created); + ctx.ddata.processRawDataForRuntime(obj); + ctx.bus.emit('data-update', { type: 'treatments', op: 'update', - changes: [obj] + changes: ctx.ddata.processRawDataForRuntime([obj]) }); } @@ -153,7 +155,7 @@ function storage (env, ctx) { fn(err, created); } - api().save(obj, fn); + api().save(obj, saved); ctx.bus.emit('data-received'); } From a75c5a300a56160f7ef45c5fbc9053db4f865a4c Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 20 Sep 2020 19:14:33 +0300 Subject: [PATCH 3/6] Fix data order for REST API --- lib/data/dataloader.js | 2 +- lib/server/cache.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index 2ded134bf66..0f1b3c7bd86 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -200,7 +200,7 @@ function loadEntries(ddata, ctx, callback) { const r = ctx.ddata.processRawDataForRuntime(results); ctx.cache.insertData('entries', r, ageFilter); - const currentData = ctx.cache.getData('entries'); + const currentData = ctx.cache.getData('entries').reverse(); const mbgs = []; const sgvs = []; diff --git a/lib/server/cache.js b/lib/server/cache.js index 208abaca1c9..b80c37ddaab 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -41,7 +41,7 @@ function cache (env, ctx) { const merged = ctx.ddata.idMergePreferNew(oldData, filtered); return _.sortBy(merged, function(item) { - return item.mills; + return -item.mills; }); } From 1059232f160d6e242f79a4139d7406e88632bb58 Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Mon, 21 Sep 2020 20:46:53 +0300 Subject: [PATCH 4/6] * Add cache update events to websocket API * Remove the validation for created_at in REST API ;( --- lib/api/treatments/index.js | 8 ++ lib/server/cache.js | 2 + lib/server/websocket.js | 201 ++++++++++++++++++++++------------- tests/api.treatments.test.js | 3 +- 4 files changed, 142 insertions(+), 72 deletions(-) diff --git a/lib/api/treatments/index.js b/lib/api/treatments/index.js index df3cc6d7503..6e669903801 100644 --- a/lib/api/treatments/index.js +++ b/lib/api/treatments/index.js @@ -114,6 +114,12 @@ function configure (app, wares, ctx, env) { for (let i = 0; i < treatments.length; i++) { const t = treatments[i]; + + if (!t.created_at) { + t.created_at = new Date().toISOString(); + } + + /* if (!t.created_at) { console.log('Trying to create treatment without created_at field', t); res.sendJSONStatus(res, constants.HTTP_VALIDATION_ERROR, 'Treatments must contain created_at'); @@ -125,6 +131,8 @@ function configure (app, wares, ctx, env) { res.sendJSONStatus(res, constants.HTTP_VALIDATION_ERROR, 'Treatments created_at must be an ISO-8601 date'); return; } + */ + } ctx.treatments.create(treatments, function(err, created) { diff --git a/lib/server/cache.js b/lib/server/cache.js index b80c37ddaab..84329864bbb 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -63,6 +63,8 @@ function cache (env, ctx) { function dataChanged (operation) { console.log('Cache data operation requested', operation); + if (!data[operation.type]) return; + if (operation.op == 'remove') { console.log('Cache data delete event'); // if multiple items were deleted, flush entire cache diff --git a/lib/server/websocket.js b/lib/server/websocket.js index 89b67c97a08..7a685ccde05 100644 --- a/lib/server/websocket.js +++ b/lib/server/websocket.js @@ -7,7 +7,7 @@ var ObjectID = require('mongodb').ObjectID; function init (env, ctx, server) { - function websocket ( ) { + function websocket () { return websocket; } @@ -25,12 +25,12 @@ function init (env, ctx, server) { // TODO: this would be better to have somehow integrated/improved var supportedCollections = { - 'treatments' : env.treatments_collection, - 'entries': env.entries_collection, - 'devicestatus': env.devicestatus_collection, - 'profile': env.profile_collection, - 'food': env.food_collection, - 'activity': env.activity_collection + 'treatments': env.treatments_collection + , 'entries': env.entries_collection + , 'devicestatus': env.devicestatus_collection + , 'profile': env.profile_collection + , 'food': env.food_collection + , 'activity': env.activity_collection }; // This is little ugly copy but I was unable to pass testa after making module from status and share with /api/v1/status @@ -38,7 +38,7 @@ function init (env, ctx, server) { var versionNum = 0; var verParse = /(\d+)\.(\d+)\.(\d+)*/.exec(env.version); if (verParse) { - versionNum = 10000 * parseInt(verParse[1]) + 100 * parseInt(verParse[2]) + 1 * parseInt(verParse[3]) ; + versionNum = 10000 * parseInt(verParse[1]) + 100 * parseInt(verParse[2]) + 1 * parseInt(verParse[3]); } var apiEnabled = env.api_secret ? true : false; @@ -63,14 +63,15 @@ function init (env, ctx, server) { return info; } - function start ( ) { + function start () { io = require('socket.io')({ - 'transports': ['xhr-polling'], 'log level': 0 + 'transports': ['xhr-polling'] + , 'log level': 0 }).listen(server, { //these only effect the socket.io.js file that is sent to the client, but better than nothing - 'browser client minification': true, - 'browser client etag': true, - 'browser client gzip': false + 'browser client minification': true + , 'browser client etag': true + , 'browser client gzip': false }); ctx.bus.on('teardown', function serverTeardown () { @@ -85,7 +86,7 @@ function init (env, ctx, server) { ctx.authorization.resolve({ api_secret: message.secret, token: message.token }, function resolved (err, result) { if (err) { - return callback( err, { + return callback(err, { read: false , write: false , write_treatment: false @@ -113,7 +114,7 @@ function init (env, ctx, server) { } } - function listeners ( ) { + function listeners () { io.sockets.on('connection', function onConnection (socket) { var socketAuthorization = null; var clientType = null; @@ -124,16 +125,15 @@ function init (env, ctx, server) { console.log(LOG_WS + 'Connection from client ID: ', socket.client.id, ' IP: ', remoteIP); io.emit('clients', ++watchers); - socket.on('ack', function onAck(level, group, silenceTime) { + socket.on('ack', function onAck (level, group, silenceTime) { ctx.notifications.ack(level, group, silenceTime, true); }); - socket.on('disconnect', function onDisconnect ( ) { + socket.on('disconnect', function onDisconnect () { io.emit('clients', --watchers); - console.log(LOG_WS + 'Disconnected client ID: ',socket.client.id); + console.log(LOG_WS + 'Disconnected client ID: ', socket.client.id); }); - function checkConditions (action, data) { var collection = supportedCollections[data.collection]; if (!collection) { @@ -168,10 +168,10 @@ function init (env, ctx, server) { socket.on('loadRetro', function loadRetro (opts, callback) { if (callback) { - callback( { result: 'success' } ); + callback({ result: 'success' }); } //TODO: use opts to only send delta for retro data - socket.emit('retroUpdate', {devicestatus: lastData.devicestatus}); + socket.emit('retroUpdate', { devicestatus: lastData.devicestatus }); console.info('sent retroUpdate', opts); }); @@ -185,30 +185,46 @@ function init (env, ctx, server) { // } // } socket.on('dbUpdate', function dbUpdate (data, callback) { - console.log(LOG_WS + 'dbUpdate client ID: ', socket.client.id, ' data: ', data); - var collection = supportedCollections[data.collection]; + console.log(LOG_WS + 'dbUpdate client ID: ', socket.client.id, ' data: ', data); + var collection = supportedCollections[data.collection]; var check = checkConditions('dbUpdate', data); if (check) { - if (callback) { - callback( check ); + if (callback) { + callback(check); } return; } - var id ; + var id; try { - id = new ObjectID(data._id); - } catch (err){ - console.error(err); + id = new ObjectID(data._id); + } catch (err) { + console.error(err); id = new ObjectID(); } - ctx.store.collection(collection).update( - { '_id': id }, - { $set: data.data } + + ctx.store.collection(collection).update({ '_id': id } + , { $set: data.data } + , function(err, results) { + + if (!err) { + ctx.store.collection(collection).findOne({ '_id': id } + , function(err, results) { + console.log('Got results', results); + if (!err) { + ctx.bus.emit('data-update', { + type: data.collection + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime([results]) + }); + } + }); + } + } ); if (callback) { - callback( { result: 'success' } ); + callback({ result: 'success' }); } ctx.bus.emit('data-received'); }); @@ -223,13 +239,13 @@ function init (env, ctx, server) { // } // } socket.on('dbUpdateUnset', function dbUpdateUnset (data, callback) { - console.log(LOG_WS + 'dbUpdateUnset client ID: ', socket.client.id, ' data: ', data); - var collection = supportedCollections[data.collection]; + console.log(LOG_WS + 'dbUpdateUnset client ID: ', socket.client.id, ' data: ', data); + var collection = supportedCollections[data.collection]; var check = checkConditions('dbUpdate', data); if (check) { - if (callback) { - callback( check ); + if (callback) { + callback(check); } return; } @@ -238,10 +254,25 @@ function init (env, ctx, server) { ctx.store.collection(collection).update( { '_id': objId }, { $unset: data.data } - ); + , function(err, results) { + + if (!err) { + ctx.store.collection(collection).findOne({ '_id': objId } + , function(err, results) { + console.log('Got results', results); + if (!err) { + ctx.bus.emit('data-update', { + type: data.collection + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime([results]) + }); + } + }); + } + }); if (callback) { - callback( { result: 'success' } ); + callback({ result: 'success' }); } ctx.bus.emit('data-received'); }); @@ -255,14 +286,14 @@ function init (env, ctx, server) { // } // } socket.on('dbAdd', function dbAdd (data, callback) { - console.log(LOG_WS + 'dbAdd client ID: ', socket.client.id, ' data: ', data); + console.log(LOG_WS + 'dbAdd client ID: ', socket.client.id, ' data: ', data); var collection = supportedCollections[data.collection]; var maxtimediff = times.mins(1).msecs; var check = checkConditions('dbAdd', data); if (check) { - if (callback) { - callback( check ); + if (callback) { + callback(check); } return; } @@ -278,7 +309,7 @@ function init (env, ctx, server) { if (data.collection === 'treatments') { var query; if (data.data.NSCLIENT_ID) { - query = { NSCLIENT_ID: data.data.NSCLIENT_ID }; + query = { NSCLIENT_ID: data.data.NSCLIENT_ID }; } else { query = { created_at: data.data.created_at @@ -286,19 +317,19 @@ function init (env, ctx, server) { }; } - // try to find exact match + // try to find exact match ctx.store.collection(collection).find(query).toArray(function findResult (err, array) { if (err || array.length > 0) { - console.log(LOG_DEDUP + 'Exact match'); - if (callback) { - callback([array[0]]); - } - return; + console.log(LOG_DEDUP + 'Exact match'); + if (callback) { + callback([array[0]]); + } + return; } - var selected = false; - var query_similiar = { - created_at: {$gte: new Date(new Date(data.data.created_at).getTime() - maxtimediff).toISOString(), $lte: new Date(new Date(data.data.created_at).getTime() + maxtimediff).toISOString()} + var selected = false; + var query_similiar = { + created_at: { $gte: new Date(new Date(data.data.created_at).getTime() - maxtimediff).toISOString(), $lte: new Date(new Date(data.data.created_at).getTime() + maxtimediff).toISOString() } }; if (data.data.insulin) { query_similiar.insulin = data.data.insulin; @@ -312,7 +343,7 @@ function init (env, ctx, server) { query_similiar.percent = data.data.percent; selected = true; } - if (data.data.absolute) { + if (data.data.absolute) { query_similiar.absolute = data.data.absolute; selected = true; } @@ -320,7 +351,7 @@ function init (env, ctx, server) { query_similiar.duration = data.data.duration; selected = true; } - if (data.data.NSCLIENT_ID) { + if (data.data.NSCLIENT_ID) { query_similiar.NSCLIENT_ID = data.data.NSCLIENT_ID; selected = true; } @@ -335,10 +366,7 @@ function init (env, ctx, server) { console.log(LOG_DEDUP + 'Found similiar', array[0]); array[0].created_at = data.data.created_at; var objId = new ObjectID(array[0]._id); - ctx.store.collection(collection).update( - { '_id': objId }, - { $set: {created_at: data.data.created_at} } - ); + ctx.store.collection(collection).update({ '_id': objId }, { $set: { created_at: data.data.created_at } }); if (callback) { callback([array[0]]); } @@ -352,6 +380,13 @@ function init (env, ctx, server) { console.log('treatments data insertion error: ', err.message); return; } + + ctx.bus.emit('data-update', { + type: data.collection + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime(doc.ops) + }); + if (callback) { callback(doc.ops); } @@ -359,7 +394,7 @@ function init (env, ctx, server) { }); }); }); - // devicestatus deduping + // devicestatus deduping } else if (data.collection === 'devicestatus') { var queryDev; if (data.data.NSCLIENT_ID) { @@ -385,8 +420,15 @@ function init (env, ctx, server) { console.log('devicestatus insertion error: ', err.message); return; } + + ctx.bus.emit('data-update', { + type: 'devicestatus' + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime(doc.ops) + }); + if (callback) { - callback(doc.ops); + callback(doc.ops); } ctx.bus.emit('data-received'); }); @@ -396,6 +438,13 @@ function init (env, ctx, server) { console.log(data.collection + ' insertion error: ', err.message); return; } + + ctx.bus.emit('data-update', { + type: data.collection + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime(doc.ops) + }); + if (callback) { callback(doc.ops); } @@ -409,24 +458,34 @@ function init (env, ctx, server) { // _id: 'some mongo record id' // } socket.on('dbRemove', function dbRemove (data, callback) { - console.log(LOG_WS + 'dbRemove client ID: ', socket.client.id, ' data: ', data); - var collection = supportedCollections[data.collection]; + console.log(LOG_WS + 'dbRemove client ID: ', socket.client.id, ' data: ', data); + var collection = supportedCollections[data.collection]; var check = checkConditions('dbUpdate', data); if (check) { - if (callback) { - callback( check ); + if (callback) { + callback(check); } return; } var objId = new ObjectID(data._id); - ctx.store.collection(collection).remove( - { '_id': objId } - ); + ctx.store.collection(collection).remove({ '_id': objId } + , function(err, stat) { + + if (!err) { + ctx.bus.emit('data-update', { + type: data.collection + , op: 'remove' + , count: stat.result.n + , changes: data._id + }); + + } + }); if (callback) { - callback( { result: 'success' } ); + callback({ result: 'success' }); } ctx.bus.emit('data-received'); }); @@ -479,7 +538,7 @@ function init (env, ctx, server) { }); } - websocket.update = function update ( ) { + websocket.update = function update () { // console.log(LOG_WS + 'running websocket.update'); if (lastData.sgvs) { var delta = calcData(lastData, ctx.ddata); @@ -511,8 +570,8 @@ function init (env, ctx, server) { } }; - start( ); - listeners( ); + start(); + listeners(); if (ctx.storageSocket) { ctx.storageSocket.init(io); diff --git a/tests/api.treatments.test.js b/tests/api.treatments.test.js index 0c78267971d..2b1d3ee877d 100644 --- a/tests/api.treatments.test.js +++ b/tests/api.treatments.test.js @@ -62,6 +62,7 @@ describe('Treatment API', function ( ) { }); }); + /* it('saving entry without created_at should fail', function (done) { self.ctx.treatments().remove({ }, function ( ) { @@ -79,7 +80,7 @@ describe('Treatment API', function ( ) { }); }); }); - +*/ it('post single treatments in zoned time format', function (done) { From 54008a71221d7c0bcbf9efe6b2367ce48ccb267c Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Tue, 22 Sep 2020 11:14:13 +0300 Subject: [PATCH 5/6] Remove excess logging --- lib/api/entries/index.js | 2 +- lib/api/treatments/index.js | 7 +------ lib/server/cache.js | 12 +++++------- lib/server/treatments.js | 10 ++++------ 4 files changed, 11 insertions(+), 20 deletions(-) diff --git a/lib/api/entries/index.js b/lib/api/entries/index.js index 3f37811279b..5075df6eded 100644 --- a/lib/api/entries/index.js +++ b/lib/api/entries/index.js @@ -317,7 +317,7 @@ function configure (app, wares, ctx, env) { if (!req.query.find) { req.query.find = { - type: { "$eq" : type } + type: type }; } else { req.query.find.type = type; diff --git a/lib/api/treatments/index.js b/lib/api/treatments/index.js index 6e669903801..0e1d8a0ede8 100644 --- a/lib/api/treatments/index.js +++ b/lib/api/treatments/index.js @@ -161,8 +161,6 @@ function configure (app, wares, ctx, env) { query.count = 10 } - console.log('Delete records with query: ', query); - // remove using the query ctx.treatments.remove(query, function(err, stat) { if (err) { @@ -170,8 +168,6 @@ function configure (app, wares, ctx, env) { return next(err); } - console.log('treatments records deleted', query); - // yield some information about success of operation res.json(stat); @@ -204,8 +200,7 @@ function configure (app, wares, ctx, env) { ctx.treatments.save(data, function(err, created) { if (err) { res.sendJSONStatus(res, constants.HTTP_INTERNAL_ERROR, 'Mongo Error', err); - console.log('Error saving treatment'); - console.log(err); + console.log('Error saving treatment', err); } else { res.json(created); console.log('Treatment saved', data); diff --git a/lib/server/cache.js b/lib/server/cache.js index 84329864bbb..fca93afafde 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -55,21 +55,19 @@ function cache (env, ctx) { } data.insertData = (datatype, newData, retentionPeriod) => { - console.log('inserting to cache', data[datatype].length); data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriod); - console.log('post inserting to cache', data[datatype].length); } function dataChanged (operation) { - console.log('Cache data operation requested', operation); + //console.log('Cache data operation requested', operation); if (!data[operation.type]) return; if (operation.op == 'remove') { - console.log('Cache data delete event'); + //console.log('Cache data delete event'); // if multiple items were deleted, flush entire cache if (!operation.changes) { - console.log('Multiple items delete from cache, flushing all') + //console.log('Multiple items delete from cache, flushing all') data.treatments = []; data.devicestatus = []; data.entries = []; @@ -79,7 +77,7 @@ function cache (env, ctx) { } if (operation.op == 'update') { - console.log('Cache data update event'); + //console.log('Cache data update event'); data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes); } } @@ -90,7 +88,7 @@ function cache (env, ctx) { for (let i = 0; i < array.length; i++) { const o = array[i]; if (o._id == id) { - console.log('Deleting object from cache', id); + //console.log('Deleting object from cache', id); array.splice(i, 1); break; } diff --git a/lib/server/treatments.js b/lib/server/treatments.js index 013201e028c..dad13b5b5d6 100644 --- a/lib/server/treatments.js +++ b/lib/server/treatments.js @@ -53,9 +53,9 @@ function storage (env, ctx) { if (!err) { if (updateResults.result.upserted) { obj._id = updateResults.result.upserted[0]._id - console.log('PERSISTENCE: treatment upserted', updateResults.result.upserted[0]); + //console.log('PERSISTENCE: treatment upserted', updateResults.result.upserted[0]); } - console.log('Update result', updateResults.result); + //console.log('Update result', updateResults.result); } // TODO document this feature @@ -119,7 +119,7 @@ function storage (env, ctx) { function remove (opts, fn) { return api( ).remove(query_for(opts), function (err, stat) { //TODO: this is triggering a read from Mongo, we can do better - console.log('Treatment removed', opts); // , stat); + //console.log('Treatment removed', opts); // , stat); ctx.bus.emit('data-update', { type: 'treatments', @@ -139,7 +139,7 @@ function storage (env, ctx) { function saved (err, created) { if (!err) { -// console.log('Treatment updated', created); + // console.log('Treatment updated', created); ctx.ddata.processRawDataForRuntime(obj); @@ -268,8 +268,6 @@ function prepareData(obj) { delete obj.units; } - console.log('Preparing treatment for insertion, obj', obj, 'results', results); - return results; } From 3f1081e3aa814c84eb5c632e9b477aa4c0a3705b Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Tue, 22 Sep 2020 12:44:31 +0300 Subject: [PATCH 6/6] Bump version to .4 --- npm-shrinkwrap.json | 2 +- package.json | 2 +- swagger.json | 2 +- swagger.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 106d5676bac..891da5af29c 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -1,6 +1,6 @@ { "name": "nightscout", - "version": "14.0.3", + "version": "14.0.4", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 2a764808302..6a24379ef16 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nightscout", - "version": "14.0.3", + "version": "14.0.4", "description": "Nightscout acts as a web-based CGM (Continuous Glucose Montinor) to allow multiple caregivers to remotely view a patients glucose data in realtime.", "license": "AGPL-3.0", "author": "Nightscout Team", diff --git a/swagger.json b/swagger.json index 4f4e1f2813a..ffed8fd699c 100755 --- a/swagger.json +++ b/swagger.json @@ -8,7 +8,7 @@ "info": { "title": "Nightscout API", "description": "Own your DData with the Nightscout API", - "version": "14.0.3", + "version": "14.0.4", "license": { "name": "AGPL 3", "url": "https://www.gnu.org/licenses/agpl.txt" diff --git a/swagger.yaml b/swagger.yaml index 0dff0a7c477..ce94fe876db 100755 --- a/swagger.yaml +++ b/swagger.yaml @@ -4,7 +4,7 @@ servers: info: title: Nightscout API description: Own your DData with the Nightscout API - version: 14.0.3 + version: 14.0.4 license: name: AGPL 3 url: 'https://www.gnu.org/licenses/agpl.txt'