diff --git a/Readme.md b/Readme.md index bae9cb0..40e8cf4 100644 --- a/Readme.md +++ b/Readme.md @@ -36,21 +36,26 @@ pino(transport) ### Options -| Name | Description | -|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `address` | The host address to connect to. Default: `127.0.0.1`. | -| `port` | The host port to connect to. Default: `514`. | -| `unixsocket` | The unix socket path for the destination. Default: `​`. | -| `mode` | Either `tcp` or `udp`. Default: `udp`. | -| `secure` | Enable secure (TLS) connection. Default: false. | -| `noverify` | Allow connection to server with self-signed certificates. Default: false. | -| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. | -| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. | -| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`. | -| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. | -| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. | -| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. | -| `onBeforeDataWrite` | Function used to manipulate TCP data before being written to the socket. Operations preformed here must be synchronous. Format: `(data) => Buffer`. Default: `null` | +| Name | Description | +|---------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `address` | The host address to connect to. Default: `127.0.0.1`. | +| `port` | The host port to connect to. Default: `514`. | +| `unixsocket` | The unix socket path for the destination. Default: `​`. | +| `mode` | Either `tcp` or `udp`. Default: `udp`. | +| `secure` | Enable secure (TLS) connection. Default: false. | +| `noverify` | Allow connection to server with self-signed certificates. Default: false. | +| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. | +| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. | +| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. lternatively, you can configure the backoff strategy using primitive data (see below). Default: `new FibonacciStrategy()`. | +| `backoffStrategy.name` | The backoff strategy name, either `exponential` or `fibonacci`. Default: `fibonacci`. | +| `backoffStrategy.randomisationFactor` | The backoff randomisation factor, must be between 0 and 1. Default: `0`. | +| `backoffStrategy.initialDelay` | The backoff initial delay in milliseconds. Default: `100`. | +| `backoffStrategy.maxDelay` | The backoff maximum delay in milliseconds. Default: `10000`. | +| `backoffStrategy.factor` | The exponential backoff factor, must be greater than 1. Default: `2`. | +| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. | +| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. | +| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. | +| `onBeforeDataWrite` | Function used to manipulate TCP data before being written to the socket. Operations preformed here must be synchronous. Format: `(data) => Buffer`. Default: `null` | ### Events diff --git a/lib/TcpConnection.js b/lib/TcpConnection.js index df940ba..b6c937b 100644 --- a/lib/TcpConnection.js +++ b/lib/TcpConnection.js @@ -3,7 +3,7 @@ const net = require('net') const tls = require('tls') const stream = require('stream') -const { Backoff, FibonacciStrategy } = require('backoff') +const { Backoff, FibonacciStrategy, ExponentialStrategy } = require('backoff') const Queue = require('./Queue') /** @@ -15,7 +15,12 @@ const Queue = require('./Queue') * @prop {boolean} [reconnect] Enable reconnecting to dropped TCP destinations. Default: false. * @prop {number} [reconnectTries] Number of times to attempt reconnection before giving up. Default: `Infinity` * @prop {string?} [unixsocket] The unix socket path for the destination. Default: ``. - * @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`. + * @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Alternatively, you can configure the backoff strategy using primitive data. Default: `new FibonacciStrategy()`. + * @prop {string} [backoffStrategy.name] The backoff strategy name, either `exponential` or `fibonacci`. Default: `fibonacci`. + * @prop {number} [backoffStrategy.randomisationFactor] The backoff randomisation factor, must be between 0 and 1. Default: `0`. + * @prop {number} [backoffStrategy.initialDelay] The backoff initial delay in milliseconds. Default: `100`. + * @prop {number} [backoffStrategy.maxDelay] The backoff maximum delay in milliseconds. Default: `10000`. + * @prop {number} [backoffStrategy.factor] The exponential backoff factor, must be greater than 1. Default: `2`. * @prop {stream.Readable?} [sourceStream] * @prop {boolean} [recovery] Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. * @prop {({data: Buffer, encoding: string}) => number?} [recoveryQueueSizeCalculation] Function used to calculate the size of stored items. Default: `item => item.data.length + item.encoding.length`. @@ -49,7 +54,7 @@ module.exports = function factory (userOptions) { const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream') ? options.sourceStream - : buildCliSourceStream() + : buildStdinSourceStream() /** @type {net.Socket} */ let socket = null @@ -205,7 +210,7 @@ module.exports = function factory (userOptions) { }) function createRetryBackoff () { - const retry = new Backoff(options.backoffStrategy ? options.backoffStrategy : new FibonacciStrategy()) + const retry = new Backoff(getBackoffStrategy(options)) retry.failAfter(options.reconnectTries) retry.on('ready', () => { connect((err) => { @@ -226,7 +231,22 @@ module.exports = function factory (userOptions) { return outputStream } -function buildCliSourceStream () { +function getBackoffStrategy (options) { + if (options.backoffStrategy) { + if (options.backoffStrategy.constructor && options.backoffStrategy.constructor.name === 'Object') { + // primitive data + if (options.backoffStrategy.name === 'exponential') { + return new ExponentialStrategy(options.backoffStrategy) + } + return new FibonacciStrategy(options.backoffStrategy) + } + // backoff strategy instance + return options.backoffStrategy + } + return new FibonacciStrategy() +} + +function buildStdinSourceStream () { // We use this passthrough to buffer incoming messages. const inputStream = new stream.PassThrough() process.stdin.pipe(inputStream) diff --git a/test/tcpBackoff.js b/test/tcpBackoff.js index ad6bffa..9304006 100644 --- a/test/tcpBackoff.js +++ b/test/tcpBackoff.js @@ -4,6 +4,7 @@ const TcpConnection = require('../lib/TcpConnection') const { ExponentialStrategy } = require('backoff') const { expect } = require('chai') +const { performance } = require('perf_hooks') test('tcp backoff', function testTcpBackoff (done) { let closeCount = 0 @@ -23,7 +24,37 @@ test('tcp backoff', function testTcpBackoff (done) { const nextBackoffDelay = exponentialStrategy.next() // initial, 10, 100... next delay should be 1000 expect(nextBackoffDelay).to.eq(1000) - tcpConnection.end(() => done()) + tcpConnection.end(() => { + process.stdin.removeAllListeners() + done() + }) + } + }) +}) + +test('tcp backoff (primitive data)', function testTcpBackoffUsingPrimitiveData (done) { + let closeCount = 0 + const tcpConnection = TcpConnection({ + address: '127.0.0.1', + port: 0, + reconnect: true, + backoffStrategy: { + name: 'exponential', + initialDelay: 10, + factor: 10 // 10, 100, 1000, 2000... + } + }) + const start = performance.now() + tcpConnection.on('socketClose', () => { + closeCount++ + if (closeCount === 3) { + // initial, 10, 100... next delay should be 1000 + const elapsed = performance.now() - start + expect(elapsed).to.be.below(1000) + tcpConnection.end(() => { + process.stdin.removeAllListeners() + done() + }) } }) }) diff --git a/test/tcpReconnect.js b/test/tcpReconnect.js index dc08a07..b47548d 100644 --- a/test/tcpReconnect.js +++ b/test/tcpReconnect.js @@ -137,6 +137,7 @@ test('tcp reconnect after initial failure', async function testTcpReconnectAfter expect(received.length).to.eq(1) expect(received[0].data.toString('utf8')).to.eq(`log${counter}\n`) tcpConnection.end() + process.stdin.removeAllListeners() }) test('tcp no reconnect when socket is gracefully closed', function testTcpNoReconnectSocketGracefullyClosed (done) { @@ -165,6 +166,7 @@ test('tcp no reconnect when socket is gracefully closed', function testTcpNoReco expect(msgCount).to.eq(1) // tcp connection should be closed! server.close(() => { + process.stdin.removeAllListeners() done() }) })