Skip to content

Commit

Permalink
resolves #84 configure backoff strategy using primitive data (#88)
Browse files Browse the repository at this point in the history
* resolves #84 configure backoff strategy using primitive data

* require perf_hooks
  • Loading branch information
ggrossetie authored Aug 17, 2022
1 parent 54525da commit 8020f8b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 21 deletions.
35 changes: 20 additions & 15 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,26 @@ pino(transport)

### Options

| Name | Description |
|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `address` | The host address to connect to. Default: `127.0.0.1`. |
| `port` | The host port to connect to. Default: `514`. |
| `unixsocket` | The unix socket path for the destination. Default: `​`. |
| `mode` | Either `tcp` or `udp`. Default: `udp`. |
| `secure` | Enable secure (TLS) connection. Default: false. |
| `noverify` | Allow connection to server with self-signed certificates. Default: false. |
| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. |
| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. |
| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`. |
| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. |
| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. |
| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. |
| `onBeforeDataWrite` | Function used to manipulate TCP data before being written to the socket. Operations preformed here must be synchronous. Format: `(data) => Buffer`. Default: `null` |
| Name | Description |
|---------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `address` | The host address to connect to. Default: `127.0.0.1`. |
| `port` | The host port to connect to. Default: `514`. |
| `unixsocket` | The unix socket path for the destination. Default: `​`. |
| `mode` | Either `tcp` or `udp`. Default: `udp`. |
| `secure` | Enable secure (TLS) connection. Default: false. |
| `noverify` | Allow connection to server with self-signed certificates. Default: false. |
| `reconnect` | Enable reconnecting to dropped TCP destinations. Default: false. |
| `reconnectTries` | Number of times to attempt reconnection before giving up. Default: `Infinity`. |
| `backoffStrategy` | The backoff strategy to use on TCP destinations. The backoff strategy must implement the `BackoffStrategy` interface. lternatively, you can configure the backoff strategy using primitive data (see below). Default: `new FibonacciStrategy()`. |
| `backoffStrategy.name` | The backoff strategy name, either `exponential` or `fibonacci`. Default: `fibonacci`. |
| `backoffStrategy.randomisationFactor` | The backoff randomisation factor, must be between 0 and 1. Default: `0`. |
| `backoffStrategy.initialDelay` | The backoff initial delay in milliseconds. Default: `100`. |
| `backoffStrategy.maxDelay` | The backoff maximum delay in milliseconds. Default: `10000`. |
| `backoffStrategy.factor` | The exponential backoff factor, must be greater than 1. Default: `2`. |
| `recovery` | Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`. |
| `recoveryQueueMaxSize` | The maximum size of items added to the queue. When reached, oldest items "First In" will be evicted to stay below this size. Default: `1024`. |
| `recoveryQueueSizeCalculation` | Function used to calculate the size of stored items. The item is passed as the first argument and contains a `data` (Buffer) and `encoding` (String) attribute. Default: `(item) => item.data.length + item.encoding.length`. |
| `onBeforeDataWrite` | Function used to manipulate TCP data before being written to the socket. Operations preformed here must be synchronous. Format: `(data) => Buffer`. Default: `null` |

### Events

Expand Down
30 changes: 25 additions & 5 deletions lib/TcpConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const net = require('net')
const tls = require('tls')
const stream = require('stream')
const { Backoff, FibonacciStrategy } = require('backoff')
const { Backoff, FibonacciStrategy, ExponentialStrategy } = require('backoff')
const Queue = require('./Queue')

