Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolves #84 configure backoff strategy using primitive data #88

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,26 @@ pino(transport)

### Options

| Name | Description |
|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `address` | The host address to connect to. Default: `127.0.0.1`. |
| `port` | The host port to connect to. Default: `514`. |
| `unixsocket` | The unix socket path for the destination. Default: `​`. |
| `mode` | Either `tcp` or `udp`. Default: `udp`. |
| `secure` | Enable secure (TLS) connection. Default: false. |
| `noverify` | Allow connection to server with self-signed certificates. Default: false. |
| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. |
| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. |
| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`. |
| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. |
| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. |
| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. |
| `onBeforeDataWrite` | Function used to manipulate TCP data before being written to the socket. Operations preformed here must be synchronous. Format: `(data) => Buffer`. Default: `null` |
| Name | Description |
|---------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `address` | The host address to connect to. Default: `127.0.0.1`. |
| `port` | The host port to connect to. Default: `514`. |
| `unixsocket` | The unix socket path for the destination. Default: `​`. |
| `mode` | Either `tcp` or `udp`. Default: `udp`. |
| `secure` | Enable secure (TLS) connection. Default: false. |
| `noverify` | Allow connection to server with self-signed certificates. Default: false. |
| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. |
| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. |
| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. lternatively, you can configure the backoff strategy using primitive data (see below). Default: `new FibonacciStrategy()`. |
| `backoffStrategy.name` | The backoff strategy name, either `exponential` or `fibonacci`. Default: `fibonacci`. |
| `backoffStrategy.randomisationFactor` | The backoff randomisation factor, must be between 0 and 1. Default: `0`. |
| `backoffStrategy.initialDelay` | The backoff initial delay in milliseconds. Default: `100`. |
| `backoffStrategy.maxDelay` | The backoff maximum delay in milliseconds. Default: `10000`. |
| `backoffStrategy.factor` | The exponential backoff factor, must be greater than 1. Default: `2`. |
| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. |
| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. |
| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. |
| `onBeforeDataWrite` | Function used to manipulate TCP data before being written to the socket. Operations preformed here must be synchronous. Format: `(data) => Buffer`. Default: `null` |

### Events

Expand Down
30 changes: 25 additions & 5 deletions lib/TcpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const net = require('net')
const tls = require('tls')
const stream = require('stream')
const { Backoff, FibonacciStrategy } = require('backoff')
const { Backoff, FibonacciStrategy, ExponentialStrategy } = require('backoff')
const Queue = require('./Queue')

/**
Expand All @@ -15,7 +15,12 @@ const Queue = require('./Queue')
* @prop {boolean} [reconnect] Enable reconnecting to dropped TCP destinations. Default: false.
* @prop {number} [reconnectTries] Number of times to attempt reconnection before giving up. Default: `Infinity`
* @prop {string?} [unixsocket] The unix socket path for the destination. Default: ``.
* @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`.
* @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Alternatively, you can configure the backoff strategy using primitive data. Default: `new FibonacciStrategy()`.
* @prop {string} [backoffStrategy.name] The backoff strategy name, either `exponential` or `fibonacci`. Default: `fibonacci`.
* @prop {number} [backoffStrategy.randomisationFactor] The backoff randomisation factor, must be between 0 and 1. Default: `0`.
* @prop {number} [backoffStrategy.initialDelay] The backoff initial delay in milliseconds. Default: `100`.
* @prop {number} [backoffStrategy.maxDelay] The backoff maximum delay in milliseconds. Default: `10000`.
* @prop {number} [backoffStrategy.factor] The exponential backoff factor, must be greater than 1. Default: `2`.
* @prop {stream.Readable?} [sourceStream]
* @prop {boolean} [recovery] Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`.
* @prop {({data: Buffer, encoding: string}) => number?} [recoveryQueueSizeCalculation] Function used to calculate the size of stored items. Default: `item => item.data.length + item.encoding.length`.
Expand Down Expand Up @@ -49,7 +54,7 @@ module.exports = function factory (userOptions) {

const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream')
? options.sourceStream
: buildCliSourceStream()
: buildStdinSourceStream()

/** @type {net.Socket} */
let socket = null
Expand Down Expand Up @@ -205,7 +210,7 @@ module.exports = function factory (userOptions) {
})

function createRetryBackoff () {
const retry = new Backoff(options.backoffStrategy ? options.backoffStrategy : new FibonacciStrategy())
const retry = new Backoff(getBackoffStrategy(options))
retry.failAfter(options.reconnectTries)
retry.on('ready', () => {
connect((err) => {
Expand All @@ -226,7 +231,22 @@ module.exports = function factory (userOptions) {
return outputStream
}

function buildCliSourceStream () {
function getBackoffStrategy (options) {
if (options.backoffStrategy) {
if (options.backoffStrategy.constructor && options.backoffStrategy.constructor.name === 'Object') {
// primitive data
if (options.backoffStrategy.name === 'exponential') {
return new ExponentialStrategy(options.backoffStrategy)
}
return new FibonacciStrategy(options.backoffStrategy)
}
// backoff strategy instance
return options.backoffStrategy
}
return new FibonacciStrategy()
}

function buildStdinSourceStream () {
// We use this passthrough to buffer incoming messages.
const inputStream = new stream.PassThrough()
process.stdin.pipe(inputStream)
Expand Down
33 changes: 32 additions & 1 deletion test/tcpBackoff.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const TcpConnection = require('../lib/TcpConnection')
const { ExponentialStrategy } = require('backoff')
const { expect } = require('chai')
const { performance } = require('perf_hooks')

test('tcp backoff', function testTcpBackoff (done) {
let closeCount = 0
Expand All @@ -23,7 +24,37 @@ test('tcp backoff', function testTcpBackoff (done) {
const nextBackoffDelay = exponentialStrategy.next()
// initial, 10, 100... next delay should be 1000
expect(nextBackoffDelay).to.eq(1000)
tcpConnection.end(() => done())
tcpConnection.end(() => {
process.stdin.removeAllListeners()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to explicitly remove all listeners on process.stdin otherwise we get:

(node:32722) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added to [ReadStream]. Use emitter.setMaxListeners() to increase limit
    at _addListener (node:events:595:17)
    at ReadStream.addListener (node:events:617:10)
    at ReadStream.Readable.on (node:internal/streams/readable:876:35)
    at ReadStream.once (node:events:661:8)
    at ReadStream.Readable.pipe (node:internal/streams/readable:678:9)
    at buildCliSourceStream (/path/to/pino-socket/lib/TcpConnection.js:252:17)
    at factory (/path/to/pino-socket/lib/TcpConnection.js:57:7)
    at Context.<anonymous> (/path/to/pino-socket/test/tcpSocketError.js:31:25)
    at callFnAsync (/path/to/pino-socket/node_modules/mocha/lib/runnable.js:394:21)
    at Test.Runnable.run (/path/to/pino-socket/node_modules/mocha/lib/runnable.js:338:7)
    at Runner.runTest (/path/to/pino-socket/node_modules/mocha/lib/runner.js:666:10)
    at /path/to/pino-socket/node_modules/mocha/lib/runner.js:789:12
    at next (/path/to/pino-socket/node_modules/mocha/lib/runner.js:581:14)
    at /path/to/pino-socket/node_modules/mocha/lib/runner.js:591:7
    at next (/path/to/pino-socket/node_modules/mocha/lib/runner.js:474:14)
    at Immediate.<anonymous> (/path/to/pino-socket/node_modules/mocha/lib/runner.js:559:5)
    at processImmediate (node:internal/timers:466:21)

Not sure if there's a better way to fix this issue...

done()
})
}
})
})

test('tcp backoff (primitive data)', function testTcpBackoffUsingPrimitiveData (done) {
let closeCount = 0
const tcpConnection = TcpConnection({
address: '127.0.0.1',
port: 0,
reconnect: true,
backoffStrategy: {
name: 'exponential',
initialDelay: 10,
factor: 10 // 10, 100, 1000, 2000...
}
})
const start = performance.now()
tcpConnection.on('socketClose', () => {
closeCount++
if (closeCount === 3) {
// initial, 10, 100... next delay should be 1000
const elapsed = performance.now() - start
expect(elapsed).to.be.below(1000)
tcpConnection.end(() => {
process.stdin.removeAllListeners()
done()
})
}
})
})
2 changes: 2 additions & 0 deletions test/tcpReconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ test('tcp reconnect after initial failure', async function testTcpReconnectAfter
expect(received.length).to.eq(1)
expect(received[0].data.toString('utf8')).to.eq(`log${counter}\n`)
tcpConnection.end()
process.stdin.removeAllListeners()
})

test('tcp no reconnect when socket is gracefully closed', function testTcpNoReconnectSocketGracefullyClosed (done) {
Expand Down Expand Up @@ -165,6 +166,7 @@ test('tcp no reconnect when socket is gracefully closed', function testTcpNoReco
expect(msgCount).to.eq(1)
// tcp connection should be closed!
server.close(() => {
process.stdin.removeAllListeners()
done()
})
})
Expand Down