Skip to content

Commit

Permalink
resolves pinojs#72 introduce a recovery queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ggrossetie committed Jul 9, 2022
1 parent 4d79b0e commit b5abeb1
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 3 deletions.
98 changes: 98 additions & 0 deletions lib/Queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
'use strict'

const { emitWarning } = require('process')

/**
* @typedef {object} QueueOptions
* @prop {number} [maxSize] Positive integer to track the sizes of items added to the cache, and automatically evict items in order to stay below this size. Default: `Infinity`
* @prop {(any) => number?} [sizeCalculation] Function used to calculate the size of stored elements. Default: `element => element.length`.
*/

/**
* A linear data structure of a FIFO (First In - First Out) type.
*
* Inspired by:
* - https://vhudyma-blog.eu/implement-queue-in-javascript/
* - https://github.com/isaacs/node-lru-cache
*/
module.exports = class Queue {
/**
* @param {QueueOptions} opts
*/
constructor (opts) {
// The actual queue
this.elements = {}
// The index of the head element
this.head = 0
// The index of the tail element
this.tail = 0
this.calculatedSize = 0
this.sizes = {}
this.opts = {
maxSize: Infinity,
sizeCalculation: (element) => element.length,
...opts
}
}

enqueue (element) {
const index = this.tail
const size = this.opts.sizeCalculation(element)
if (size > this.opts.maxSize) {
emitWarning(`Unable to enqueue element because element size ${size} is greater than maxSize ${this.opts.maxSize}`)
return
}
this.sizes[index] = size
const maxSize = this.opts.maxSize - size
while (this.calculatedSize > maxSize) {
this.evict()
}
this.calculatedSize += size
// Add an element on the current tail index
this.elements[index] = element
// Increase the index of the tail element
// So the next elements are added at the end
this.tail++
}

evict () {
// If the queue is empty
if (this.tail === this.head) {
return undefined
}
const index = this.head
this.calculatedSize -= this.sizes[index]
delete this.sizes[index]
this.dequeue()
}

dequeue () {
const element = this.peek()
if (element === undefined) {
return element
}
// Delete it
delete this.elements[this.head]
// Increase the head index
this.head++
// Return the element
return element
}

peek () {
// If the queue is empty, return "undefined"
if (this.tail === this.head) {
return undefined
}
// Pick an element
return this.elements[this.head]
}

size () {
return Object.keys(this.elements).length
}

isEmpty () {
return this.size() === 0
}
}
41 changes: 38 additions & 3 deletions lib/TcpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const net = require('net')
const tls = require('tls')
const stream = require('stream')
const backoff = require('backoff')
const Queue = require('./Queue')

/**
* @typedef {object} TcpConnectionOptions
Expand All @@ -15,6 +16,9 @@ const backoff = require('backoff')
* @prop {number} [reconnectTries] Number of times to attempt reconnection before giving up. Default: `Infinity`
* @prop {string?} [unixsocket] the unix socket path for the destination. Default: ``.
* @prop {stream.Readable?} [sourceStream]
* @prop {boolean} [recovery] Enable recovering data when reconnecting after the connection was dropped. Default: `false`.
* @prop {({data: Buffer, encoding: string}) => number?} [recoveryQueueSizeCalculation] Function used to calculate the size of stored elements. Default: `element => element.data.length + element.encoding.length`.
* @prop {number?} [recoveryQueueMaxSize] Positive integer to track the sizes of items added to the cache, and automatically evict items in order to stay below this size. Default: `Infinity`
*/

/**
Expand All @@ -32,7 +36,9 @@ module.exports = function factory (userOptions) {
address: '127.0.0.1',
port: 514,
reconnect: false,
reconnectTries: Infinity
reconnectTries: Infinity,
recovery: false,
recoveryQueueSizeCalculation: (element) => element.data.length + element.encoding.length
},
userOptions
)
Expand All @@ -46,13 +52,22 @@ module.exports = function factory (userOptions) {
let connected = false
let connecting = false
let socketError = null
const recoveryQueue = new Queue({
maxSize: options.recoveryQueueMaxSize,
sizeCalculation: options.recoveryQueueSizeCalculation
})

// This stream is the one returned to psock.js.
const outputStream = stream.Writable({
autoDestroy: true,
close () { socket.end() },
write (data, encoding, callback) {
socket.write(data)
socket.write(data, encoding, (err) => {
if (err && options.recovery) {
// unable to write data, will try later when the server becomes available again
recoveryQueue.enqueue({ data, encoding })
}
})
callback()
}
})
Expand Down Expand Up @@ -96,7 +111,12 @@ module.exports = function factory (userOptions) {
retry.failAfter(options.reconnectTries)
retry.on('ready', () => {
connect((err) => {
if (connected === false) return retry.backoff(err)
if (connected === false) {
return retry.backoff(err)
}
if (options.recovery) {
recoverEnqueuedData()
}
})
})
retry.on('fail', (err) => process.stderr.write(`could not reconnect: ${err.message}`))
Expand Down Expand Up @@ -142,6 +162,21 @@ module.exports = function factory (userOptions) {
.removeAllListeners('error')
}

function recoverEnqueuedData () {
if (recoveryQueue.isEmpty()) {
return
}
const item = recoveryQueue.peek()
socket.write(item.data, item.encoding, (err) => {
if (err) {
// stop
return
}
recoveryQueue.dequeue()
recoverEnqueuedData()
})
}

connect(() => {
// TODO we must propagate the events from the socket to the outputStream
outputStream.emit('open')
Expand Down
96 changes: 96 additions & 0 deletions test/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
'use strict'
/* eslint-env node, mocha */

