Skip to content

Commit

Permalink
resolves #72 introduce a recovery queue (#74)
Browse files Browse the repository at this point in the history
* resolves #72 introduce a recovery queue

* remove suite

* ensure that no error is thrown

* default maxSize 1024 and remove arbitrary setTimeout

* add documentation

* copy editing and emit a warning when recovery failed

* fix typo in comment

* events consistency and update CLI

* improve doc

* copy editing

* fix for node 14
  • Loading branch information
ggrossetie authored Jul 21, 2022
1 parent 989b395 commit a89fd71
Show file tree
Hide file tree
Showing 10 changed files with 409 additions and 33 deletions.
37 changes: 25 additions & 12 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,29 @@ pino(transport)

### Options

| Name | Description |
|-------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| `unixsocket` | The unix socket path for the destination. Default: `​`. |
| `address` | The host address to connect to. Default: `127.0.0.1`. |
| `port` | The host port to connect to. Default: `514`. |
| `mode` | Either `tcp` or `udp`. Default: `udp`. |
| `secure` | Enable secure (TLS) connection. Default: false. |
| `noverify` | Allow connection to server with self-signed certificates. Default: false. |
| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. |
| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity` |
| `onSocketClose` | The callback when the socket is closed on TCP destinations. Default: `(socketError) => socketError && process.stderr.write(socketError.message)` |
| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`. |
| Name | Description |
|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `address` | The host address to connect to. Default: `127.0.0.1`. |
| `port` | The host port to connect to. Default: `514`. |
| `unixsocket` | The unix socket path for the destination. Default: `​`. |
| `mode` | Either `tcp` or `udp`. Default: `udp`. |
| `secure` | Enable secure (TLS) connection. Default: false. |
| `noverify` | Allow connection to server with self-signed certificates. Default: false. |
| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. |
| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. |
| `onSocketClose` | The callback when the socket is closed on TCP destinations. Default: `(socketError) => socketError && process.stderr.write(socketError.message)`. |
| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`. |
| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. |
| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. |
| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. |

### Events

| Name | Callback Signature | Description |
|---------|----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
| `open` | `(address: AddressInfo) => void` | Emitted when the TCP or UDP connection is established. |
| `error` | `(error: Error) => void` | Emitted when an error occurs on the TCP or UDP socket. |
| `close` | `(hadError: Boolean) => void` | Emitted after the TCP or UDP socket is closed. The argument `hadError` is a boolean which says if the socket was closed due to a transmission error. |

## Usage as Pino Legacy Transport

Expand Down Expand Up @@ -78,6 +89,8 @@ $ node foo | pino-socket -u /tmp/unix.sock
+ `--reconnectTries <n>` (`-t <n>`): set number (`<n>`) of reconnect attempts before giving up. Default: infinite.
+ `--echo` (`-e`): echo the received messages to stdout. Default: enabled.
+ `--no-echo` (`-ne`): disable echoing received messages to stdout.
+ `--recovery`: enable recovery mode for TCP (only works with `--mode=tcp`). Default: off.
+ `--recovery-queue-max-size <n>`: maximum size of items (`<n>`) added to the recovery queue. Default: 1024.

[rsyscee]: http://www.rsyslog.com/doc/mmjsonparse.html

Expand Down
21 changes: 11 additions & 10 deletions help.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

Flags
-h | --help Display Help
-v | --version display Version
-s | --settings read settings from a JSON file (switches take precedence)
-a | --address the address for the destination socket; default: 127.0.0.1
-m | --mode the protocol, either 'udp' or 'tcp'; default: 'udp'
-p | --port the port on the destination socket; default: '514'
-u | --unixsocket the path of unix socket, default: ''
-r | --reconnect enable tcp mode reconnecting
-t | --reconnectTries number of reconnect attempts to make; default 'Inifinity'
-ne | --no-echo disable echoing received messages to stdout

-v | --version display Version
-s | --settings read settings from a JSON file (switches take precedence)
-a | --address the address for the destination socket; default: 127.0.0.1
-m | --mode the protocol, either 'udp' or 'tcp'; default: 'udp'
-p | --port the port on the destination socket; default: '514'
-u | --unixsocket the path of unix socket; default: ''
-r | --reconnect enable tcp mode reconnecting
-t | --reconnectTries number of reconnect attempts to make; default: 'Inifinity'
-ne | --no-echo disable echoing received messages to stdout
| --recovery enable recovery mode
| --recovery-queue-max-size maximum size of items added to the recovery queue; default: 1024
98 changes: 98 additions & 0 deletions lib/Queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
'use strict'

const { emitWarning } = require('process')

/**
* @typedef {object} QueueOptions
* @prop {number} [maxSize] Positive integer to track the sizes of items added to the cache, and automatically evict items in order to stay below this size. Default: `Infinity`
* @prop {(any) => number?} [sizeCalculation] Function used to calculate the size of stored items. Default: `item => item.length`.
*/

/**
* A linear data structure of a FIFO (First In - First Out) type.
*
* Inspired by:
* - https://vhudyma-blog.eu/implement-queue-in-javascript/
* - https://github.com/isaacs/node-lru-cache
*/
module.exports = class Queue {
/**
* @param {QueueOptions} opts
*/
constructor (opts) {
// The actual queue
this.items = {}
// The index of the head item
this.head = 0
// The index of the tail item
this.tail = 0
this.calculatedSize = 0
this.sizes = {}
this.opts = {
maxSize: Infinity,
sizeCalculation: (item) => item.length,
...opts
}
}

enqueue (item) {
const index = this.tail
const size = this.opts.sizeCalculation(item)
if (size > this.opts.maxSize) {
emitWarning(`unable to enqueue item because item size ${size} is greater than maxSize ${this.opts.maxSize}`)
return
}
this.sizes[index] = size
const maxSize = this.opts.maxSize - size
while (this.calculatedSize > maxSize) {
this.evict()
}
this.calculatedSize += size
// Add an item on the current tail index
this.items[index] = item
// Increase the index of the tail item
// So the next items are added at the end
this.tail++
}

evict () {
// If the queue is empty
if (this.tail === this.head) {
return undefined
}
const index = this.head
this.calculatedSize -= this.sizes[index]
delete this.sizes[index]
this.dequeue()
}

dequeue () {
const item = this.peek()
if (item === undefined) {
return item
}
// Delete it
delete this.items[this.head]
// Increase the head index
this.head++
// Return the item
return item
}

peek () {
// If the queue is empty, return "undefined"
if (this.tail === this.head) {
return undefined
}
// Pick an item
return this.items[this.head]
}

size () {
return Object.keys(this.items).length
}

isEmpty () {
return this.size() === 0
}
}
71 changes: 63 additions & 8 deletions lib/TcpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const net = require('net')
const tls = require('tls')
const stream = require('stream')
const { Backoff, FibonacciStrategy } = require('backoff')
const Queue = require('./Queue')

/**
* @typedef {object} TcpConnectionOptions
Expand All @@ -17,6 +18,9 @@ const { Backoff, FibonacciStrategy } = require('backoff')
* @prop {(error: Error|null) => void?} [onSocketClose] The callback when the socket is closed. Default: ``.
* @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`.
* @prop {stream.Readable?} [sourceStream]
* @prop {boolean} [recovery] Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`.
* @prop {({data: Buffer, encoding: string}) => number?} [recoveryQueueSizeCalculation] Function used to calculate the size of stored items. Default: `item => item.data.length + item.encoding.length`.
* @prop {number?} [recoveryQueueMaxSize] The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`.
*/

