From 4f195ddbf6615314a3bd51d115d30ab87a00bbcf Mon Sep 17 00:00:00 2001 From: Ben West Date: Sat, 2 Aug 2014 00:26:52 -0700 Subject: [PATCH 01/24] introduce mqtt Introduce basic mqtt. --- env.js | 1 + lib/mqtt.js | 38 ++++++++++++++++++++++++++++++++++++++ package.json | 3 ++- server.js | 5 +++++ 4 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 lib/mqtt.js diff --git a/env.js b/env.js index 1ec8ab1d334..8a1e3f2d82f 100644 --- a/env.js +++ b/env.js @@ -21,6 +21,7 @@ function config ( ) { env.version = software.version; env.name = software.name; + env.MQTT_MONITOR = process.env.MQTT_MONITOR || null; env.DISPLAY_UNITS = process.env.DISPLAY_UNITS || 'mg/dl'; env.PORT = process.env.PORT || 1337; env.mongo = process.env.MONGO_CONNECTION || process.env.CUSTOMCONNSTR_mongo; diff --git a/lib/mqtt.js b/lib/mqtt.js new file mode 100644 index 00000000000..5b129b705f6 --- /dev/null +++ b/lib/mqtt.js @@ -0,0 +1,38 @@ +'use strict'; + +var es = require('event-stream'); +var mqtt = require('mqtt'); + +function process (client) { + var stream = es.through( + function _write (data) { + this.push(data); + } + ); + return stream; +} + +function every (storage) { + function iter (item, next) { + storage.create(item, next); + } + return es.map(iter); +} + +function configure (env) { + var uri = env['MQTT_MONITOR']; + var client = mqtt.connect(uri); + client.subscribe('sgvs'); + client.subscribe('published'); + client.subscribe('entries/sgv', function ( ) { + console.log('granted', arguments); + }); + + client.on('message', function (topic, msg) { console.log('topic', topic); + console.log('msg', msg); + }); + client.entries = process(client); + client.every = every; + return client; +} +module.exports = configure; diff --git a/package.json b/package.json index c0f768dee75..9df7a621a90 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,8 @@ "express-extension-to-accept": "0.0.2", "mongodb": "^1.4.7", "sgvdata": "0.0.2", - "socket.io": "^0.9.17" + "socket.io": "^0.9.17", + "mqtt": "~0.3.11" }, "devDependencies": { "supertest": "~0.13.0", diff --git a/server.js b/server.js index 8d8e460fece..a2dbfadf95c 100644 --- a/server.js +++ b/server.js @@ -72,6 +72,11 @@ store(function ready ( ) { var server = app.listen(PORT); console.log('listening', PORT); + if (env.MQTT_MONITOR) { + var mqtt = require('./lib/mqtt')(env); + mqtt.entries.pipe(mqtt.every(entries)); + } + /////////////////////////////////////////////////// // setup socket io for data and message transmission /////////////////////////////////////////////////// From cc0cd36934faf5984d6b4eb3faa1dff575d95616 Mon Sep 17 00:00:00 2001 From: Ben West Date: Sat, 2 Aug 2014 00:32:51 -0700 Subject: [PATCH 02/24] lint records first --- server.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server.js b/server.js index a2dbfadf95c..93c47b3e0ec 100644 --- a/server.js +++ b/server.js @@ -74,7 +74,8 @@ store(function ready ( ) { if (env.MQTT_MONITOR) { var mqtt = require('./lib/mqtt')(env); - mqtt.entries.pipe(mqtt.every(entries)); + var es = require('event-stream'); + es.pipeline(mqtt.entries, entries.map( ), mqtt.every(entries)); } /////////////////////////////////////////////////// From b9e8a5faf11f21fc752dc201df4c048dbf8c1cfc Mon Sep 17 00:00:00 2001 From: Ben West Date: Wed, 1 Oct 2014 13:50:31 -0700 Subject: [PATCH 03/24] make mqtt experiments suitable for hacking Uses experimental support in sgvdata for protobuf. --- lib/mqtt.js | 16 ++++++++++++++++ package.json | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index 5b129b705f6..eddd4d6a8e5 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -1,6 +1,7 @@ 'use strict'; var es = require('event-stream'); +var decoders = require('sgvdata/lib/protobuf'); var mqtt = require('mqtt'); function process (client) { @@ -18,16 +19,31 @@ function every (storage) { } return es.map(iter); } +function downloader ( ) { + var opts = { + model: decoders.models.CookieMonsterG4Download + , json: function (o) { return o; } + , payload: function (o) { return o; } + }; + return decoders(opts); +} function configure (env) { var uri = env['MQTT_MONITOR']; var client = mqtt.connect(uri); + var downloads = downloader( ); client.subscribe('sgvs'); client.subscribe('published'); + client.subscribe('/downloads/protobuf'); client.subscribe('entries/sgv', function ( ) { console.log('granted', arguments); }); + client.on('/downloads/protobuf', function (topic, msg) { console.log('topic', topic); + + console.log('DOWNLOAD msg', msg, downloads.parse(msg)); + }); + client.on('message', function (topic, msg) { console.log('topic', topic); console.log('msg', msg); }); diff --git a/package.json b/package.json index b835ffaf7ac..e60fa52612b 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,7 @@ "mongodb": "^1.4.7", "moment": "2.8.1", "pushover-notifications": "0.2.0", - "sgvdata": "0.0.2", + "sgvdata": "git://github.com/bewest/sgvdata.git#wip/protobuf", "socket.io": "^0.9.17", "mqtt": "~0.3.11", "git-rev": "git://github.com/bewest/git-rev.git" From c8be1deb9d49c7efa3c20a1418b1a58df42921b6 Mon Sep 17 00:00:00 2001 From: Ben West Date: Sat, 22 Nov 2014 16:40:48 -0800 Subject: [PATCH 04/24] get mqtt basically working, very rough --- lib/entries.js | 5 ++++ lib/mqtt.js | 79 +++++++++++++++++++++++++++++++++++++++++++------- server.js | 2 +- 3 files changed, 75 insertions(+), 11 deletions(-) diff --git a/lib/entries.js b/lib/entries.js index 1746bcefa5e..19f023a9278 100644 --- a/lib/entries.js +++ b/lib/entries.js @@ -140,6 +140,11 @@ function entries (name, storage) { }); } + function writeStream (opts) { + function map (item, next) { + } + } + // closure to represent the API function api ( ) { // obtain handle usable for querying the collection associated diff --git a/lib/mqtt.js b/lib/mqtt.js index eddd4d6a8e5..e08f5069ca4 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -1,7 +1,9 @@ 'use strict'; var es = require('event-stream'); +var Long = require('long'); var decoders = require('sgvdata/lib/protobuf'); +var direction = require('sgvdata/lib/utils').direction; var mqtt = require('mqtt'); function process (client) { @@ -28,24 +30,81 @@ function downloader ( ) { return decoders(opts); } -function configure (env) { +function toSGV (proto) { + var ts = long_time(proto.timestamp); + var obj = { + device: 'dexcom' + , date: ts.getTime( ) + , dateString: ts.toISOString( ) + , sgv: proto.sgv + , direction: direction(proto.direction) + , type: 'sgv' + }; + return obj; +} + +function createProtoStream (packet) { + var stream = es.readArray(packet.sgv); + function map (item, next) { + var r = toSGV(item); + console.log("ITEM", item, "TO SGV", r); + next(null, r); + } + return stream.pipe(es.map(map)); +} +function long_time (p) { + var ts = parseInt(new Long(p.low, p.high, p.unsigned).toString( )); + return new Date(ts); +} + +function configure (env, core) { var uri = env['MQTT_MONITOR']; - var client = mqtt.connect(uri); + var opts = {encoding: 'binary'}; + var client = mqtt.connect(uri, opts); var downloads = downloader( ); client.subscribe('sgvs'); client.subscribe('published'); - client.subscribe('/downloads/protobuf'); - client.subscribe('entries/sgv', function ( ) { + client.subscribe('/downloads/protobuf', granted); + client.subscribe('/uploader', granted); + client.subscribe('/entries/sgv', granted); + function granted ( ) { console.log('granted', arguments); - }); - - client.on('/downloads/protobuf', function (topic, msg) { console.log('topic', topic); + } - console.log('DOWNLOAD msg', msg, downloads.parse(msg)); - }); client.on('message', function (topic, msg) { console.log('topic', topic); - console.log('msg', msg); + console.log(topic, 'on message', 'msg', msg.length); + switch (topic) { + case '/uploader': + console.log({type: topic, msg: msg.toString( )}); + break; + case '/downloads/protobuf': + var b = new Buffer(msg, 'binary'); + console.log("BINARY", b.length, b.toString('hex')); + try { + var packet = downloads.parse(b); + if (!packet.type) { + packet.type = topic; + } + } catch (e) { + console.log("DID NOT PARSE", e); + break; + } + console.log('DOWNLOAD msg', msg.length, packet); + console.log('download SGV', packet.sgv[0]); + console.log('download_timestamp', packet.download_timestamp, long_time(packet.download_timestamp)); + console.log("WRITE TO MONGO"); + createProtoStream(packet).pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING TO MONGO", err); + })); + + // core.write(packet); + break; + default: + console.log(topic, 'on message', 'msg', msg); + // core.write(msg); + break; + } }); client.entries = process(client); client.every = every; diff --git a/server.js b/server.js index 2bc3e4a3617..374cde658fa 100644 --- a/server.js +++ b/server.js @@ -83,7 +83,7 @@ store(function ready ( ) { console.log('listening', PORT); if (env.MQTT_MONITOR) { - var mqtt = require('./lib/mqtt')(env); + var mqtt = require('./lib/mqtt')(env, entries); var es = require('event-stream'); es.pipeline(mqtt.entries, entries.map( ), mqtt.every(entries)); } From 756e9e81d440d1bb241f6bdd7eebad10267a8332 Mon Sep 17 00:00:00 2001 From: Ben West Date: Sat, 22 Nov 2014 17:55:36 -0800 Subject: [PATCH 05/24] forgot long --- package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index e60fa52612b..0ecff09ee10 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,8 @@ "sgvdata": "git://github.com/bewest/sgvdata.git#wip/protobuf", "socket.io": "^0.9.17", "mqtt": "~0.3.11", - "git-rev": "git://github.com/bewest/git-rev.git" + "git-rev": "git://github.com/bewest/git-rev.git", + "long": "~2.2.3" }, "devDependencies": { "supertest": "~0.13.0", From 9ad7bf1b2882095fcaeff80b91239fdf4b813d84 Mon Sep 17 00:00:00 2001 From: Ben West Date: Tue, 25 Nov 2014 12:26:09 -0800 Subject: [PATCH 06/24] test coverage --- Makefile | 3 +++ package.json | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/Makefile b/Makefile index bb605d08d82..521d5672779 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ coveralls: ${TESTS} | ./coverall.sh coverhtml: + MONGO_CONNECTION=${MONGO_CONNECTION} \ + CUSTOMCONNSTR_mongo_collection=${CUSTOMCONNSTR_mongo_collection} \ + CUSTOMCONNSTR_mongo_settings_collection=${CUSTOMCONNSTR_mongo_settings_collection} \ ./node_modules/.bin/mocha ${BLANKET} -R html-cov ${TESTS} > tests/coverage.html test: diff --git a/package.json b/package.json index 0ecff09ee10..2832f403c80 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,16 @@ "test": "make test", "postinstall": "node node_modules/bower/bin/bower install" }, + "config": { + "blanket": { + "pattern": [ + "tests", "lib", "server", "app", "static/js" + ], + "data-cover-never": [ + "node_modules" + ] + } + }, "engines": { "node": ">=0.10 <0.12" }, From 33e1a36726cd60ed5c3b8aee77c9534a2458732f Mon Sep 17 00:00:00 2001 From: Kevin Lee Date: Fri, 26 Dec 2014 03:57:10 -0600 Subject: [PATCH 07/24] Quick hack to get things working --- lib/mqtt.js | 20 +++++++++++++------- package.json | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index e08f5069ca4..fe56ed379f8 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -5,6 +5,7 @@ var Long = require('long'); var decoders = require('sgvdata/lib/protobuf'); var direction = require('sgvdata/lib/utils').direction; var mqtt = require('mqtt'); +var moment = require('moment'); function process (client) { var stream = es.through( @@ -31,13 +32,15 @@ function downloader ( ) { } function toSGV (proto) { - var ts = long_time(proto.timestamp); + var ts = moment(proto.download_timestamp); + console.log("TIMESTAMP", moment(proto.download_timestamp)); var obj = { device: 'dexcom' - , date: ts.getTime( ) - , dateString: ts.toISOString( ) - , sgv: proto.sgv - , direction: direction(proto.direction) + , date: ts.unix() * 1000 + , dateString: ts.format() + , sgv: proto.sgv_mgdl + , direction: direction(proto.trend) + ,noise: proto.noise , type: 'sgv' }; return obj; @@ -59,12 +62,15 @@ function long_time (p) { function configure (env, core) { var uri = env['MQTT_MONITOR']; - var opts = {encoding: 'binary'}; + var opts = { + encoding: 'binary', + clean: false + }; var client = mqtt.connect(uri, opts); var downloads = downloader( ); client.subscribe('sgvs'); client.subscribe('published'); - client.subscribe('/downloads/protobuf', granted); + client.subscribe('/downloads/protobuf',{qos: 2}, granted); client.subscribe('/uploader', granted); client.subscribe('/entries/sgv', granted); function granted ( ) { diff --git a/package.json b/package.json index 2832f403c80..c040d8b4f74 100644 --- a/package.json +++ b/package.json @@ -51,7 +51,7 @@ "mongodb": "^1.4.7", "moment": "2.8.1", "pushover-notifications": "0.2.0", - "sgvdata": "git://github.com/bewest/sgvdata.git#wip/protobuf", + "sgvdata": "git://github.com/ktind/sgvdata.git#wip/protobuf", "socket.io": "^0.9.17", "mqtt": "~0.3.11", "git-rev": "git://github.com/bewest/git-rev.git", From 1f1bc4d6f95907025cc81944c9d1dcc8e35bdab3 Mon Sep 17 00:00:00 2001 From: Ben West Date: Sat, 27 Dec 2014 11:14:41 -0800 Subject: [PATCH 08/24] blarg, attempt to debug mqtt --- lib/mqtt.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index fe56ed379f8..efcba12479c 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -31,16 +31,21 @@ function downloader ( ) { return decoders(opts); } +function ReceiverTime (ts) { + var base = Date.parse('2009-01-01T00:00:00-0800'); + return new Date(base + (ts * 1000)); +} + function toSGV (proto) { var ts = moment(proto.download_timestamp); - console.log("TIMESTAMP", moment(proto.download_timestamp)); + console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.timestamp_sec)); var obj = { device: 'dexcom' , date: ts.unix() * 1000 , dateString: ts.format() , sgv: proto.sgv_mgdl , direction: direction(proto.trend) - ,noise: proto.noise + , noise: proto.noise , type: 'sgv' }; return obj; @@ -64,7 +69,8 @@ function configure (env, core) { var uri = env['MQTT_MONITOR']; var opts = { encoding: 'binary', - clean: false + clean: false, + clientId: 'master' }; var client = mqtt.connect(uri, opts); var downloads = downloader( ); @@ -98,7 +104,7 @@ function configure (env, core) { } console.log('DOWNLOAD msg', msg.length, packet); console.log('download SGV', packet.sgv[0]); - console.log('download_timestamp', packet.download_timestamp, long_time(packet.download_timestamp)); + console.log('download_timestamp', packet.download_timestamp, Date.parse(packet.download_timestamp)); console.log("WRITE TO MONGO"); createProtoStream(packet).pipe(core.persist(function empty(err, result) { console.log("DONE WRITING TO MONGO", err); From f70a031357f3d10b27ee91ae855018710ea0d4be Mon Sep 17 00:00:00 2001 From: Ben West Date: Tue, 30 Dec 2014 11:56:32 -0800 Subject: [PATCH 09/24] store each record according to own timestamp --- lib/mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index efcba12479c..25e1159ca51 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -37,7 +37,7 @@ function ReceiverTime (ts) { } function toSGV (proto) { - var ts = moment(proto.download_timestamp); + var ts = moment(ReceiverTime(proto.timestamp_sec)); console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.timestamp_sec)); var obj = { device: 'dexcom' From bf2f6231da6635050395f80154256e53f5327557 Mon Sep 17 00:00:00 2001 From: Ben West Date: Tue, 30 Dec 2014 19:06:40 -0800 Subject: [PATCH 10/24] tweak times and protobuf model with @ktind --- lib/mqtt.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index 25e1159ca51..38d26525fd9 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -24,7 +24,7 @@ function every (storage) { } function downloader ( ) { var opts = { - model: decoders.models.CookieMonsterG4Download + model: decoders.models.CookieMonsterDownload , json: function (o) { return o; } , payload: function (o) { return o; } }; @@ -37,8 +37,8 @@ function ReceiverTime (ts) { } function toSGV (proto) { - var ts = moment(ReceiverTime(proto.timestamp_sec)); - console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.timestamp_sec)); + var ts = moment(ReceiverTime(proto.disp_timestamp_sec)); + console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.disp_timestamp_sec)); var obj = { device: 'dexcom' , date: ts.unix() * 1000 From 70877f7f67295e6fabfddc9af93ac9986e34d335 Mon Sep 17 00:00:00 2001 From: Kevin Lee Date: Tue, 30 Dec 2014 21:08:34 -0600 Subject: [PATCH 11/24] Adding experimental time calculation algorithm --- lib/mqtt.js | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index 25e1159ca51..1d088ceffc7 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -24,7 +24,7 @@ function every (storage) { } function downloader ( ) { var opts = { - model: decoders.models.CookieMonsterG4Download + model: decoders.models.CookieMonsterDownload , json: function (o) { return o; } , payload: function (o) { return o; } }; @@ -36,13 +36,21 @@ function ReceiverTime (ts) { return new Date(base + (ts * 1000)); } -function toSGV (proto) { - var ts = moment(ReceiverTime(proto.timestamp_sec)); - console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.timestamp_sec)); +function toSGV (proto, receiver_time, download_time) { + var ts = moment(download_time); + console.log("Receiver time: ", receiver_time); + console.log("Record time: ", proto.sys_timestamp_sec); + console.log("Download time: ", ts.unix()); + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = ts.subtract(record_offset, 'second'); + + console.log("errr", " Offset: ",record_offset, " Record time: ", record_time.format()); + + //console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.disp_timestamp_sec)); var obj = { device: 'dexcom' - , date: ts.unix() * 1000 - , dateString: ts.format() + , date: record_time.unix() * 1000 + , dateString: record_time.format() , sgv: proto.sgv_mgdl , direction: direction(proto.trend) , noise: proto.noise @@ -53,8 +61,10 @@ function toSGV (proto) { function createProtoStream (packet) { var stream = es.readArray(packet.sgv); + var receiver_time = packet.receiver_system_time_sec; + var download_time = packet.download_timestamp; function map (item, next) { - var r = toSGV(item); + var r = toSGV(item, receiver_time, download_time); console.log("ITEM", item, "TO SGV", r); next(null, r); } From 650bcd1aae32fd3611979614c0a8dbd90d94b270 Mon Sep 17 00:00:00 2001 From: Kevin Lee Date: Wed, 31 Dec 2014 03:02:46 -0600 Subject: [PATCH 12/24] First pass at adding MGB, Sensor, Calibration, and Device Status records via MQTT New method to create device status with a backdated timestamp --- lib/devicestatus.js | 69 +++++---- lib/mqtt.js | 336 +++++++++++++++++++++++++++++--------------- server.js | 2 +- 3 files changed, 265 insertions(+), 142 deletions(-) diff --git a/lib/devicestatus.js b/lib/devicestatus.js index fb21d1c2490..9b9cf881364 100644 --- a/lib/devicestatus.js +++ b/lib/devicestatus.js @@ -1,34 +1,43 @@ 'use strict'; -function configure (collection, storage) { - - function create (obj, fn) { - obj.created_at = (new Date( )).toISOString( ); - api( ).insert(obj, function (err, doc) { - fn(null, doc); - }); - } - - function last(fn) { - return api( ).find({ }).sort({created_at: -1}).limit(1).toArray(function(err, entries) { - if (entries && entries.length > 0) - fn(err, entries[0]); - else - fn(err, null); - }); - } - - function list (fn) { - return api( ).find({ }).sort({created_at: -1}).toArray(fn); - } - - function api ( ) { - return storage.pool.db.collection(collection); - } - - api.list = list; - api.create = create; - api.last = last; - return api; +function configure(collection, storage) { + + function create(obj, fn) { + if (! obj.hasOwnProperty("created_at")){ + obj.created_at = (new Date()).toISOString(); + } + api().insert(obj, function (err, doc) { + fn(null, doc); + }); + } + + function create_date_included(obj, fn) { + api().insert(obj, function (err, doc) { + fn(null, doc); + }); + + } + + function last(fn) { + return api().find({}).sort({created_at: -1}).limit(1).toArray(function (err, entries) { + if (entries && entries.length > 0) + fn(err, entries[0]); + else + fn(err, null); + }); + } + + function list(fn) { + return api().find({}).sort({created_at: -1}).toArray(fn); + } + + function api() { + return storage.pool.db.collection(collection); + } + + api.list = list; + api.create = create; + api.last = last; + return api; } module.exports = configure; diff --git a/lib/mqtt.js b/lib/mqtt.js index 1d088ceffc7..6da3253415b 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -1,135 +1,249 @@ 'use strict'; var es = require('event-stream'); -var Long = require('long'); var decoders = require('sgvdata/lib/protobuf'); var direction = require('sgvdata/lib/utils').direction; var mqtt = require('mqtt'); var moment = require('moment'); -function process (client) { - var stream = es.through( - function _write (data) { - this.push(data); +function process(client) { + var stream = es.through( + function _write(data) { + this.push(data); + } + ); + return stream; +} + +function every(storage) { + function iter(item, next) { + storage.create(item, next); } - ); - return stream; + + return es.map(iter); } -function every (storage) { - function iter (item, next) { - storage.create(item, next); - } - return es.map(iter); +function downloader() { + var opts = { + model: decoders.models.CookieMonsterDownload + , json: function (o) { + return o; + } + , payload: function (o) { + return o; + } + }; + return decoders(opts); } -function downloader ( ) { - var opts = { - model: decoders.models.CookieMonsterDownload - , json: function (o) { return o; } - , payload: function (o) { return o; } - }; - return decoders(opts); + +function ReceiverTime(ts) { + var base = Date.parse('2009-01-01T00:00:00-0800'); + return new Date(base + (ts * 1000)); } -function ReceiverTime (ts) { - var base = Date.parse('2009-01-01T00:00:00-0800'); - return new Date(base + (ts * 1000)); +function toSGV(proto, receiver_time, download_time) { + console.log("Receiver time: ", receiver_time); + console.log("Record time: ", proto.sys_timestamp_sec); + console.log("Download time: ", download_time.unix()); + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = download_time.subtract(record_offset, 'second'); + + console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); + + var obj = { + device: 'dexcom' + , date: record_time.unix() * 1000 + , dateString: record_time.format() + , sgv: proto.sgv_mgdl + , direction: direction(proto.trend) + , noise: proto.noise + , type: 'sgv' + }; + return obj; } -function toSGV (proto, receiver_time, download_time) { - var ts = moment(download_time); - console.log("Receiver time: ", receiver_time); - console.log("Record time: ", proto.sys_timestamp_sec); - console.log("Download time: ", ts.unix()); - var record_offset = receiver_time - proto.sys_timestamp_sec; - var record_time = ts.subtract(record_offset, 'second'); - - console.log("errr", " Offset: ",record_offset, " Record time: ", record_time.format()); - - //console.log("errr", proto, "TIMESTAMP", ReceiverTime(proto.disp_timestamp_sec)); - var obj = { - device: 'dexcom' - , date: record_time.unix() * 1000 - , dateString: record_time.format() - , sgv: proto.sgv_mgdl - , direction: direction(proto.trend) - , noise: proto.noise - , type: 'sgv' - }; - return obj; +function createProtoStream(packet, download_time) { + var stream = es.readArray(packet.sgv); + var receiver_time = packet.receiver_system_time_sec; + + function map(item, next) { + var r = toSGV(item, receiver_time, download_time); + console.log("ITEM", item, "TO SGV", r); + next(null, r); + } + + return stream.pipe(es.map(map)); } -function createProtoStream (packet) { - var stream = es.readArray(packet.sgv); - var receiver_time = packet.receiver_system_time_sec; - var download_time = packet.download_timestamp; - function map (item, next) { - var r = toSGV(item, receiver_time, download_time); - console.log("ITEM", item, "TO SGV", r); - next(null, r); - } - return stream.pipe(es.map(map)); +function toCal(proto, receiver_time, download_time) { + console.log("Receiver time: ", receiver_time); + console.log("Record time: ", proto.sys_timestamp_sec); + console.log("Download time: ", download_time.unix()); + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = download_time.subtract(record_offset, 'second'); + + console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); + + var obj = { + device: 'dexcom' + , date: record_time.unix() * 1000 + , dateString: record_time.format() + , slope: proto.slope + , intercept: proto.intercept + , scale: proto.scale + , type: 'cal' + }; + return obj; } -function long_time (p) { - var ts = parseInt(new Long(p.low, p.high, p.unsigned).toString( )); - return new Date(ts); + +function createCalProtoStream(packet, download_time) { + var stream = es.readArray(packet.cal); + var receiver_time = packet.receiver_system_time_sec; + + function map(item, next) { + var r = toCal(item, receiver_time, download_time); + console.log("ITEM", item, "TO CAL", r); + next(null, r); + } + + return stream.pipe(es.map(map)); } -function configure (env, core) { - var uri = env['MQTT_MONITOR']; - var opts = { - encoding: 'binary', - clean: false, - clientId: 'master' - }; - var client = mqtt.connect(uri, opts); - var downloads = downloader( ); - client.subscribe('sgvs'); - client.subscribe('published'); - client.subscribe('/downloads/protobuf',{qos: 2}, granted); - client.subscribe('/uploader', granted); - client.subscribe('/entries/sgv', granted); - function granted ( ) { - console.log('granted', arguments); - } - - - client.on('message', function (topic, msg) { console.log('topic', topic); - console.log(topic, 'on message', 'msg', msg.length); - switch (topic) { - case '/uploader': - console.log({type: topic, msg: msg.toString( )}); - break; - case '/downloads/protobuf': - var b = new Buffer(msg, 'binary'); - console.log("BINARY", b.length, b.toString('hex')); - try { - var packet = downloads.parse(b); - if (!packet.type) { - packet.type = topic; - } - } catch (e) { - console.log("DID NOT PARSE", e); - break; - } - console.log('DOWNLOAD msg', msg.length, packet); - console.log('download SGV', packet.sgv[0]); - console.log('download_timestamp', packet.download_timestamp, Date.parse(packet.download_timestamp)); - console.log("WRITE TO MONGO"); - createProtoStream(packet).pipe(core.persist(function empty(err, result) { - console.log("DONE WRITING TO MONGO", err); - })); - - // core.write(packet); - break; - default: - console.log(topic, 'on message', 'msg', msg); - // core.write(msg); - break; +function toSensor(proto, receiver_time, download_time) { + console.log("Receiver time: ", receiver_time); + console.log("Record time: ", proto.sys_timestamp_sec); + console.log("Download time: ", download_time.unix()); + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = download_time.subtract(record_offset, 'second'); + + console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); + + var obj = { + device: 'dexcom' + , date: record_time.unix() * 1000 + , dateString: record_time.format() + , filtered: proto.filtered + , unfiltered: proto.unfiltered + , rssi: proto.rssi + , type: 'sensor' + }; + return obj; +} + +function createSensorProtoStream(packet, download_time) { + var stream = es.readArray(packet.sensor); + var receiver_time = packet.receiver_system_time_sec; + + function map(item, next) { + var r = toSensor(item, receiver_time, download_time); + console.log("ITEM", item, "TO Sensor", r); + next(null, r); + } + + return stream.pipe(es.map(map)); +} + +function toMeter(proto, receiver_time, download_time) { + console.log("Receiver time: ", receiver_time); + console.log("Record time: ", proto.sys_timestamp_sec); + console.log("Download time: ", download_time.unix()); + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = download_time.subtract(record_offset, 'second'); + + console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); + + var obj = { + device: 'dexcom' + , date: record_time.unix() * 1000 + , dateString: record_time.format() + , mbg: proto.mbg + , type: 'mbg' + }; + return obj; +} + +function createMeterProtoStream(packet, download_time) { + var stream = es.readArray(packet.meter); + var receiver_time = packet.receiver_system_time_sec; + + function map(item, next) { + var r = toMeter(item, receiver_time, download_time); + console.log("ITEM", item, "TO Meter", r); + next(null, r); + } + + return stream.pipe(es.map(map)); +} + +function configure(env, core, devicestatus) { + var uri = env['MQTT_MONITOR']; + var opts = { + encoding: 'binary', + clean: false, + clientId: 'master' + }; + var client = mqtt.connect(uri, opts); + var downloads = downloader(); + client.subscribe('sgvs'); + client.subscribe('published'); + client.subscribe('/downloads/protobuf', {qos: 2}, granted); + client.subscribe('/uploader', granted); + client.subscribe('/entries/sgv', granted); + function granted() { + console.log('granted', arguments); } - }); - client.entries = process(client); - client.every = every; - return client; + + + client.on('message', function (topic, msg) { + console.log('topic', topic); + console.log(topic, 'on message', 'msg', msg.length); + switch (topic) { + case '/uploader': + console.log({type: topic, msg: msg.toString()}); + break; + case '/downloads/protobuf': + var b = new Buffer(msg, 'binary'); + console.log("BINARY", b.length, b.toString('hex')); + try { + var packet = downloads.parse(b); + if (!packet.type) { + packet.type = topic; + } + } catch (e) { + console.log("DID NOT PARSE", e); + break; + } + console.log('DOWNLOAD msg', msg.length, packet); + console.log('download SGV', packet.sgv[0]); + console.log('download_timestamp', packet.download_timestamp, Date.parse(packet.download_timestamp)); + console.log("WRITE TO MONGO"); + var download_timestamp = moment(packet.download_timestamp); + createProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING SGV TO MONGO", result); + })); + createCalProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Cal TO MONGO", result); + })); + createMeterProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Meter TO MONGO", result); + })); + createSensorProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Sensor TO MONGO", err); + })); + devicestatus.create( { uploaderBattery: packet.uploader_battery, created_at: download_timestamp.toISOString() } , function empty(err, result) { + console.log("DONE WRITING TO MONGO devicestatus ", result, err); + }); + + // core.write(packet); + break; + default: + console.log(topic, 'on message', 'msg', msg); + // core.write(msg); + break; + } + }); + client.entries = process(client); + client.every = every; + return client; } module.exports = configure; diff --git a/server.js b/server.js index 374cde658fa..9b605dfaab1 100644 --- a/server.js +++ b/server.js @@ -83,7 +83,7 @@ store(function ready ( ) { console.log('listening', PORT); if (env.MQTT_MONITOR) { - var mqtt = require('./lib/mqtt')(env, entries); + var mqtt = require('./lib/mqtt')(env, entries, devicestatus); var es = require('event-stream'); es.pipeline(mqtt.entries, entries.map( ), mqtt.every(entries)); } From b37069eedbdf2a88e02ce9a1e9de05499e24416a Mon Sep 17 00:00:00 2001 From: Ben West Date: Wed, 31 Dec 2014 13:13:06 -0500 Subject: [PATCH 13/24] allow multiple instances of mqtt to multiplex This is intended to allow multiple listeners on mqtt to each receive data from MQTT, so long as they are writing to different databases. --- env.js | 3 +++ lib/mqtt.js | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/env.js b/env.js index 6d7c1830a1c..c94d0d64e02 100644 --- a/env.js +++ b/env.js @@ -34,6 +34,9 @@ function config ( ) { env.PORT = readENV('PORT', 1337); env.mongo = readENV('MONGO_CONNECTION') || readENV('MONGO') || readENV('MONGOLAB_URI'); env.mongo_collection = readENV('MONGO_COLLECTION', 'entries'); + if (env.MQTT_MONITOR) { + env.mqtt_client_id = [env.mongo.split('/').pop( ), env.mongo_collection].join('.'); + } env.settings_collection = readENV('MONGO_SETTINGS_COLLECTION', 'settings'); env.treatments_collection = readENV('MONGO_TREATMENTS_COLLECTION', 'treatments'); env.devicestatus_collection = readENV('MONGO_DEVICESTATUS_COLLECTION', 'devicestatus'); diff --git a/lib/mqtt.js b/lib/mqtt.js index 1d088ceffc7..bda036d73f2 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -80,7 +80,7 @@ function configure (env, core) { var opts = { encoding: 'binary', clean: false, - clientId: 'master' + clientId: env.mqtt_client_id }; var client = mqtt.connect(uri, opts); var downloads = downloader( ); From 76b9a155819588607341690018e67e7fbed5b115 Mon Sep 17 00:00:00 2001 From: Kevin Lee Date: Wed, 31 Dec 2014 18:52:36 -0600 Subject: [PATCH 14/24] Store the download object in the entries for debugging purposes Converting Sensor filtered and unfiltered data to a proper number --- lib/mqtt.js | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index c411d659979..544711386b6 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -123,8 +123,8 @@ function toSensor(proto, receiver_time, download_time) { device: 'dexcom' , date: record_time.unix() * 1000 , dateString: record_time.format() - , filtered: proto.filtered - , unfiltered: proto.unfiltered + , filtered: new Long(proto.filtered).toInt() + , unfiltered: new Long(proto.unfiltered).toInt() , rssi: proto.rssi , type: 'sensor' }; @@ -231,10 +231,19 @@ function configure(env, core, devicestatus) { createSensorProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { console.log("DONE WRITING Sensor TO MONGO", err); })); - devicestatus.create( { uploaderBattery: packet.uploader_battery, created_at: download_timestamp.toISOString() } , function empty(err, result) { + packet.type = "download"; + devicestatus.create({ + uploaderBattery: packet.uploader_battery, + created_at: download_timestamp.toISOString() + }, function empty(err, result) { console.log("DONE WRITING TO MONGO devicestatus ", result, err); }); + core.create([ packet ], function empty(err, res) { + console.log("Download written to mongo: ", packet) + }); + + // core.write(packet); break; default: From 11a7d9d687665df3e0e0749139917dee497118f0 Mon Sep 17 00:00:00 2001 From: Kevin Lee Date: Wed, 31 Dec 2014 23:04:29 -0600 Subject: [PATCH 15/24] Updating model and removing dead code --- lib/mqtt.js | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index 544711386b6..81378b94847 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -26,7 +26,7 @@ function every(storage) { function downloader() { var opts = { - model: decoders.models.CookieMonsterDownload + model: decoders.models.G4Download , json: function (o) { return o; } @@ -37,11 +37,6 @@ function downloader() { return decoders(opts); } -function ReceiverTime(ts) { - var base = Date.parse('2009-01-01T00:00:00-0800'); - return new Date(base + (ts * 1000)); -} - function toSGV(proto, receiver_time, download_time) { console.log("Receiver time: ", receiver_time); console.log("Record time: ", proto.sys_timestamp_sec); From 6b4041a4f825f3b561bddb05ac954640d3b91d9b Mon Sep 17 00:00:00 2001 From: Ben West Date: Fri, 9 Jan 2015 14:38:25 -0800 Subject: [PATCH 16/24] fix mbg undefined --- lib/mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index bbee24d08e5..be6378b3b5b 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -152,7 +152,7 @@ function toMeter(proto, receiver_time, download_time) { device: 'dexcom' , date: record_time.unix() * 1000 , dateString: record_time.format() - , mbg: proto.mbg + , mbg: proto.mbg || proto.meter_bg_mgdl , type: 'mbg' }; return obj; From ddc9bf2cb4e86b828764b7eb459b889f85d9faa7 Mon Sep 17 00:00:00 2001 From: Ben West Date: Fri, 9 Jan 2015 15:32:54 -0800 Subject: [PATCH 17/24] only need one stream factory Pass around the sync function for less code. --- lib/mqtt.js | 191 +++++++++++++++++----------------------------------- 1 file changed, 60 insertions(+), 131 deletions(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index be6378b3b5b..b7c81d88a57 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -37,137 +37,62 @@ function downloader() { return decoders(opts); } -function toSGV(proto, receiver_time, download_time) { - console.log("Receiver time: ", receiver_time); - console.log("Record time: ", proto.sys_timestamp_sec); - console.log("Download time: ", download_time.unix()); - var record_offset = receiver_time - proto.sys_timestamp_sec; - var record_time = download_time.clone().subtract(record_offset, 'second'); - - console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); - - var obj = { - device: 'dexcom' - , date: record_time.unix() * 1000 - , dateString: record_time.format() - , sgv: proto.sgv_mgdl - , direction: direction(proto.trend) - , noise: proto.noise - , type: 'sgv' - }; - return obj; +function toSGV (proto, vars) { + vars.sgv = proto.sgv_mgdl; + vars.direction = direction(proto.trend); + vars.noise = proto.noise; + vars.type = 'sgv'; + return vars; } -function createProtoStream(packet, download_time) { - var stream = es.readArray(packet.sgv); - var receiver_time = packet.receiver_system_time_sec; - - function map(item, next) { - var r = toSGV(item, receiver_time, download_time); - console.log("ITEM", item, "TO SGV", r); - next(null, r); - } - - return stream.pipe(es.map(map)); +function toCal (proto, vars) { + vars.slope = proto.slope; + vars.intercept = proto.intercept; + vars.scale = proto.scale; + vars.type = 'cal'; + return vars; } -function toCal(proto, receiver_time, download_time) { - console.log("Receiver time: ", receiver_time); - console.log("Record time: ", proto.sys_timestamp_sec); - console.log("Download time: ", download_time.unix()); - var record_offset = receiver_time - proto.sys_timestamp_sec; - var record_time = download_time.clone().subtract(record_offset, 'second'); - - console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); - - var obj = { - device: 'dexcom' - , date: record_time.unix() * 1000 - , dateString: record_time.format() - , slope: proto.slope - , intercept: proto.intercept - , scale: proto.scale - , type: 'cal' - }; - return obj; +function toSensor (proto, vars) { + vars.filtered = new Long(proto.filtered).toInt(); + vars.unfiltered = new Long(proto.unfiltered).toInt(); + vars.rssi = proto.rssi; + vars.type = 'sensor'; + return vars; } -function createCalProtoStream(packet, download_time) { - var stream = es.readArray(packet.cal); - var receiver_time = packet.receiver_system_time_sec; - - function map(item, next) { - var r = toCal(item, receiver_time, download_time); - console.log("ITEM", item, "TO CAL", r); - next(null, r); - } - - return stream.pipe(es.map(map)); +function toMeter (proto, result) { + result.type = 'mbg'; + result.mbg = proto.mbg || proto.meter_bg_mgdl; + return result; } -function toSensor(proto, receiver_time, download_time) { - console.log("Receiver time: ", receiver_time); - console.log("Record time: ", proto.sys_timestamp_sec); - console.log("Download time: ", download_time.unix()); - var record_offset = receiver_time - proto.sys_timestamp_sec; - var record_time = download_time.clone().subtract(record_offset, 'second'); - - console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); - - var obj = { - device: 'dexcom' - , date: record_time.unix() * 1000 - , dateString: record_time.format() - , filtered: new Long(proto.filtered).toInt() - , unfiltered: new Long(proto.unfiltered).toInt() - , rssi: proto.rssi - , type: 'sensor' - }; - return obj; -} - -function createSensorProtoStream(packet, download_time) { - var stream = es.readArray(packet.sensor); - var receiver_time = packet.receiver_system_time_sec; - - function map(item, next) { - var r = toSensor(item, receiver_time, download_time); - console.log("ITEM", item, "TO Sensor", r); - next(null, r); - } - - return stream.pipe(es.map(map)); +function toTimestamp (proto, receiver_time, download_time) { + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = download_time.clone( ).subtract(record_offset, 'second'); + var obj = { + device: 'dexcom' + , date: record_time.unix() * 1000 + , dateString: record_time.format( ) + }; + return obj; } -function toMeter(proto, receiver_time, download_time) { - console.log("Receiver time: ", receiver_time); - console.log("Record time: ", proto.sys_timestamp_sec); - console.log("Download time: ", download_time.unix()); - var record_offset = receiver_time - proto.sys_timestamp_sec; - var record_time = download_time.clone().subtract(record_offset, 'second'); - - console.log("errr", " Offset: ", record_offset, " Record time: ", record_time.format()); - - var obj = { - device: 'dexcom' - , date: record_time.unix() * 1000 - , dateString: record_time.format() - , mbg: proto.mbg || proto.meter_bg_mgdl - , type: 'mbg' - }; - return obj; -} - -function createMeterProtoStream(packet, download_time) { - var stream = es.readArray(packet.meter); - var receiver_time = packet.receiver_system_time_sec; - +function iter_mqtt_record_stream (packet, prop, sync) { + var list = packet[prop]; + console.log('incoming', prop, (list || [ ]).length); + var stream = es.readArray(list || [ ]); + var receiver_time = packet.receiver_system_time_sec; + var download_time = moment(packet.download_timestamp); function map(item, next) { - var r = toMeter(item, receiver_time, download_time); - console.log("ITEM", item, "TO Meter", r); + var timestamped = toTimestamp(item, receiver_time, download_time.clone( )); + var r = sync(item, timestamped); + if (!('type' in r)) { + r.type = prop; + } + console.log("ITEM", item, "TO", prop, r); next(null, r); } - return stream.pipe(es.map(map)); } @@ -211,22 +136,26 @@ function configure(env, core, devicestatus) { } console.log('DOWNLOAD msg', msg.length, packet); console.log('download SGV', packet.sgv[0]); - console.log('download_timestamp', packet.download_timestamp, Date.parse(packet.download_timestamp)); + console.log('download_timestamp', packet.download_timestamp, new Date(Date.parse(packet.download_timestamp))); console.log("WRITE TO MONGO"); var download_timestamp = moment(packet.download_timestamp); if (packet.download_status == 0) { - createProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { - console.log("DONE WRITING SGV TO MONGO", result); - })); - createCalProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { - console.log("DONE WRITING Cal TO MONGO", result); - })); - createMeterProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { - console.log("DONE WRITING Meter TO MONGO", result); - })); - createSensorProtoStream(packet, download_timestamp).pipe(core.persist(function empty(err, result) { - console.log("DONE WRITING Sensor TO MONGO", err); - })); + iter_mqtt_record_stream(packet, 'sgv', toSGV) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING SGV TO MONGO", err, result.length); + })); + iter_mqtt_record_stream(packet, 'cal', toCal) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Cal TO MONGO", err, result.length); + })); + iter_mqtt_record_stream(packet, 'meter', toMeter) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Meter TO MONGO", err, result.length); + })); + iter_mqtt_record_stream(packet, 'sensor', toSensor) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Sensor TO MONGO", err, result.length); + })); } packet.type = "download"; devicestatus.create({ From de7cb78dcafb51f01d44bd0aaba65ad626d64cbf Mon Sep 17 00:00:00 2001 From: Ben West Date: Fri, 9 Jan 2015 15:43:25 -0800 Subject: [PATCH 18/24] force strict zero check --- lib/mqtt.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mqtt.js b/lib/mqtt.js index b7c81d88a57..8f2a9fc6f57 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -139,7 +139,7 @@ function configure(env, core, devicestatus) { console.log('download_timestamp', packet.download_timestamp, new Date(Date.parse(packet.download_timestamp))); console.log("WRITE TO MONGO"); var download_timestamp = moment(packet.download_timestamp); - if (packet.download_status == 0) { + if (packet.download_status === 0) { iter_mqtt_record_stream(packet, 'sgv', toSGV) .pipe(core.persist(function empty(err, result) { console.log("DONE WRITING SGV TO MONGO", err, result.length); From 469d86878e6f1ef9a9d6f91496ae1be43861038b Mon Sep 17 00:00:00 2001 From: Ben West Date: Tue, 17 Mar 2015 22:40:02 -0700 Subject: [PATCH 19/24] rebasing, it seems to run --- app.js | 67 +++++++++++++++++++++++++++++ lib/devicestatus.js | 7 ++-- lib/entries.js | 7 ++-- lib/treatments.js | 6 +-- package.json | 4 +- server.js | 100 +++++++++++--------------------------------- 6 files changed, 102 insertions(+), 89 deletions(-) create mode 100644 app.js diff --git a/app.js b/app.js new file mode 100644 index 00000000000..2fc1033604f --- /dev/null +++ b/app.js @@ -0,0 +1,67 @@ + +var express = require('express'); +function create (env) { + var store = env.store; + var pushover = require('./lib/pushover')(env); + /////////////////////////////////////////////////// + // api and json object variables + /////////////////////////////////////////////////// + var entries = require('./lib/entries')(env.mongo_collection, store); + var settings = require('./lib/settings')(env.settings_collection, store); + var treatments = require('./lib/treatments')(env.treatments_collection, store, pushover); + var profile = require('./lib/profile')(env.profile_collection, store); + + var devicestatus = require('./lib/devicestatus')(env.devicestatus_collection, store); + var api = require('./lib/api/')(env, entries, settings, treatments, profile, devicestatus); + var pebble = require('./lib/pebble'); + var compression = require('compression'); + + var app = express(); + app.entries = entries; + app.treatments = treatments; + app.profiles = profile; + app.devicestatus = devicestatus; + var appInfo = env.name + ' ' + env.version; + app.set('title', appInfo); + app.enable('trust proxy'); // Allows req.secure test on heroku https connections. + + app.use(compression({filter: shouldCompress})); + + function shouldCompress(req, res) { + //TODO: return false here if we find a condition where we don't want to compress + // fallback to standard filter function + return compression.filter(req, res); + } + + //if (env.api_secret) { + // console.log("API_SECRET", env.api_secret); + //} + app.use('/api/v1', api); + + + // pebble data + app.get('/pebble', pebble(entries, treatments, profile, devicestatus)); + + //app.get('/package.json', software); + + // define static server + //TODO: JC - changed cache to 1 hour from 30d ays to bypass cache hell until we have a real solution + var staticFiles = express.static(env.static_files, {maxAge: 60 * 60 * 1000}); + + // serve the static content + app.use(staticFiles); + + var bundle = require('./bundle')(); + app.use(bundle); + +// Handle errors with express's errorhandler, to display more readable error messages. + + // Handle errors with express's errorhandler, to display more readable error messages. + var errorhandler = require('errorhandler'); + //if (process.env.NODE_ENV === 'development') { + app.use(errorhandler()); + //} + return app; +} +module.exports = create; + diff --git a/lib/devicestatus.js b/lib/devicestatus.js index 82a2781eb0b..5195ecb5f4f 100644 --- a/lib/devicestatus.js +++ b/lib/devicestatus.js @@ -42,7 +42,6 @@ function ensureIndexes(name, storage) { }); } -module.exports = { - storage: storage, - ensureIndexes: ensureIndexes -}; + +storage.ensureIndexes = ensureIndexes; +module.exports = storage; diff --git a/lib/entries.js b/lib/entries.js index 4363bf46967..4310e5db81d 100644 --- a/lib/entries.js +++ b/lib/entries.js @@ -196,7 +196,6 @@ function ensureIndexes(name, storage) { } // expose module -module.exports = { - storage: storage, - ensureIndexes: ensureIndexes -}; +storage.ensureIndexes = ensureIndexes; +module.exports = storage; + diff --git a/lib/treatments.js b/lib/treatments.js index c6dfae54bbb..27bd98e2444 100644 --- a/lib/treatments.js +++ b/lib/treatments.js @@ -137,8 +137,6 @@ function ensureIndexes(name, storage) { } }); } +storage.ensureIndexes = ensureIndexes; +module.exports = storage; -module.exports = { - storage: storage, - ensureIndexes: ensureIndexes -}; diff --git a/package.json b/package.json index 92b50a630df..27f30c05ab1 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,9 @@ "moment": "2.8.1", "pushover-notifications": "0.2.0", "sgvdata": "0.0.2", - "socket.io": "^0.9.17" + "socket.io": "^0.9.17", + "git-rev": "git://github.com/bewest/git-rev.git", + "bootevent": "0.0.1" }, "devDependencies": { "istanbul": "~0.3.5", diff --git a/server.js b/server.js index 10eb3602847..4bff559d76b 100644 --- a/server.js +++ b/server.js @@ -26,79 +26,15 @@ // DB Connection setup and utils /////////////////////////////////////////////////// -var software = require('./package.json'); var env = require('./env')( ); -var pushover = require('./lib/pushover')(env); - -var entries = require('./lib/entries'); -var treatments = require('./lib/treatments'); -var devicestatus = require('./lib/devicestatus'); - -var store = require('./lib/storage')(env, function() { - console.info("Mongo ready"); - entries.ensureIndexes(env.mongo_collection, store); - treatments.ensureIndexes(env.treatments_collection, store); - devicestatus.ensureIndexes(env.devicestatus_collection, store); -}); - - -var express = require('express'); -var compression = require('compression'); - -/////////////////////////////////////////////////// -// api and json object variables -/////////////////////////////////////////////////// -var entriesStorage = entries.storage(env.mongo_collection, store, pushover); -var settings = require('./lib/settings')(env.settings_collection, store); -var treatmentsStorage = treatments.storage(env.treatments_collection, store, pushover); -var profile = require('./lib/profile')(env.profile_collection, store); -var devicestatusStorage = devicestatus.storage(env.devicestatus_collection, store); -var api = require('./lib/api/')(env, entriesStorage, settings, treatmentsStorage, profile, devicestatusStorage); -var pebble = require('./lib/pebble'); -/////////////////////////////////////////////////// +var store = require('./lib/storage')(env) /////////////////////////////////////////////////// // setup http server /////////////////////////////////////////////////// var PORT = env.PORT; -var app = express(); -var appInfo = software.name + ' ' + software.version; -app.set('title', appInfo); -app.enable('trust proxy'); // Allows req.secure test on heroku https connections. - -app.use(compression({filter: shouldCompress})); - -function shouldCompress(req, res) { - //TODO: return false here if we find a condition where we don't want to compress - // fallback to standard filter function - return compression.filter(req, res); -} - -app.use('/api/v1', api); - -// pebble data -app.get('/pebble', pebble(entriesStorage, treatmentsStorage, profile, devicestatusStorage, env)); - -//app.get('/package.json', software); - -// define static server -//TODO: JC - changed cache to 1 hour from 30d ays to bypass cache hell until we have a real solution -var staticFiles = express.static(env.static_files, {maxAge: 60 * 60 * 1000}); - -// serve the static content -app.use(staticFiles); - -var bundle = require('./bundle')(); -app.use(bundle); - -// Handle errors with express's errorhandler, to display more readable error messages. -var errorhandler = require('errorhandler'); -//if (process.env.NODE_ENV === 'development') { - app.use(errorhandler()); -//} - -function create ( ) { +function create (app) { var transport = (env.ssl ? require('https') : require('http')); if (env.ssl) { @@ -107,15 +43,27 @@ function create ( ) { return transport.createServer(app); } -store(function ready ( ) { - var server = create( ).listen(PORT); - console.log('listening', PORT); - - /////////////////////////////////////////////////// - // setup socket io for data and message transmission - /////////////////////////////////////////////////// - var websocket = require('./lib/websocket'); - var io = websocket(env, server, entriesStorage, treatmentsStorage, profile, devicestatusStorage); -}); +var bootevent = require('bootevent'); +bootevent( ) + .acquire(function db (ctx, next) { + // initialize db connections + store( function ready ( ) { + console.log('storage system ready'); + ctx.store = store; + next( ); + }); + }) + .boot(function booted (ctx) { + env.store = ctx.store; + var app = require('./app')(env); + var server = create(app).listen(PORT); + console.log('listening', PORT); + /////////////////////////////////////////////////// + // setup socket io for data and message transmission + /////////////////////////////////////////////////// + var websocket = require('./lib/websocket'); + var io = websocket(env, server, app.entries, app.treatments, app.profiles, app.devicestatus); + }) +; /////////////////////////////////////////////////// From 1a38f4e1911c37ca644b35b4715ab7d21a3a631f Mon Sep 17 00:00:00 2001 From: Ben West Date: Sun, 2 Nov 2014 08:09:34 -0800 Subject: [PATCH 20/24] fix broken test --- lib/entries.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/entries.js b/lib/entries.js index 4310e5db81d..88b4f301dd7 100644 --- a/lib/entries.js +++ b/lib/entries.js @@ -196,6 +196,7 @@ function ensureIndexes(name, storage) { } // expose module +storage.storage = storage; storage.ensureIndexes = ensureIndexes; module.exports = storage; From 6705b621d5ce861dcc1c58ccaa390c3ac25b189a Mon Sep 17 00:00:00 2001 From: Ben West Date: Mon, 3 Nov 2014 20:47:51 -0800 Subject: [PATCH 21/24] bring back ensureIndexes Make sure indexes are created for commonly sought fields. --- app.js | 26 ++---- lib/bootevent.js | 39 +++++++++ lib/devicestatus.js | 15 +--- lib/entries.js | 13 +-- lib/models/downloads.js | 0 lib/models/entries.js | 42 +++++++++ lib/pebble.js~ | 187 ++++++++++++++++++++++++++++++++++++++++ lib/poller.js | 18 ++++ lib/treatments.js | 13 +-- server.js | 17 +--- 10 files changed, 308 insertions(+), 62 deletions(-) create mode 100644 lib/bootevent.js create mode 100644 lib/models/downloads.js create mode 100644 lib/models/entries.js create mode 100644 lib/pebble.js~ create mode 100644 lib/poller.js diff --git a/app.js b/app.js index 2fc1033604f..88852d3b718 100644 --- a/app.js +++ b/app.js @@ -1,26 +1,18 @@ var express = require('express'); -function create (env) { - var store = env.store; - var pushover = require('./lib/pushover')(env); +var compression = require('compression'); +function create (env, ctx) { /////////////////////////////////////////////////// // api and json object variables /////////////////////////////////////////////////// - var entries = require('./lib/entries')(env.mongo_collection, store); - var settings = require('./lib/settings')(env.settings_collection, store); - var treatments = require('./lib/treatments')(env.treatments_collection, store, pushover); - var profile = require('./lib/profile')(env.profile_collection, store); - - var devicestatus = require('./lib/devicestatus')(env.devicestatus_collection, store); - var api = require('./lib/api/')(env, entries, settings, treatments, profile, devicestatus); - var pebble = require('./lib/pebble'); - var compression = require('compression'); + var api = require('./lib/api/')(env, ctx.entries, ctx.settings, ctx.treatments, ctx.profiles, ctx.devicestatus); + var pebble = ctx.pebble; var app = express(); - app.entries = entries; - app.treatments = treatments; - app.profiles = profile; - app.devicestatus = devicestatus; + app.entries = ctx.entries; + app.treatments = ctx.treatments; + app.profiles = ctx.profiles; + app.devicestatus = ctx.devicestatus; var appInfo = env.name + ' ' + env.version; app.set('title', appInfo); app.enable('trust proxy'); // Allows req.secure test on heroku https connections. @@ -40,7 +32,7 @@ function create (env) { // pebble data - app.get('/pebble', pebble(entries, treatments, profile, devicestatus)); + app.get('/pebble', pebble(ctx.entries, ctx.treatments, ctx.profiles, ctx.devicestatus)); //app.get('/package.json', software); diff --git a/lib/bootevent.js b/lib/bootevent.js new file mode 100644 index 00000000000..c6c87196c24 --- /dev/null +++ b/lib/bootevent.js @@ -0,0 +1,39 @@ + +var bootevent = require('bootevent'); + +function boot (env) { + var store = require('./storage')(env); + var proc = bootevent( ) + .acquire(function db (ctx, next) { + // initialize db connections + store( function ready ( ) { + console.log('storage system ready'); + ctx.store = store; + next( ); + }); + }) + .acquire(function data (ctx, next) { + /////////////////////////////////////////////////// + // api and json object variables + /////////////////////////////////////////////////// + ctx.pushover = require('./pushover')(env); + ctx.entries = require('./entries')(env.mongo_collection, ctx.store, ctx.pushover); + ctx.settings = require('./settings')(env.settings_collection, ctx.store); + ctx.treatments = require('./treatments')(env.treatments_collection, ctx.store, ctx.pushover); + ctx.devicestatus = require('./devicestatus')(env.devicestatus_collection, ctx.store); + ctx.profiles = require('./profile')(env.profile_collection, ctx.store); + ctx.pebble = require('./pebble'); + console.info("Ensuring indexes"); + + console.log(ctx.entries, ctx.entries.indexedFields); + store.ensureIndexes(ctx.entries( ), ctx.entries.indexedFields); + store.ensureIndexes(ctx.treatments( ), ctx.treatments.indexedFields); + store.ensureIndexes(ctx.devicestatus( ), ctx.devicestatus.indexedFields); + + next( ); + }) + ; + return proc; + +} +module.exports = boot; diff --git a/lib/devicestatus.js b/lib/devicestatus.js index 5195ecb5f4f..b839944442e 100644 --- a/lib/devicestatus.js +++ b/lib/devicestatus.js @@ -29,19 +29,10 @@ function storage (collection, storage) { api.list = list; api.create = create; api.last = last; + api.indexedFields = indexedFields; return api; } +var indexedFields = ['created_at']; +storage.indexedFields = indexedFields; -function ensureIndexes(name, storage) { - storage.with_collection(name)(function (err, collection) { - if (err) { - console.error("ensureIndexes, unable to get collection for: " + name + " - " + err); - } else { - storage.ensureIndexes(collection, ['created_at']); - } - }); -} - - -storage.ensureIndexes = ensureIndexes; module.exports = storage; diff --git a/lib/entries.js b/lib/entries.js index 88b4f301dd7..5854905be44 100644 --- a/lib/entries.js +++ b/lib/entries.js @@ -182,21 +182,14 @@ function storage(name, storage, pushover) { api.persist = persist; api.getEntries = getEntries; api.getEntry = getEntry; + api.indexedFields = indexedFields; return api; } -function ensureIndexes(name, storage) { - storage.with_collection(name)(function(err, collection) { - if (err) { - console.error("ensureIndexes, unable to get collection for: " + name + " - " + err); - } else { - storage.ensureIndexes(collection, ['date', 'type', 'sgv']); - } - }); -} +var indexedFields = [ 'date', 'type', 'sgv' ]; +storage.indexedFields = indexedFields; // expose module storage.storage = storage; -storage.ensureIndexes = ensureIndexes; module.exports = storage; diff --git a/lib/models/downloads.js b/lib/models/downloads.js new file mode 100644 index 00000000000..e69de29bb2d diff --git a/lib/models/entries.js b/lib/models/entries.js new file mode 100644 index 00000000000..8bea7ae6b25 --- /dev/null +++ b/lib/models/entries.js @@ -0,0 +1,42 @@ + +var _ = require('lodash'); + +var config = { + keys: ['sgv' ] + , fields: ['sgv', 'timestamp'] + // , +}; + +function sgvs ( ) { + +} + + +function make_timestamp ( ) { +} + +function make_field ( ) { +} + +function sanitizer (opts) { + +} + +function index (opts) { + function calc (data) { + var o = lo.pick(data, opts.keys); + var scheme, r; + var stack = [ ]; + if (o.type) { + scheme = o.type + ':/'; + } + opts.fields.forEach(function iter (elem) { + stack.push(data[elem]); + }); + r = [ scheme ].concat(stack).join('/'); + return r; + } + return calc; +} + +module.exports = index; diff --git a/lib/pebble.js~ b/lib/pebble.js~ new file mode 100644 index 00000000000..cf328b416d7 --- /dev/null +++ b/lib/pebble.js~ @@ -0,0 +1,187 @@ +'use strict'; + +var DIRECTIONS = { + NONE: 0 +, DoubleUp: 1 +, SingleUp: 2 +, FortyFiveUp: 3 +, Flat: 4 +, FortyFiveDown: 5 +, SingleDown: 6 +, DoubleDown: 7 +, 'NOT COMPUTABLE': 8 +, 'RATE OUT OF RANGE': 9 +}; + +var iob = require("./iob")(); +var async = require('async'); + +function directionToTrend (direction) { + var trend = 8; + if (direction in DIRECTIONS) { + trend = DIRECTIONS[direction]; + } + return trend; +} + +function pebble (req, res) { + var ONE_DAY = 24 * 60 * 60 * 1000 + , uploaderBattery + , treatmentResults + , profileResult + , sgvData = [ ] + , calData = [ ]; + + function scaleBg(bg) { + if (req.mmol) { + return (Math.round((bg / 18) * 10) / 10).toFixed(1); + } else { + return bg; + } + } + + function sendData () { + var now = Date.now(); + + //for compatibility we're keeping battery and iob here, but they would be better somewhere else + if (sgvData.length > 0) { + sgvData[0].battery = uploaderBattery ? "" + uploaderBattery : undefined; + if (req.iob) { + sgvData[0].iob = iob.calcTotal(treatmentResults.slice(0, 20), profileResult, new Date(now)).display; + } + } + + var result = { status: [ {now: now} ], bgs: sgvData.slice(0, req.count), cals: calData }; + res.setHeader('content-type', 'application/json'); + res.write(JSON.stringify(result)); + res.end( ); + } + + var earliest_data = Date.now() - ONE_DAY; + + async.parallel({ + devicestatus: function (callback) { + req.devicestatus.last(function (err, value) { + if (!err && value) { + uploaderBattery = value.uploaderBattery; + } else { + console.error("req.devicestatus.tail", err); + } + callback(); + }); + } + , treatments: function(callback) { + loadTreatments(req, earliest_data, function (err, trs) { + treatmentResults = trs; + callback(); + }); + } + , profile: function(callback) { + loadProfile(req, function (err, profileResults) { + if (!err && profileResults) { + profileResults.forEach(function (profile) { + if (profile) { + if (profile.dia) { + profileResult = profile; + } + } + }); + } else { + console.error("pebble profile error", arguments); + } + callback(); + }); + } + , cal: function(callback) { + if (req.rawbg) { + var cq = { count: req.count, find: {type: 'cal'} }; + req.entries.list(cq, function (err, results) { + results.forEach(function (element) { + if (element) { + calData.push({ + slope: Math.round(element.slope) + , intercept: Math.round(element.intercept) + , scale: Math.round(element.scale) + }); + } + }); + callback(); + }); + } else { + callback(); + } + } + , entries: function(callback) { + var q = { count: req.count + 1, find: { "sgv": { $exists: true }} }; + + req.entries.list(q, function(err, results) { + results.forEach(function(element, index) { + if (element) { + var obj = {}; + var next = null; + var sgvs = results.filter(function(d) { + return !!d.sgv; + }); + if (index + 1 < sgvs.length) { + next = sgvs[index + 1]; + } + obj.sgv = scaleBg(element.sgv).toString(); + obj.bgdelta = (next ? (scaleBg(element.sgv) - scaleBg(next.sgv) ) : 0); + if (req.mmol) { + obj.bgdelta = obj.bgdelta.toFixed(1); + } + if ('direction' in element) { + obj.trend = directionToTrend(element.direction); + obj.direction = element.direction; + } + obj.datetime = element.date; + if (req.rawbg) { + obj.filtered = element.filtered; + obj.unfiltered = element.unfiltered; + obj.noise = element.noise; + } + sgvData.push(obj); + } + }); + callback(); + }); + } + }, sendData); + +} + +function loadTreatments(req, earliest_data, fn) { + if (req.iob) { + var q = { find: {"created_at": {"$gte": new Date(earliest_data).toISOString()}} }; + req.treatments.list(q, fn); + } else { + fn(null, []); + } +} + +function loadProfile(req, fn) { + if (req.iob) { + req.profile.list(fn); + } else { + fn(null, []); + } +} + +function configure (entries, treatments, profile, devicestatus, env) { + function middle (req, res, next) { + req.entries = entries; + req.treatments = treatments; + req.profile = profile; + req.devicestatus = devicestatus; + req.rawbg = env.enable && env.enable.indexOf('rawbg') > -1; + req.iob = env.enable && env.enable.indexOf('iob') > -1; + req.mmol = (req.query.units || env.DISPLAY_UNITS) === 'mmol'; + req.count = parseInt(req.query.count) || 1; + + next( ); + } + return [middle, pebble]; +} + +configure.pebble = pebble; +module.exports = configure; diff --git a/lib/poller.js b/lib/poller.js new file mode 100644 index 00000000000..fd78327a1b2 --- /dev/null +++ b/lib/poller.js @@ -0,0 +1,18 @@ + +var es = require('event-stream'); + +function create (core) { + function heartbeat (ev) { + + } + core.inputs.on('heartbeat', heartbeat); + function make (beat) { + var poll = {heart: beat}; + return poll; + } + function writer (data) { + } + var poller = es.through(writer); +} + +module.exports = create; diff --git a/lib/treatments.js b/lib/treatments.js index 27bd98e2444..ddbc56cadd7 100644 --- a/lib/treatments.js +++ b/lib/treatments.js @@ -125,18 +125,11 @@ function storage (collection, storage, pushover) { api.list = list; api.create = create; + api.indexedFields = indexedFields; return api; } +var indexedFields = ['created_at', 'eventType']; +storage.indexedFields = indexedFields; -function ensureIndexes(name, storage) { - storage.with_collection(name)(function (err, collection) { - if (err) { - console.error("ensureIndexes, unable to get collection for: " + name + " - " + err); - } else { - storage.ensureIndexes(collection, ['created_at', 'eventType']); - } - }); -} -storage.ensureIndexes = ensureIndexes; module.exports = storage; diff --git a/server.js b/server.js index 4bff559d76b..1924ae4624c 100644 --- a/server.js +++ b/server.js @@ -27,7 +27,7 @@ /////////////////////////////////////////////////// var env = require('./env')( ); -var store = require('./lib/storage')(env) + /////////////////////////////////////////////////// // setup http server @@ -43,19 +43,10 @@ function create (app) { return transport.createServer(app); } -var bootevent = require('bootevent'); -bootevent( ) - .acquire(function db (ctx, next) { - // initialize db connections - store( function ready ( ) { - console.log('storage system ready'); - ctx.store = store; - next( ); - }); - }) - .boot(function booted (ctx) { +var bootevent = require('./lib/bootevent'); +bootevent(env).boot(function booted (ctx) { env.store = ctx.store; - var app = require('./app')(env); + var app = require('./app')(env, ctx); var server = create(app).listen(PORT); console.log('listening', PORT); /////////////////////////////////////////////////// From 0cdd3c0835e55cd3009d68e7ed1334c07343f84f Mon Sep 17 00:00:00 2001 From: Ben West Date: Wed, 18 Mar 2015 01:42:33 -0700 Subject: [PATCH 22/24] rm spurious tmp file --- lib/pebble.js~ | 187 ------------------------------------------------- 1 file changed, 187 deletions(-) delete mode 100644 lib/pebble.js~ diff --git a/lib/pebble.js~ b/lib/pebble.js~ deleted file mode 100644 index cf328b416d7..00000000000 --- a/lib/pebble.js~ +++ /dev/null @@ -1,187 +0,0 @@ -'use strict'; - -var DIRECTIONS = { - NONE: 0 -, DoubleUp: 1 -, SingleUp: 2 -, FortyFiveUp: 3 -, Flat: 4 -, FortyFiveDown: 5 -, SingleDown: 6 -, DoubleDown: 7 -, 'NOT COMPUTABLE': 8 -, 'RATE OUT OF RANGE': 9 -}; - -var iob = require("./iob")(); -var async = require('async'); - -function directionToTrend (direction) { - var trend = 8; - if (direction in DIRECTIONS) { - trend = DIRECTIONS[direction]; - } - return trend; -} - -function pebble (req, res) { - var ONE_DAY = 24 * 60 * 60 * 1000 - , uploaderBattery - , treatmentResults - , profileResult - , sgvData = [ ] - , calData = [ ]; - - function scaleBg(bg) { - if (req.mmol) { - return (Math.round((bg / 18) * 10) / 10).toFixed(1); - } else { - return bg; - } - } - - function sendData () { - var now = Date.now(); - - //for compatibility we're keeping battery and iob here, but they would be better somewhere else - if (sgvData.length > 0) { - sgvData[0].battery = uploaderBattery ? "" + uploaderBattery : undefined; - if (req.iob) { - sgvData[0].iob = iob.calcTotal(treatmentResults.slice(0, 20), profileResult, new Date(now)).display; - } - } - - var result = { status: [ {now: now} ], bgs: sgvData.slice(0, req.count), cals: calData }; - res.setHeader('content-type', 'application/json'); - res.write(JSON.stringify(result)); - res.end( ); - } - - var earliest_data = Date.now() - ONE_DAY; - - async.parallel({ - devicestatus: function (callback) { - req.devicestatus.last(function (err, value) { - if (!err && value) { - uploaderBattery = value.uploaderBattery; - } else { - console.error("req.devicestatus.tail", err); - } - callback(); - }); - } - , treatments: function(callback) { - loadTreatments(req, earliest_data, function (err, trs) { - treatmentResults = trs; - callback(); - }); - } - , profile: function(callback) { - loadProfile(req, function (err, profileResults) { - if (!err && profileResults) { - profileResults.forEach(function (profile) { - if (profile) { - if (profile.dia) { - profileResult = profile; - } - } - }); - } else { - console.error("pebble profile error", arguments); - } - callback(); - }); - } - , cal: function(callback) { - if (req.rawbg) { - var cq = { count: req.count, find: {type: 'cal'} }; - req.entries.list(cq, function (err, results) { - results.forEach(function (element) { - if (element) { - calData.push({ - slope: Math.round(element.slope) - , intercept: Math.round(element.intercept) - , scale: Math.round(element.scale) - }); - } - }); - callback(); - }); - } else { - callback(); - } - } - , entries: function(callback) { - var q = { count: req.count + 1, find: { "sgv": { $exists: true }} }; - - req.entries.list(q, function(err, results) { - results.forEach(function(element, index) { - if (element) { - var obj = {}; - var next = null; - var sgvs = results.filter(function(d) { - return !!d.sgv; - }); - if (index + 1 < sgvs.length) { - next = sgvs[index + 1]; - } - obj.sgv = scaleBg(element.sgv).toString(); - obj.bgdelta = (next ? (scaleBg(element.sgv) - scaleBg(next.sgv) ) : 0); - if (req.mmol) { - obj.bgdelta = obj.bgdelta.toFixed(1); - } - if ('direction' in element) { - obj.trend = directionToTrend(element.direction); - obj.direction = element.direction; - } - obj.datetime = element.date; - if (req.rawbg) { - obj.filtered = element.filtered; - obj.unfiltered = element.unfiltered; - obj.noise = element.noise; - } - sgvData.push(obj); - } - }); - callback(); - }); - } - }, sendData); - -} - -function loadTreatments(req, earliest_data, fn) { - if (req.iob) { - var q = { find: {"created_at": {"$gte": new Date(earliest_data).toISOString()}} }; - req.treatments.list(q, fn); - } else { - fn(null, []); - } -} - -function loadProfile(req, fn) { - if (req.iob) { - req.profile.list(fn); - } else { - fn(null, []); - } -} - -function configure (entries, treatments, profile, devicestatus, env) { - function middle (req, res, next) { - req.entries = entries; - req.treatments = treatments; - req.profile = profile; - req.devicestatus = devicestatus; - req.rawbg = env.enable && env.enable.indexOf('rawbg') > -1; - req.iob = env.enable && env.enable.indexOf('iob') > -1; - req.mmol = (req.query.units || env.DISPLAY_UNITS) === 'mmol'; - req.count = parseInt(req.query.count) || 1; - - next( ); - } - return [middle, pebble]; -} - -configure.pebble = pebble; -module.exports = configure; From 7578c1af5a9cceba4ac6ed0f5998b397094ce54e Mon Sep 17 00:00:00 2001 From: Ben West Date: Wed, 18 Mar 2015 01:52:23 -0700 Subject: [PATCH 23/24] make mqtt listener more unique --- lib/models/downloads.js | 0 lib/models/entries.js | 42 ----------------------------------------- lib/mqtt.js | 2 +- 3 files changed, 1 insertion(+), 43 deletions(-) delete mode 100644 lib/models/downloads.js delete mode 100644 lib/models/entries.js diff --git a/lib/models/downloads.js b/lib/models/downloads.js deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/lib/models/entries.js b/lib/models/entries.js deleted file mode 100644 index 8bea7ae6b25..00000000000 --- a/lib/models/entries.js +++ /dev/null @@ -1,42 +0,0 @@ - -var _ = require('lodash'); - -var config = { - keys: ['sgv' ] - , fields: ['sgv', 'timestamp'] - // , -}; - -function sgvs ( ) { - -} - - -function make_timestamp ( ) { -} - -function make_field ( ) { -} - -function sanitizer (opts) { - -} - -function index (opts) { - function calc (data) { - var o = lo.pick(data, opts.keys); - var scheme, r; - var stack = [ ]; - if (o.type) { - scheme = o.type + ':/'; - } - opts.fields.forEach(function iter (elem) { - stack.push(data[elem]); - }); - r = [ scheme ].concat(stack).join('/'); - return r; - } - return calc; -} - -module.exports = index; diff --git a/lib/mqtt.js b/lib/mqtt.js index 8f2a9fc6f57..b67095fe66c 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -101,7 +101,7 @@ function configure(env, core, devicestatus) { var opts = { encoding: 'binary', clean: false, - clientId: 'master' + clientId: env.head }; var client = mqtt.connect(uri, opts); var downloads = downloader(); From d185010538138d2394894b66ff3e13d31b1415f8 Mon Sep 17 00:00:00 2001 From: Ben West Date: Wed, 18 Mar 2015 01:53:21 -0700 Subject: [PATCH 24/24] remove spurious, unused code --- lib/entries.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/entries.js b/lib/entries.js index 58d949cb7f6..5854905be44 100644 --- a/lib/entries.js +++ b/lib/entries.js @@ -167,11 +167,6 @@ function storage(name, storage, pushover) { }); } - function writeStream (opts) { - function map (item, next) { - } - } - // closure to represent the API function api ( ) { // obtain handle usable for querying the collection associated