This repository has been archived by the owner on Nov 29, 2017. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 98
/
index.js
152 lines (118 loc) · 3.11 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
* phant
* https://github.com/sparkfun/phant
*
* Copyright (c) 2014 SparkFun Electronics
* Licensed under the GPL v3 license.
*/
'use strict';
/**** Module dependencies ****/
var events = require('events'),
util = require('util');
/**** Make Phant an event emitter ****/
util.inherits(Phant, events.EventEmitter);
/**** Phant prototype ****/
var app = Phant.prototype;
/**** Expose Phant ****/
exports = module.exports = Phant;
/**** Expose Submodules ****/
Phant.HttpServer = require('./lib/http_server');
Phant.HttpsServer = require('./lib/https_server');
Phant.HttpInput = require('./lib/http_input');
Phant.HttpOutput = require('./lib/http_output');
Phant.Validator = require('./lib/validator');
Phant.TelnetManager = require('./lib/telnet_manager');
Phant.MemoryThrottler = require('./lib/memory_throttler');
Phant.AtomStream = require('./lib/atom_stream');
Phant.FieldStream = require('./lib/field_stream');
Phant.LimitStream = require('./lib/limit_stream');
Phant.SampleStream = require('./lib/sample_stream');
/**** Initialize a new Phant ****/
function Phant() {
if (!(this instanceof Phant)) {
return new Phant();
}
events.EventEmitter.call(this);
this.on('error', this.handleError);
this.managers = [];
this.inputs = [];
this.outputs = [];
}
/**** Log errors to console ****/
app.handleError = function() {
console.error.apply(console, arguments);
};
/**
* registerManager
*
* adds a new manager to the list of managers
*/
app.registerManager = function(manager) {
// push to list of managers
this.managers.push(manager);
};
/**
* registerInput
*
* adds a new input to the list of
* inputs, and listens for data and
* errors.
*/
app.registerInput = function(input) {
// push to list of inputs
this.inputs.push(input);
// listen for data, and pipe it to outputs
input.on('data', this.dataReceived.bind(this));
// listen for clear events and tell the outputs
// to wipe data if they are storing it
input.on('clear', this.clearStream.bind(this));
// pipe input errors to phant error handler
input.on('error', this.handleError.bind(
input,
input.moduleName + ':'
));
};
/**
* registerOutput
*
* adds a new output to the list of
* outputs, and listens for output errors.
*/
app.registerOutput = function(output) {
// push to list of outputs
this.outputs.push(output);
// pipe output errors to phant error handler
output.on('error', this.handleError.bind(
output,
output.moduleName + ':'
));
};
/**
* dataReceived
*
* send data to all registered outputs
*/
app.dataReceived = function(id, data) {
// loop through all outputs and give
// them the new data.
this.outputs.forEach(function(output) {
output.write(id, data);
});
// let the managers know data was received
// so they can update the last_push timestamp
this.managers.forEach(function(manager) {
manager.touch(id);
});
};
/**
* clearStream
*
* wipe the data from all persistent stores
*/
app.clearStream = function(id) {
// loop through all outputs and give
// them the new data.
this.outputs.forEach(function(output) {
output.clear(id);
});
};