-
Notifications
You must be signed in to change notification settings - Fork 6
/
memstore.js
73 lines (65 loc) · 1.82 KB
/
memstore.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
var _ = require('lodash')
, util = require('util')
, events = require('events')
, through = require('through')
;
function MemoryStore () {
this.dict = {}
this.sequences = {}
this.i = 0
}
util.inherits(MemoryStore, events.EventEmitter)
MemoryStore.prototype.get = function (key) {
return this.dict[key]
}
MemoryStore.prototype.put = function (key, value) {
this.i += 1
this.sequences[key] = {seq:this.i, id:key}
this.dict[key] = value
this.emit('change', this.sequences[key])
}
MemoryStore.prototype.getSequences = function (opts) {
var self = this
var seqStream = through()
setImmediate(function() {
self.changes(opts).map(function(ch) {
seqStream.queue(ch)
})
seqStream.queue(null)
})
return seqStream
}
MemoryStore.prototype.changes = function (opts) {
var self = this
opts.since = opts.since || 0
function _map (key) {
var change = _.clone(self.sequences[key], true)
if (opts.include_data) change.data = self.dict[key]
return change
}
function _filter (change) {
return change.seq > opts.since
}
return _.sortBy(Object.keys(this.sequences).map(_map).filter(_filter), 'seq')
}
MemoryStore.prototype.getContinuousSequences = function (opts) {
var ee = new events.EventEmitter()
, self = this
, emitted = 0
;
opts.limit = opts.limit || Infinity
function onChange (change) {
if (emitted > opts.limit) return ee.close()
var c = _.clone(change, true)
if (opts.include_data) c.data = self.dict[change.id]
ee.emit()
}
setImmediate(function () {
self.changes(opts).forEach(function (c) { if (emitted < opts.limit) ee.emit('data', c)})
self.on('change', onChange)
})
ee.on('data', function () {emitted += 1})
ee.close = function ( ) { self.removeListener('change', onChange) }
return ee
}
exports.MemoryStore = MemoryStore