/**
Expand All @@ -15,7 +15,12 @@ const Queue = require('./Queue')
* @prop {boolean} [reconnect] Enable reconnecting to dropped TCP destinations. Default: false.
* @prop {number} [reconnectTries] Number of times to attempt reconnection before giving up. Default: `Infinity`
* @prop {string?} [unixsocket] The unix socket path for the destination. Default: ``.
* @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Default: `new FibonacciStrategy()`.
* @prop {BackoffStrategy?} [backoffStrategy] The backoff strategy to use. The backoff strategy must implement the `BackoffStrategy` interface. Alternatively, you can configure the backoff strategy using primitive data. Default: `new FibonacciStrategy()`.
* @prop {string} [backoffStrategy.name] The backoff strategy name, either `exponential` or `fibonacci`. Default: `fibonacci`.
* @prop {number} [backoffStrategy.randomisationFactor] The backoff randomisation factor, must be between 0 and 1. Default: `0`.
* @prop {number} [backoffStrategy.initialDelay] The backoff initial delay in milliseconds. Default: `100`.
* @prop {number} [backoffStrategy.maxDelay] The backoff maximum delay in milliseconds. Default: `10000`.
* @prop {number} [backoffStrategy.factor] The exponential backoff factor, must be greater than 1. Default: `2`.
* @prop {stream.Readable?} [sourceStream]
* @prop {boolean} [recovery] Enable a recovery mode when the TCP connection is lost which store data in a memory queue (FIFO) until the queue max size is reached or the TCP connection is restored. Default: `false`.
* @prop {({data: Buffer, encoding: string}) => number?} [recoveryQueueSizeCalculation] Function used to calculate the size of stored items. Default: `item => item.data.length + item.encoding.length`.
Expand Down Expand Up @@ -49,7 +54,7 @@ module.exports = function factory (userOptions) {

const sourceStream = Object.prototype.hasOwnProperty.call(options, 'sourceStream')
? options.sourceStream
: buildCliSourceStream()
: buildStdinSourceStream()

/** @type {net.Socket} */
let socket = null
Expand Down Expand Up @@ -205,7 +210,7 @@ module.exports = function factory (userOptions) {
})

function createRetryBackoff () {
const retry = new Backoff(options.backoffStrategy ? options.backoffStrategy : new FibonacciStrategy())
const retry = new Backoff(getBackoffStrategy(options))
retry.failAfter(options.reconnectTries)
retry.on('ready', () => {
connect((err) => {
Expand All @@ -226,7 +231,22 @@ module.exports = function factory (userOptions) {
return outputStream
}

function buildCliSourceStream () {
function getBackoffStrategy (options) {
if (options.backoffStrategy) {
if (options.backoffStrategy.constructor && options.backoffStrategy.constructor.name === 'Object') {
// primitive data
if (options.backoffStrategy.name === 'exponential') {
return new ExponentialStrategy(options.backoffStrategy)
}
return new FibonacciStrategy(options.backoffStrategy)
}
// backoff strategy instance
return options.backoffStrategy
}
return new FibonacciStrategy()
}

function buildStdinSourceStream () {
// We use this passthrough to buffer incoming messages.
const inputStream = new stream.PassThrough()
process.stdin.pipe(inputStream)
Expand Down
33 changes: 32 additions & 1 deletion test/tcpBackoff.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
const TcpConnection = require('../lib/TcpConnection')
const { ExponentialStrategy } = require('backoff')
const { expect } = require('chai')
const { performance } = require('perf_hooks')

test('tcp backoff', function testTcpBackoff (done) {
let closeCount = 0
Expand All @@ -23,7 +24,37 @@ test('tcp backoff', function testTcpBackoff (done) {
const nextBackoffDelay = exponentialStrategy.next()
// initial, 10, 100... next delay should be 1000
expect(nextBackoffDelay).to.eq(1000)
tcpConnection.end(() => done())
tcpConnection.end(() => {
process.stdin.removeAllListeners()
done()
})
}
})
})

test('tcp backoff (primitive data)', function testTcpBackoffUsingPrimitiveData (done) {
let closeCount = 0
const tcpConnection = TcpConnection({
address: '127.0.0.1',
port: 0,
reconnect: true,
backoffStrategy: {
name: 'exponential',
initialDelay: 10,
factor: 10 // 10, 100, 1000, 2000...
}
})
const start = performance.now()
tcpConnection.on('socketClose', () => {
closeCount++
if (closeCount === 3) {
// initial, 10, 100... next delay should be 1000
const elapsed = performance.now() - start
expect(elapsed).to.be.below(1000)
tcpConnection.end(() => {
process.stdin.removeAllListeners()
done()
})
}
})
})
2 changes: 2 additions & 0 deletions test/tcpReconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ test('tcp reconnect after initial failure', async function testTcpReconnectAfter
expect(received.length).to.eq(1)
expect(received[0].data.toString('utf8')).to.eq(`log${counter}\n`)
tcpConnection.end()
process.stdin.removeAllListeners()
})

test('tcp no reconnect when socket is gracefully closed', function testTcpNoReconnectSocketGracefullyClosed (done) {
Expand Down Expand Up @@ -165,6 +166,7 @@ test('tcp no reconnect when socket is gracefully closed', function testTcpNoReco
expect(msgCount).to.eq(1)
// tcp connection should be closed!
server.close(() => {
process.stdin.removeAllListeners()
done()
})
})
Expand Down

0 comments on commit 8020f8b

Please sign in to comment.