/**
Expand All @@ -35,7 +39,10 @@ module.exports = function factory (userOptions) {
port: 514,
reconnect: false,
reconnectTries: Infinity,
onSocketClose: (socketError) => socketError && process.stderr.write(socketError.message)
onSocketClose: (socketError) => socketError && process.stderr.write(socketError.message),
recovery: false,
recoveryQueueMaxSize: 1024,
recoveryQueueSizeCalculation: (item) => item.data.length + item.encoding.length
},
userOptions
)
Expand All @@ -49,15 +56,37 @@ module.exports = function factory (userOptions) {
let connected = false
let connecting = false
let socketError = null
const recoveryQueue = new Queue({
maxSize: options.recoveryQueueMaxSize,
sizeCalculation: options.recoveryQueueSizeCalculation
})

const retryBackoff = createRetryBackoff()

// This stream is the one returned to psock.js.
function handleSocketWriteError (err, data, encoding) {
outputStream.emit('error', new Error(`unable to write data to the TCP socket: ${err.message}`))
if (options.recovery) {
// unable to write data, will try later when the server becomes available again
recoveryQueue.enqueue({ data, encoding })
}
}

// this stream is the one returned to psock.js.
const outputStream = stream.Writable({
autoDestroy: true,
close () { socket.end() },
write (data, encoding, callback) {
socket.write(data)
// node 14 throws an Error if the socket has ended (instead of calling the callback with an Error)
// remind: can be removed once we drop node 14!
if (socket.writableEnded) {
handleSocketWriteError(new Error('This socket has been ended by the other party'), data, encoding)
} else {
socket.write(data, encoding, (err) => {
if (err) {
handleSocketWriteError(err, data, encoding)
}
})
}
callback()
}
})
Expand Down Expand Up @@ -111,20 +140,29 @@ module.exports = function factory (userOptions) {
options.onSocketClose(socketError)
}
}
if (options.reconnect) reconnect()
if (options.reconnect) {
reconnect()
} else {
outputStream.emit('close', hadError)
}
}

function connectListener () {}

function endListener () {
disconnect()
if (options.reconnect) reconnect()
if (options.reconnect) {
reconnect()
} else {
outputStream.emit('end')
}
}

function errorListener (err) {
socketError = err
outputStream.emit('error', socketError)
}
// end: connection listerners
// end: connection listeners

function addListeners () {
socket
Expand All @@ -142,9 +180,23 @@ module.exports = function factory (userOptions) {
.removeAllListeners('error')
}

function recoverEnqueuedData () {
if (recoveryQueue.isEmpty()) {
return
}
const item = recoveryQueue.peek()
socket.write(item.data, item.encoding, (err) => {
if (err) {
outputStream.emit('error', new Error(`unable to write data to the TCP socket while recovering data: ${err.message}`))
return
}
recoveryQueue.dequeue()
recoverEnqueuedData()
})
}

connect(() => {
// TODO we must propagate the events from the socket to the outputStream
outputStream.emit('open')
outputStream.emit('open', socket.address())
})

function createRetryBackoff () {
Expand All @@ -155,6 +207,9 @@ module.exports = function factory (userOptions) {
if (connected === false) {
return retry.backoff(err)
}
if (options.recovery) {
recoverEnqueuedData()
}
})
})
retry.on('fail', (err) => process.stderr.write(`could not reconnect: ${err.message}`))
Expand Down
6 changes: 5 additions & 1 deletion lib/UdpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ module.exports = function factory (userOptions) {
sourceStream.pipe(writableStream, { end: false })
}

socket.on('close', () => {
writableStream.emit('close', false)
})

socket.on('error', (err) => {
writableStream.emit('error', err)
})

socket.connect(options.port, options.address, () => {
writableStream.emit('open')
writableStream.emit('open', socket.address())
})

return writableStream
Expand Down
4 changes: 3 additions & 1 deletion lib/pino-transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ const defaultOptions = {
noverify: false,
reconnect: false,
reconnectTries: Infinity,
settings: null
settings: null,
recovery: false,
recoveryQueueMaxSize: 1024
}

async function socketTransport (opts) {
Expand Down
6 changes: 5 additions & 1 deletion psock.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ function cli () {
echo: Boolean,
help: Boolean,
version: Boolean,
settings: String
settings: String,
recovery: Boolean,
'recovery-queue-max-size': Number
}
const shortOpts = {
u: '--unixsocket',
Expand Down Expand Up @@ -79,6 +81,8 @@ function cli () {
connection = udpConnectionFactory(options)
}

connection.on('error', (err) => console.error(err.message))

function shutdown () {
try {
connection.close()
Expand Down
Loading

0 comments on commit a89fd71

Please sign in to comment.