const Queue = require('../lib/Queue')
const { expect } = require('chai')

suite('Queue')

test('#enqueue elements', function () {
const q = new Queue()
q.enqueue('1')
q.enqueue('2')
q.enqueue('3')
q.enqueue('4')
expect(q.size()).to.eq(4)
expect(q.dequeue()).to.eq('1')
expect(q.dequeue()).to.eq('2')
expect(q.dequeue()).to.eq('3')
expect(q.size()).to.eq(1)
expect(q.dequeue()).to.eq('4')
expect(q.size()).to.eq(0)
expect(q.dequeue()).to.eq(undefined) // empty
expect(q.size()).to.eq(0)
})

test('#dequeue non empty queue', function () {
const q = new Queue()
q.enqueue('1')
expect(q.dequeue()).to.eq('1')
q.enqueue('2')
expect(q.dequeue()).to.eq('2')
expect(q.dequeue()).to.eq(undefined) // empty
expect(q.size()).to.eq(0)
})

test('#dequeue empty queue', function () {
const q = new Queue()
expect(q.dequeue()).to.eq(undefined) // empty
})

test('#enqueue with max size should evict first in (same size)', function () {
const q = new Queue({
maxSize: 10
})
q.enqueue('a')
q.enqueue('bc')
q.enqueue('de')
q.enqueue('f')
q.enqueue('g')
q.enqueue('hij')
q.enqueue('k') // exceed max size, will dequeue 'a' to make space
expect(q.size()).to.eq(6)
expect(q.dequeue()).to.eq('bc')
})

test('#enqueue with max size should evict first in (different size)', function () {
const q = new Queue({
maxSize: 10
})
q.enqueue('abc')
q.enqueue('de')
q.enqueue('f')
q.enqueue('ghi')
q.enqueue('j')
q.enqueue('k')
q.enqueue('l') // exceed max size, will dequeue 'abc' to make space
expect(q.size()).to.eq(6)
expect(q.dequeue()).to.eq('de')
})

test('#enqueue with max size should evict until the total size is below max size', function () {
const q = new Queue({
maxSize: 10
})
q.enqueue('a')
q.enqueue('b')
q.enqueue('c')
q.enqueue('def')
q.enqueue('ghi')
q.enqueue('klmno') // exceed max size, will dequeue 'a', 'b', 'c' and 'def' to make space
expect(q.size()).to.eq(2)
expect(q.dequeue()).to.eq('ghi')
})

test('#enqueue an element that exceeds max size', function (done) {
const q = new Queue({
maxSize: 2
})
process.on('warning', (event) => {
expect(event.message).to.eq('Unable to enqueue element because element size 3 is greater than maxSize 2')
done()
})
q.enqueue('abc') // should emit a warning
expect(q.size()).to.eq(0)
expect(q.dequeue()).to.eq(undefined)
})
97 changes: 97 additions & 0 deletions test/recovery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict'
/* eslint-env node, mocha */

const TcpConnection = require('../lib/TcpConnection')
const { expect } = require('chai')
const net = require('net')

function startServer ({ address, port, next }) {
const socket = net.createServer((connection) => {
connection.on('data', (data) => {
next({ action: 'data', data })
connection.destroy()
})
})

socket.listen(port || 0, address || '127.0.0.1', () => {
next({
action: 'started',
address: socket.address().address,
port: socket.address().port
})
})

return socket
}

test('recovery', function (done) {
let address
let port
let tcpConnection
let counter = 0
let closing = false
const received = []

function sendData () {
setInterval(() => {
counter++
tcpConnection.write(`log${counter}\n`, 'utf8')
}, 50)
}

function startSecondServer () {
const secondServer = startServer({
address,
port,
next: (msg) => {
switch (msg.action) {
case 'data':
received.push(msg)
if (!closing) {
closing = true
setTimeout(() => {
secondServer.close(() => {
const logs = received
.map(it => it.data.toString('utf8'))
.reduce((previousValue, currentValue) => previousValue + currentValue)
.split('\n')
.filter(it => it !== '')
const logNumbers = logs.map(it => parseInt(it.replace('log', '')))
expect(logs.length).to.eq(logNumbers[logNumbers.length - 1])
// make sure that no number is missing
expect(logNumbers).to.deep.eq(Array.from({ length: logNumbers.length }, (_, i) => i + 1))
done()
})
}, 200) // wait recovery a bit to make sure that enqueued data have been recovered
}
break
}
}
})
}

const firstServer = startServer({
next: (msg) => {
switch (msg.action) {
case 'started':
address = msg.address
port = msg.port
tcpConnection = TcpConnection({
address,
port,
reconnect: true,
recovery: true
})
sendData()
break
case 'data':
received.push(msg)
firstServer.close(() => {
// first server is closed
setTimeout(startSecondServer, 50)
})
break
}
}
})
})

0 comments on commit b5abeb1

Please sign in to comment.