Skip to content

Commit

Permalink
WIP: resolves pinojs#72 introduce a recovery queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ggrossetie committed Jul 9, 2022
1 parent 4d79b0e commit 830c87c
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 3 deletions.
55 changes: 55 additions & 0 deletions lib/Queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Linear data structure of a FIFO (First In - First Out) type.
* Reference: https://vhudyma-blog.eu/implement-queue-in-javascript/
*/
module.exports = class Queue {
constructor (opts) {
// The actual queue
this.elements = {}
// The index of the head element
this.head = 0
// The index of the tail element
this.tail = 0
// max
// maxSize = 0,
// sizeCalculation,
}

enqueue (element) {
// Add an element on the current tail index
this.elements[this.tail] = element
// Increase the index of the tail element
// So the next elements are added at the end
this.tail++
}

dequeue () {
const element = this.peek()
if (element === undefined) {
return element
}
// Delete it
delete this.elements[this.head]
// Increase the head index
this.head++
// Return the element
return element
}

peek () {
// If the queue is empty, return "undefined"
if (this.tail === this.head) {
return undefined
}
// Pick an element
return this.elements[this.head]
}

size () {
return Object.keys(this.elements).length
}

isEmpty () {
return this.size() === 0
}
}
33 changes: 30 additions & 3 deletions lib/TcpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const net = require('net')
const tls = require('tls')
const stream = require('stream')
const backoff = require('backoff')
const Queue = require('./Queue')

/**
* @typedef {object} TcpConnectionOptions
Expand Down Expand Up @@ -32,7 +33,8 @@ module.exports = function factory (userOptions) {
address: '127.0.0.1',
port: 514,
reconnect: false,
reconnectTries: Infinity
reconnectTries: Infinity,
onSocketDataWritten: () => {}
},
userOptions
)
Expand All @@ -46,13 +48,20 @@ module.exports = function factory (userOptions) {
let connected = false
let connecting = false
let socketError = null
const recoveryQueue = new Queue()

// This stream is the one returned to psock.js.
const outputStream = stream.Writable({
autoDestroy: true,
close () { socket.end() },
write (data, encoding, callback) {
socket.write(data)
socket.write(data, encoding, (err) => {
options.onSocketDataWritten(err)
if (err) {
// unable to write data, will try later when the server becomes available again
recoveryQueue.enqueue({ data, encoding })
}
})
callback()
}
})
Expand Down Expand Up @@ -96,7 +105,10 @@ module.exports = function factory (userOptions) {
retry.failAfter(options.reconnectTries)
retry.on('ready', () => {
connect((err) => {
if (connected === false) return retry.backoff(err)
if (connected === false) {
return retry.backoff(err)
}
recoverEnqueuedData()
})
})
retry.on('fail', (err) => process.stderr.write(`could not reconnect: ${err.message}`))
Expand Down Expand Up @@ -142,6 +154,21 @@ module.exports = function factory (userOptions) {
.removeAllListeners('error')
}

function recoverEnqueuedData () {
if (recoveryQueue.isEmpty()) {
return
}
const item = recoveryQueue.peek()
socket.write(item.data, item.encoding, (err) => {
if (err) {
// stop
return
}
recoveryQueue.dequeue()
recoverEnqueuedData()
})
}

connect(() => {
// TODO we must propagate the events from the socket to the outputStream
outputStream.emit('open')
Expand Down
22 changes: 22 additions & 0 deletions test/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'
/* eslint-env node, mocha */

const Queue = require('../lib/Queue')
const { expect } = require('chai')

test('queue', function () {
const q = new Queue()
q.enqueue('1')
q.enqueue('2')
q.enqueue('3')
q.enqueue('4')
expect(q.size()).to.eq(4)
expect(q.dequeue()).to.eq('1')
expect(q.dequeue()).to.eq('2')
expect(q.dequeue()).to.eq('3')
expect(q.size()).to.eq(1)
expect(q.dequeue()).to.eq('4')
expect(q.size()).to.eq(0)
expect(q.dequeue()).to.eq(undefined) // empty
expect(q.size()).to.eq(0)
})
96 changes: 96 additions & 0 deletions test/recovery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
'use strict'
/* eslint-env node, mocha */

const TcpConnection = require('../lib/TcpConnection')
const { expect } = require('chai')
const net = require('net')

function startServer ({ address, port, next }) {
const socket = net.createServer((connection) => {
connection.on('data', (data) => {
next({ action: 'data', data })
connection.destroy()
})
})

socket.listen(port || 0, address || '127.0.0.1', () => {
next({
action: 'started',
address: socket.address().address,
port: socket.address().port
})
})

return socket
}

test('recovery', function (done) {
let address
let port
let tcpConnection
let counter = 0
let closing = false
const received = []

function sendData () {
setInterval(() => {
counter++
tcpConnection.write(`log${counter}\n`, 'utf8')
}, 50)
}

function startSecondServer () {
const secondServer = startServer({
address,
port,
next: (msg) => {
switch (msg.action) {
case 'data':
received.push(msg)
if (!closing) {
closing = true
setTimeout(() => {
secondServer.close(() => {
const logs = received
.map(it => it.data.toString('utf8'))
.reduce((previousValue, currentValue) => previousValue + currentValue)
.split('\n')
.filter(it => it !== '')
const logNumbers = logs.map(it => parseInt(it.replace('log', '')))
expect(logs.length).to.eq(logNumbers[logNumbers.length - 1])
// make sure that no number is missing
expect(logNumbers).to.deep.eq(Array.from({ length: logNumbers.length }, (_, i) => i + 1))
done()
})
}, 200) // wait recovery a bit to make sure that enqueued data have been recovered
}
break
}
}
})
}

const firstServer = startServer({
next: (msg) => {
switch (msg.action) {
case 'started':
address = msg.address
port = msg.port
tcpConnection = TcpConnection({
address,
port,
reconnect: true
})
sendData()
break
case 'data':
received.push(msg)
firstServer.close(() => {
// first server is closed
setTimeout(startSecondServer, 50)
})
break
}
}
})
})

0 comments on commit 830c87c

Please sign in to comment.