diff --git a/app.js b/app.js new file mode 100644 index 00000000000..88852d3b718 --- /dev/null +++ b/app.js @@ -0,0 +1,59 @@ + +var express = require('express'); +var compression = require('compression'); +function create (env, ctx) { + /////////////////////////////////////////////////// + // api and json object variables + /////////////////////////////////////////////////// + var api = require('./lib/api/')(env, ctx.entries, ctx.settings, ctx.treatments, ctx.profiles, ctx.devicestatus); + var pebble = ctx.pebble; + + var app = express(); + app.entries = ctx.entries; + app.treatments = ctx.treatments; + app.profiles = ctx.profiles; + app.devicestatus = ctx.devicestatus; + var appInfo = env.name + ' ' + env.version; + app.set('title', appInfo); + app.enable('trust proxy'); // Allows req.secure test on heroku https connections. + + app.use(compression({filter: shouldCompress})); + + function shouldCompress(req, res) { + //TODO: return false here if we find a condition where we don't want to compress + // fallback to standard filter function + return compression.filter(req, res); + } + + //if (env.api_secret) { + // console.log("API_SECRET", env.api_secret); + //} + app.use('/api/v1', api); + + + // pebble data + app.get('/pebble', pebble(ctx.entries, ctx.treatments, ctx.profiles, ctx.devicestatus)); + + //app.get('/package.json', software); + + // define static server + //TODO: JC - changed cache to 1 hour from 30d ays to bypass cache hell until we have a real solution + var staticFiles = express.static(env.static_files, {maxAge: 60 * 60 * 1000}); + + // serve the static content + app.use(staticFiles); + + var bundle = require('./bundle')(); + app.use(bundle); + +// Handle errors with express's errorhandler, to display more readable error messages. + + // Handle errors with express's errorhandler, to display more readable error messages. + var errorhandler = require('errorhandler'); + //if (process.env.NODE_ENV === 'development') { + app.use(errorhandler()); + //} + return app; +} +module.exports = create; + diff --git a/env.js b/env.js index 2669dee95fd..38f4822df75 100644 --- a/env.js +++ b/env.js @@ -32,11 +32,14 @@ function config ( ) { } env.version = software.version; env.name = software.name; - + env.MQTT_MONITOR = process.env.MQTT_MONITOR || null; env.DISPLAY_UNITS = readENV('DISPLAY_UNITS', 'mg/dl'); env.PORT = readENV('PORT', 1337); env.mongo = readENV('MONGO_CONNECTION') || readENV('MONGO') || readENV('MONGOLAB_URI'); env.mongo_collection = readENV('MONGO_COLLECTION', 'entries'); + if (env.MQTT_MONITOR) { + env.mqtt_client_id = [env.mongo.split('/').pop( ), env.mongo_collection].join('.'); + } env.settings_collection = readENV('MONGO_SETTINGS_COLLECTION', 'settings'); env.treatments_collection = readENV('MONGO_TREATMENTS_COLLECTION', 'treatments'); env.profile_collection = readENV('MONGO_PROFILE_COLLECTION', 'profile'); diff --git a/lib/bootevent.js b/lib/bootevent.js new file mode 100644 index 00000000000..c6c87196c24 --- /dev/null +++ b/lib/bootevent.js @@ -0,0 +1,39 @@ + +var bootevent = require('bootevent'); + +function boot (env) { + var store = require('./storage')(env); + var proc = bootevent( ) + .acquire(function db (ctx, next) { + // initialize db connections + store( function ready ( ) { + console.log('storage system ready'); + ctx.store = store; + next( ); + }); + }) + .acquire(function data (ctx, next) { + /////////////////////////////////////////////////// + // api and json object variables + /////////////////////////////////////////////////// + ctx.pushover = require('./pushover')(env); + ctx.entries = require('./entries')(env.mongo_collection, ctx.store, ctx.pushover); + ctx.settings = require('./settings')(env.settings_collection, ctx.store); + ctx.treatments = require('./treatments')(env.treatments_collection, ctx.store, ctx.pushover); + ctx.devicestatus = require('./devicestatus')(env.devicestatus_collection, ctx.store); + ctx.profiles = require('./profile')(env.profile_collection, ctx.store); + ctx.pebble = require('./pebble'); + console.info("Ensuring indexes"); + + console.log(ctx.entries, ctx.entries.indexedFields); + store.ensureIndexes(ctx.entries( ), ctx.entries.indexedFields); + store.ensureIndexes(ctx.treatments( ), ctx.treatments.indexedFields); + store.ensureIndexes(ctx.devicestatus( ), ctx.devicestatus.indexedFields); + + next( ); + }) + ; + return proc; + +} +module.exports = boot; diff --git a/lib/devicestatus.js b/lib/devicestatus.js index 82a2781eb0b..c2f8089b624 100644 --- a/lib/devicestatus.js +++ b/lib/devicestatus.js @@ -2,47 +2,47 @@ function storage (collection, storage) { - function create (obj, fn) { - obj.created_at = (new Date( )).toISOString( ); - api( ).insert(obj, function (err, doc) { - fn(null, doc); - }); - } - - function last(fn) { - return api( ).find({ }).sort({created_at: -1}).limit(1).toArray(function(err, entries) { - if (entries && entries.length > 0) - fn(err, entries[0]); - else - fn(err, null); - }); - } - - function list (fn) { - return api( ).find({ }).sort({created_at: -1}).toArray(fn); - } - - function api ( ) { - return storage.pool.db.collection(collection); - } + function create(obj, fn) { + if (! obj.hasOwnProperty("created_at")){ + obj.created_at = (new Date()).toISOString(); + } + api().insert(obj, function (err, doc) { + fn(null, doc); + }); + } + + function create_date_included(obj, fn) { + api().insert(obj, function (err, doc) { + fn(null, doc); + }); + + } + + function last(fn) { + return api().find({}).sort({created_at: -1}).limit(1).toArray(function (err, entries) { + if (entries && entries.length > 0) + fn(err, entries[0]); + else + fn(err, null); + }); + } + + function list(fn) { + return api().find({}).sort({created_at: -1}).toArray(fn); + } + + function api() { + return storage.pool.db.collection(collection); + } + api.list = list; api.create = create; api.last = last; + api.indexedFields = indexedFields; return api; } +var indexedFields = ['created_at']; +storage.indexedFields = indexedFields; -function ensureIndexes(name, storage) { - storage.with_collection(name)(function (err, collection) { - if (err) { - console.error("ensureIndexes, unable to get collection for: " + name + " - " + err); - } else { - storage.ensureIndexes(collection, ['created_at']); - } - }); -} - -module.exports = { - storage: storage, - ensureIndexes: ensureIndexes -}; +module.exports = storage; diff --git a/lib/entries.js b/lib/entries.js index 4363bf46967..5854905be44 100644 --- a/lib/entries.js +++ b/lib/entries.js @@ -182,21 +182,14 @@ function storage(name, storage, pushover) { api.persist = persist; api.getEntries = getEntries; api.getEntry = getEntry; + api.indexedFields = indexedFields; return api; } -function ensureIndexes(name, storage) { - storage.with_collection(name)(function(err, collection) { - if (err) { - console.error("ensureIndexes, unable to get collection for: " + name + " - " + err); - } else { - storage.ensureIndexes(collection, ['date', 'type', 'sgv']); - } - }); -} +var indexedFields = [ 'date', 'type', 'sgv' ]; +storage.indexedFields = indexedFields; // expose module -module.exports = { - storage: storage, - ensureIndexes: ensureIndexes -}; +storage.storage = storage; +module.exports = storage; + diff --git a/lib/mqtt.js b/lib/mqtt.js new file mode 100644 index 00000000000..b67095fe66c --- /dev/null +++ b/lib/mqtt.js @@ -0,0 +1,185 @@ +'use strict'; + +var es = require('event-stream'); +var Long = require('long'); +var decoders = require('sgvdata/lib/protobuf'); +var direction = require('sgvdata/lib/utils').direction; +var mqtt = require('mqtt'); +var moment = require('moment'); + +function process(client) { + var stream = es.through( + function _write(data) { + this.push(data); + } + ); + return stream; +} + +function every(storage) { + function iter(item, next) { + storage.create(item, next); + } + + return es.map(iter); +} + +function downloader() { + var opts = { + model: decoders.models.G4Download + , json: function (o) { + return o; + } + , payload: function (o) { + return o; + } + }; + return decoders(opts); +} + +function toSGV (proto, vars) { + vars.sgv = proto.sgv_mgdl; + vars.direction = direction(proto.trend); + vars.noise = proto.noise; + vars.type = 'sgv'; + return vars; +} + +function toCal (proto, vars) { + vars.slope = proto.slope; + vars.intercept = proto.intercept; + vars.scale = proto.scale; + vars.type = 'cal'; + return vars; +} + +function toSensor (proto, vars) { + vars.filtered = new Long(proto.filtered).toInt(); + vars.unfiltered = new Long(proto.unfiltered).toInt(); + vars.rssi = proto.rssi; + vars.type = 'sensor'; + return vars; +} + +function toMeter (proto, result) { + result.type = 'mbg'; + result.mbg = proto.mbg || proto.meter_bg_mgdl; + return result; +} + +function toTimestamp (proto, receiver_time, download_time) { + var record_offset = receiver_time - proto.sys_timestamp_sec; + var record_time = download_time.clone( ).subtract(record_offset, 'second'); + var obj = { + device: 'dexcom' + , date: record_time.unix() * 1000 + , dateString: record_time.format( ) + }; + return obj; +} + +function iter_mqtt_record_stream (packet, prop, sync) { + var list = packet[prop]; + console.log('incoming', prop, (list || [ ]).length); + var stream = es.readArray(list || [ ]); + var receiver_time = packet.receiver_system_time_sec; + var download_time = moment(packet.download_timestamp); + function map(item, next) { + var timestamped = toTimestamp(item, receiver_time, download_time.clone( )); + var r = sync(item, timestamped); + if (!('type' in r)) { + r.type = prop; + } + console.log("ITEM", item, "TO", prop, r); + next(null, r); + } + return stream.pipe(es.map(map)); +} + +function configure(env, core, devicestatus) { + var uri = env['MQTT_MONITOR']; + var opts = { + encoding: 'binary', + clean: false, + clientId: env.head + }; + var client = mqtt.connect(uri, opts); + var downloads = downloader(); + client.subscribe('sgvs'); + client.subscribe('published'); + client.subscribe('/downloads/protobuf', {qos: 2}, granted); + client.subscribe('/uploader', granted); + client.subscribe('/entries/sgv', granted); + function granted() { + console.log('granted', arguments); + } + + + client.on('message', function (topic, msg) { + console.log('topic', topic); + console.log(topic, 'on message', 'msg', msg.length); + switch (topic) { + case '/uploader': + console.log({type: topic, msg: msg.toString()}); + break; + case '/downloads/protobuf': + var b = new Buffer(msg, 'binary'); + console.log("BINARY", b.length, b.toString('hex')); + try { + var packet = downloads.parse(b); + if (!packet.type) { + packet.type = topic; + } + } catch (e) { + console.log("DID NOT PARSE", e); + break; + } + console.log('DOWNLOAD msg', msg.length, packet); + console.log('download SGV', packet.sgv[0]); + console.log('download_timestamp', packet.download_timestamp, new Date(Date.parse(packet.download_timestamp))); + console.log("WRITE TO MONGO"); + var download_timestamp = moment(packet.download_timestamp); + if (packet.download_status === 0) { + iter_mqtt_record_stream(packet, 'sgv', toSGV) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING SGV TO MONGO", err, result.length); + })); + iter_mqtt_record_stream(packet, 'cal', toCal) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Cal TO MONGO", err, result.length); + })); + iter_mqtt_record_stream(packet, 'meter', toMeter) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Meter TO MONGO", err, result.length); + })); + iter_mqtt_record_stream(packet, 'sensor', toSensor) + .pipe(core.persist(function empty(err, result) { + console.log("DONE WRITING Sensor TO MONGO", err, result.length); + })); + } + packet.type = "download"; + devicestatus.create({ + uploaderBattery: packet.uploader_battery, + created_at: download_timestamp.toISOString() + }, function empty(err, result) { + console.log("DONE WRITING TO MONGO devicestatus ", result, err); + }); + + core.create([ packet ], function empty(err, res) { + console.log("Download written to mongo: ", packet) + }); + + + // core.write(packet); + break; + default: + console.log(topic, 'on message', 'msg', msg); + // core.write(msg); + break; + } + }); + client.entries = process(client); + client.every = every; + return client; +} +module.exports = configure; diff --git a/lib/poller.js b/lib/poller.js new file mode 100644 index 00000000000..fd78327a1b2 --- /dev/null +++ b/lib/poller.js @@ -0,0 +1,18 @@ + +var es = require('event-stream'); + +function create (core) { + function heartbeat (ev) { + + } + core.inputs.on('heartbeat', heartbeat); + function make (beat) { + var poll = {heart: beat}; + return poll; + } + function writer (data) { + } + var poller = es.through(writer); +} + +module.exports = create; diff --git a/lib/treatments.js b/lib/treatments.js index c6dfae54bbb..ddbc56cadd7 100644 --- a/lib/treatments.js +++ b/lib/treatments.js @@ -125,20 +125,11 @@ function storage (collection, storage, pushover) { api.list = list; api.create = create; + api.indexedFields = indexedFields; return api; } +var indexedFields = ['created_at', 'eventType']; +storage.indexedFields = indexedFields; -function ensureIndexes(name, storage) { - storage.with_collection(name)(function (err, collection) { - if (err) { - console.error("ensureIndexes, unable to get collection for: " + name + " - " + err); - } else { - storage.ensureIndexes(collection, ['created_at', 'eventType']); - } - }); -} +module.exports = storage; -module.exports = { - storage: storage, - ensureIndexes: ensureIndexes -}; diff --git a/package.json b/package.json index 92b50a630df..8f1283ac747 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,16 @@ "test": "make test", "postinstall": "node node_modules/bower/bin/bower install" }, + "config": { + "blanket": { + "pattern": [ + "tests", "lib", "server", "app", "static/js" + ], + "data-cover-never": [ + "node_modules" + ] + } + }, "engines": { "node": "0.10.x" }, @@ -42,11 +52,15 @@ "express": "^4.6.1", "express-extension-to-accept": "0.0.2", "git-rev": "git://github.com/bewest/git-rev.git", + "long": "~2.2.3", "mongodb": "^1.4.7", "moment": "2.8.1", + "mqtt": "~0.3.11", "pushover-notifications": "0.2.0", - "sgvdata": "0.0.2", - "socket.io": "^0.9.17" + "sgvdata": "git://github.com/ktind/sgvdata.git#wip/protobuf", + "socket.io": "^0.9.17", + "git-rev": "git://github.com/bewest/git-rev.git", + "bootevent": "0.0.1" }, "devDependencies": { "istanbul": "~0.3.5", diff --git a/server.js b/server.js index 10eb3602847..bd8c7e0497c 100644 --- a/server.js +++ b/server.js @@ -26,79 +26,15 @@ // DB Connection setup and utils /////////////////////////////////////////////////// -var software = require('./package.json'); var env = require('./env')( ); -var pushover = require('./lib/pushover')(env); -var entries = require('./lib/entries'); -var treatments = require('./lib/treatments'); -var devicestatus = require('./lib/devicestatus'); - -var store = require('./lib/storage')(env, function() { - console.info("Mongo ready"); - entries.ensureIndexes(env.mongo_collection, store); - treatments.ensureIndexes(env.treatments_collection, store); - devicestatus.ensureIndexes(env.devicestatus_collection, store); -}); - - -var express = require('express'); -var compression = require('compression'); - -/////////////////////////////////////////////////// -// api and json object variables -/////////////////////////////////////////////////// -var entriesStorage = entries.storage(env.mongo_collection, store, pushover); -var settings = require('./lib/settings')(env.settings_collection, store); -var treatmentsStorage = treatments.storage(env.treatments_collection, store, pushover); -var profile = require('./lib/profile')(env.profile_collection, store); -var devicestatusStorage = devicestatus.storage(env.devicestatus_collection, store); -var api = require('./lib/api/')(env, entriesStorage, settings, treatmentsStorage, profile, devicestatusStorage); -var pebble = require('./lib/pebble'); -/////////////////////////////////////////////////// /////////////////////////////////////////////////// // setup http server /////////////////////////////////////////////////// var PORT = env.PORT; -var app = express(); -var appInfo = software.name + ' ' + software.version; -app.set('title', appInfo); -app.enable('trust proxy'); // Allows req.secure test on heroku https connections. - -app.use(compression({filter: shouldCompress})); - -function shouldCompress(req, res) { - //TODO: return false here if we find a condition where we don't want to compress - // fallback to standard filter function - return compression.filter(req, res); -} - -app.use('/api/v1', api); - -// pebble data -app.get('/pebble', pebble(entriesStorage, treatmentsStorage, profile, devicestatusStorage, env)); - -//app.get('/package.json', software); - -// define static server -//TODO: JC - changed cache to 1 hour from 30d ays to bypass cache hell until we have a real solution -var staticFiles = express.static(env.static_files, {maxAge: 60 * 60 * 1000}); - -// serve the static content -app.use(staticFiles); - -var bundle = require('./bundle')(); -app.use(bundle); - -// Handle errors with express's errorhandler, to display more readable error messages. -var errorhandler = require('errorhandler'); -//if (process.env.NODE_ENV === 'development') { - app.use(errorhandler()); -//} - -function create ( ) { +function create (app) { var transport = (env.ssl ? require('https') : require('http')); if (env.ssl) { @@ -107,15 +43,25 @@ function create ( ) { return transport.createServer(app); } -store(function ready ( ) { - var server = create( ).listen(PORT); - console.log('listening', PORT); - - /////////////////////////////////////////////////// - // setup socket io for data and message transmission - /////////////////////////////////////////////////// - var websocket = require('./lib/websocket'); - var io = websocket(env, server, entriesStorage, treatmentsStorage, profile, devicestatusStorage); -}); +var bootevent = require('./lib/bootevent'); +bootevent(env).boot(function booted (ctx) { + env.store = ctx.store; + var app = require('./app')(env, ctx); + var server = create(app).listen(PORT); + console.log('listening', PORT); + + if (env.MQTT_MONITOR) { + var mqtt = require('./lib/mqtt')(env, app.entries, app.devicestatus); + var es = require('event-stream'); + es.pipeline(mqtt.entries, app.entries.map( ), mqtt.every(app.entries)); + } + + /////////////////////////////////////////////////// + // setup socket io for data and message transmission + /////////////////////////////////////////////////// + var websocket = require('./lib/websocket'); + var io = websocket(env, server, app.entries, app.treatments, app.profiles, app.devicestatus); + }) +; ///////////////////////////////////////////////////