From 56676d8189db58db2dc9babc63214526109dc8a5 Mon Sep 17 00:00:00 2001
From: Chris Chew
Date: Thu, 26 Mar 2020 18:53:47 -0500
Subject: [PATCH 1/2] Added maxUses pool config option; Added dev setup notes to main readme

---
 README.md                                   |  8 ++
 packages/pg-pool/README.md                  | 26 ++++++
 packages/pg-pool/index.js                   | 91 +++++++++++--------
 .../pg-pool/test/bring-your-own-promise.js  | 27 +++---
 packages/pg-pool/test/error-handling.js     |  2 +-
 packages/pg-pool/test/max-uses.js           | 59 ++++++++++++
 .../connection-pool/error-tests.js          |  4 +-
 7 files changed, 164 insertions(+), 53 deletions(-)
 create mode 100644 packages/pg-pool/test/max-uses.js

diff --git a/README.md b/README.md
index d22ac0c61..7c5d57904 100644
--- a/README.md
+++ b/README.md
@@ -77,6 +77,14 @@ I will __happily__ accept your pull request if it:
 If your change involves breaking backwards compatibility please please point that out in the pull request & we can discuss & plan when and how to release it and what type of documentation or communication it will require.
 
+### Setting up for local development
+
+1. Clone the repo
+2. From your workspace root run `yarn` and then `yarn lerna bootstrap`
+3. Ensure you have a PostgreSQL instance running with SSL enabled and an empty database for test
+4. Ensure you have the proper environment variables configured for connecting to the instance
+5. Run `yarn test` to run all the tests
+
 ## Troubleshooting and FAQ
 
 The causes and solutions to common errors can be found among the [Frequently Asked Questions (FAQ)](https://github.com/brianc/node-postgres/wiki/FAQ)
 
diff --git a/packages/pg-pool/README.md b/packages/pg-pool/README.md
index b77b65d86..84ea67b65 100644
--- a/packages/pg-pool/README.md
+++ b/packages/pg-pool/README.md
@@ -34,6 +34,7 @@ var pool2 = new Pool({
   max: 20, // set pool max size to 20
   idleTimeoutMillis: 1000, // close idle clients after 1 second
   connectionTimeoutMillis: 1000, // return an error after 1 second if connection could not be established
+  maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion)
 })
 
 //you can supply a custom client constructor
@@ -330,6 +331,31 @@ var bluebirdPool = new Pool({
 
 __please note:__ in node `<=0.12.x` the pool will throw if you do not provide a promise constructor in one of the two ways mentioned above. In node `>=4.0.0` the pool will use the native promise implementation by default; however, the two methods above still allow you to "bring your own."
 
+## maxUses and read-replica autoscaling (e.g. AWS Aurora)
+
+The maxUses config option can help an application instance rebalance load against a replica set that has been auto-scaled after the point in time which the connection pool is full of healthy connections.
+
+The mechanism here is that a connection is considered "expended" after it has been acquired and released `maxUses` number of times. Depending on the load on your system, this means there will be an approximate time in which any given connection will live, thus creating a window for rebalancing.
+
+Imagine a scenario where you have 10 app instances providing an API running against a replica cluster of 3 that are accessed via a round-robin DNS entry. Each instance runs a connection pool size of 20. With an ambient load of 50 requests per second, the connection pool will likely fill up in a few minutes with healthy connections.
+
+If you have weekly bursts of traffic which peak at 1,000 requests per second, you might want to grow your replicas to 10 during this period. Without setting `maxUses`, the new replicas will not be adopted by the app servers without an intervention -- namely, restarting each in turn in order to build up a new connection pool that is balanced. Adding additional app server instances will help to some extent because they will adopt all the replicas in an even way, but the initial app servers will continue to focus additional load on the original replicas.
+
+However, setting `maxUses` to 7500, will ensure that over a period of 30 minutes or so the new replicas will be adopted as the pre-existing connections are closed and replaced with new ones, this creating a short window for eventual balance.
+
+You'll want to test based on your own scenarios, but one way to make a first guess at `maxUses` is to identify an acceptable window for rebalancing and then solve for the value:
+
+```
+maxUses = rebalanceWindowSeconds * totalRequestsPerSecond / numAppInstances / poolSize
+```
+
+In the example above, assuming we acquire and release 1 connection per request and we are aiming for a 30 minute rebalancing window:
+
+```
+maxUses = rebalanceWindowSeconds * totalRequestsPerSecond / numAppInstances / poolSize
+   9000 = 1800 * 1000 / 10 / 20
+```
+
 ## tests
 
 To run tests clone the repo, `npm i` in the working dir, and then run `npm test`
 
diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js
index 83ec51e09..1950f716b 100644
--- a/packages/pg-pool/index.js
+++ b/packages/pg-pool/index.js
@@ -11,9 +11,16 @@ const removeWhere = (list, predicate) => {
     : list.splice(i, 1)[0]
 }
 
-class IdleItem {
-  constructor (client, idleListener, timeoutId) {
+class ClientItem {
+  constructor (client, useCount) {
     this.client = client
+    this.useCount = useCount
+  }
+}
+
+class IdleItem {
+  constructor (clientItem, idleListener, timeoutId) {
+    this.clientItem = clientItem
     this.idleListener = idleListener
     this.timeoutId = timeoutId
   }
@@ -45,18 +52,18 @@ function promisify (Promise, callback) {
   return { callback: cb, result: result }
 }
 
-function makeIdleListener (pool, client) {
+function makeIdleListener (pool, clientItem) {
   return function idleListener (err) {
-    err.client = client
+    err.client = clientItem.client
 
-    client.removeListener('error', idleListener)
-    client.on('error', () => {
+    clientItem.client.removeListener('error', idleListener)
+    clientItem.client.on('error', () => {
       pool.log('additional client error after disconnection due to error', err)
     })
-    pool._remove(client)
+    pool._remove(clientItem)
     // TODO - document that once the pool emits an error
     // the client has already been closed & purged and is unusable
-    pool.emit('error', err, client)
+    pool.emit('error', err, clientItem.client)
   }
 }
 
@@ -65,6 +72,7 @@ class Pool extends EventEmitter {
     super()
     this.options = Object.assign({}, options)
     this.options.max = this.options.max || this.options.poolSize || 10
+    this.options.maxUses = this.options.maxUses || Infinity
     this.log = this.options.log || function () { }
     this.Client = this.options.Client || Client || require('pg').Client
     this.Promise = this.options.Promise || global.Promise
@@ -95,7 +103,7 @@
       this.log('pulse queue on ending')
       if (this._idle.length) {
         this._idle.slice().map(item => {
-          this._remove(item.client)
+          this._remove(item.clientItem)
         })
       }
       if (!this._clients.length) {
@@ -117,10 +125,10 @@
     if (this._idle.length) {
       const idleItem = this._idle.pop()
       clearTimeout(idleItem.timeoutId)
-      const client = idleItem.client
+      const clientItem = idleItem.clientItem
       const idleListener = idleItem.idleListener
-      return this._acquireClient(client, pendingItem, idleListener, false)
+      return this._acquireClient(clientItem, pendingItem, idleListener, false)
     }
     if (!this._isFull()) {
       return this.newClient(pendingItem)
     }
     throw new Error('unexpected condition')
   }
 
-  _remove (client) {
+  _remove (clientItem) {
     const removed = removeWhere(
       this._idle,
-      item => item.client === client
+      item => item.clientItem === clientItem
     )
 
     if (removed !== undefined) {
       clearTimeout(removed.timeoutId)
     }
 
-    this._clients = this._clients.filter(c => c !== client)
-    client.end()
-    this.emit('remove', client)
+    this._clients = this._clients.filter(c => c !== clientItem)
+    clientItem.client.end()
+    this.emit('remove', clientItem.client)
   }
 
   connect (cb) {
@@ -191,8 +199,9 @@
   newClient (pendingItem) {
     const client = new this.Client(this.options)
-    this._clients.push(client)
-    const idleListener = makeIdleListener(this, client)
+    const clientItem = new ClientItem(client, 0)
+    this._clients.push(clientItem)
+    const idleListener = makeIdleListener(this, clientItem)
 
     this.log('checking client timeout')
@@ -217,7 +226,7 @@
       if (err) {
         this.log('client failed to connect', err)
         // remove the dead client from our list of clients
-        this._clients = this._clients.filter(c => c !== client)
+        this._clients = this._clients.filter(c => c !== clientItem)
         if (timeoutHit) {
           err.message = 'Connection terminated due to connection timeout'
         }
@@ -230,63 +239,67 @@
       } else {
         this.log('new client connected')
-
-        return this._acquireClient(client, pendingItem, idleListener, true)
+        return this._acquireClient(clientItem, pendingItem, idleListener, true)
       }
     })
   }
 
   // acquire a client for a pending work item
-  _acquireClient (client, pendingItem, idleListener, isNew) {
+  _acquireClient (clientItem, pendingItem, idleListener, isNew) {
     if (isNew) {
-      this.emit('connect', client)
+      this.emit('connect', clientItem.client)
     }
 
-    this.emit('acquire', client)
+    this.emit('acquire', clientItem.client)
 
     let released = false
-    client.release = (err) => {
+
+    clientItem.useCount += 1
+
+    clientItem.client.release = (err) => {
       if (released) {
         throwOnDoubleRelease()
       }
 
       released = true
-      this._release(client, idleListener, err)
+      this._release(clientItem, idleListener, err)
     }
 
-    client.removeListener('error', idleListener)
+    clientItem.client.removeListener('error', idleListener)
 
     if (!pendingItem.timedOut) {
       if (isNew && this.options.verify) {
-        this.options.verify(client, (err) => {
+        this.options.verify(clientItem.client, (err) => {
           if (err) {
-            client.release(err)
+            clientItem.client.release(err)
             return pendingItem.callback(err, undefined, NOOP)
           }
-          pendingItem.callback(undefined, client, client.release)
+          pendingItem.callback(undefined, clientItem.client, clientItem.client.release)
         })
       } else {
-        pendingItem.callback(undefined, client, client.release)
+        pendingItem.callback(undefined, clientItem.client, clientItem.client.release)
       }
     } else {
       if (isNew && this.options.verify) {
-        this.options.verify(client, client.release)
+        this.options.verify(clientItem.client, clientItem.client.release)
       } else {
-        client.release()
+        clientItem.client.release()
      }
    }
  }

   // release a client back to the pool, include an error
   // to remove it from the pool
-  _release (client, idleListener, err) {
-    client.on('error', idleListener)
+  _release (clientItem, idleListener, err) {
+    clientItem.client.on('error', idleListener)
 
     // TODO(bmc): expose a proper, public interface _queryable and _ending
-    if (err || this.ending || !client._queryable || client._ending) {
-      this._remove(client)
+    if (err || this.ending || !clientItem.client._queryable || clientItem.client._ending || clientItem.useCount >= this.options.maxUses) {
+      if (clientItem.useCount >= this.options.maxUses) {
+        this.log('removing expended client')
+      }
+      this._remove(clientItem)
       this._pulseQueue()
       return
     }
@@ -296,11 +309,11 @@
     if (this.options.idleTimeoutMillis) {
       tid = setTimeout(() => {
         this.log('remove idle client')
-        this._remove(client)
+        this._remove(clientItem)
       }, this.options.idleTimeoutMillis)
     }
 
-    this._idle.push(new IdleItem(client, idleListener, tid))
+    this._idle.push(new IdleItem(clientItem, idleListener, tid))
     this._pulseQueue()
   }
 
diff --git a/packages/pg-pool/test/bring-your-own-promise.js b/packages/pg-pool/test/bring-your-own-promise.js
index f7fe3bde9..92f11a61a 100644
--- a/packages/pg-pool/test/bring-your-own-promise.js
+++ b/packages/pg-pool/test/bring-your-own-promise.js
@@ -8,29 +8,34 @@ const BluebirdPromise = require('bluebird')
 
 const Pool = require('../')
 
-const checkType = promise => {
+const checkType = (promise, expectError) => {
   expect(promise).to.be.a(BluebirdPromise)
-  return promise.catch(e => undefined)
+  return promise.catch(e => {
+    if (expectError) {
+      return undefined
+    }
+    expect().fail(e)
+  })
 }
 
 describe('Bring your own promise', function () {
   it('uses supplied promise for operations', co.wrap(function * () {
     const pool = new Pool({ Promise: BluebirdPromise })
-    const client1 = yield checkType(pool.connect())
+    const client1 = yield checkType(pool.connect(), false)
     client1.release()
-    yield checkType(pool.query('SELECT NOW()'))
-    const client2 = yield checkType(pool.connect())
+    yield checkType(pool.query('SELECT NOW()'), false)
+    const client2 = yield checkType(pool.connect(), false)
     // TODO - make sure pg supports BYOP as well
     client2.release()
-    yield checkType(pool.end())
+    yield checkType(pool.end(), false)
   }))
 
   it('uses promises in errors', co.wrap(function * () {
     const pool = new Pool({ Promise: BluebirdPromise, port: 48484 })
-    yield checkType(pool.connect())
-    yield checkType(pool.end())
-    yield checkType(pool.connect())
-    yield checkType(pool.query())
-    yield checkType(pool.end())
+    yield checkType(pool.connect(), true)
+    yield checkType(pool.end(), true)
+    yield checkType(pool.connect(), true)
+    yield checkType(pool.query(), true)
+    yield checkType(pool.end(), true)
   }))
 })
diff --git a/packages/pg-pool/test/error-handling.js b/packages/pg-pool/test/error-handling.js
index 90de4ec41..3c37de502 100644
--- a/packages/pg-pool/test/error-handling.js
+++ b/packages/pg-pool/test/error-handling.js
@@ -238,7 +238,7 @@ describe('pool error handling', function () {
     })
 
     setTimeout(() => {
-      pool._clients[0].end()
+      pool._clients[0].client.end()
     }, 1000)
   })
 })
diff --git a/packages/pg-pool/test/max-uses.js b/packages/pg-pool/test/max-uses.js
new file mode 100644
index 000000000..138b0805e
--- /dev/null
+++ b/packages/pg-pool/test/max-uses.js
@@ -0,0 +1,59 @@
+const expect = require('expect.js')
+const co = require('co')
+const _ = require('lodash')
+
+const describe = require('mocha').describe
+const it = require('mocha').it
+
+const Pool = require('../')
+
+describe('maxUses of 2', () => {
+  it('can create a single client and use it once', co.wrap(function * () {
+    const pool = new Pool({ maxUses: 2 })
+    expect(pool.waitingCount).to.equal(0)
+    const client = yield pool.connect()
+    const res = yield client.query('SELECT $1::text as name', ['hi'])
+    expect(res.rows[0].name).to.equal('hi')
+    client.release()
+    return yield pool.end()
+  }))
+
+  it('getting a connection a second time returns the same connection and releasing it also closes it', co.wrap(function * () {
+    const pool = new Pool({ maxUses: 2 })
+    expect(pool.waitingCount).to.equal(0)
+    const client = yield pool.connect()
+    client.release()
+    const client2 = yield pool.connect()
+    expect(client).to.equal(client2)
+    expect(client2._ending).to.equal(false)
+    client2.release()
+    expect(client2._ending).to.equal(true)
+    return yield pool.end()
+  }))
+
+  it('getting a connection a third time returns a new connection', co.wrap(function * () {
+    const pool = new Pool({ maxUses: 2 })
+    expect(pool.waitingCount).to.equal(0)
+    const client = yield pool.connect()
+    client.release()
+    const client2 = yield pool.connect()
+    expect(client).to.equal(client2)
+    client2.release()
+    const client3 = yield pool.connect()
+    expect(client3).not.to.equal(client2)
+    client3.release()
+    return yield pool.end()
+  }))
+
+  it('logs when removing an expended client', co.wrap(function * () {
+    const messages = []
+    const log = function (msg) {
+      messages.push(msg)
+    }
+    const pool = new Pool({ maxUses: 1, log })
+    const client = yield pool.connect()
+    client.release()
+    expect(messages).to.contain('removing expended client')
+    return yield pool.end()
+  }))
+})
diff --git a/packages/pg/test/integration/connection-pool/error-tests.js b/packages/pg/test/integration/connection-pool/error-tests.js
index 9fe760431..94d1c6fae 100644
--- a/packages/pg/test/integration/connection-pool/error-tests.js
+++ b/packages/pg/test/integration/connection-pool/error-tests.js
@@ -111,7 +111,7 @@ suite.test('handles socket error during pool.query and destroys it immediately',
   })
 
   setTimeout(() => {
-    pool._clients[0].native.cancel((err) => {
+    pool._clients[0].client.native.cancel((err) => {
       assert.ifError(err)
     })
   }, 100)
@@ -122,7 +122,7 @@ suite.test('handles socket error during pool.query and destroys it immediately',
     cb()
   })
 
-  const stream = pool._clients[0].connection.stream
+  const stream = pool._clients[0].client.connection.stream
  setTimeout(() => {
    stream.emit('error', new Error('network issue'))
  }, 100)
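For readers following along, here is roughly how an application would exercise the option added in the patch above, once applied. This is an editor's sketch, not part of the patch series: the `handler` function is illustrative, while `Pool`, `maxUses`, and the `remove` event come from pg-pool itself (`remove` fires whenever a client is closed and dropped from the pool, including when it is recycled after reaching `maxUses`).

```js
// Minimal sketch (not part of the patch): opting in to connection recycling,
// assuming maxUses ships as implemented in packages/pg-pool/index.js above.
const Pool = require('pg-pool')

const pool = new Pool({
  max: 20, // pool size per app instance, as in the README scenario
  maxUses: 7500, // recycle a connection after 7500 acquire/release cycles
})

// Expended clients surface here as they are closed and removed.
pool.on('remove', () => console.log('client removed from pool'))

// Illustrative request handler: each successful acquire counts as one "use".
async function handler() {
  const client = await pool.connect()
  try {
    const res = await client.query('SELECT now()')
    return res.rows[0]
  } finally {
    // On the 7500th release the client is ended; a fresh one (possibly to a
    // new replica) is created on demand by a later pool.connect().
    client.release()
  }
}
```
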
From 9ab66e8741155fcbc71998ddd067305eef5b3e8e Mon Sep 17 00:00:00 2001
From: Chris Chew
Date: Thu, 26 Mar 2020 19:03:32 -0500
Subject: [PATCH 2/2] Documentation grammar fixes

---
 README.md                  | 2 +-
 packages/pg-pool/README.md | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 7c5d57904..d963edc20 100644
--- a/README.md
+++ b/README.md
@@ -81,7 +81,7 @@ If your change involves breaking backwards compatibility please please point tha
 
 1. Clone the repo
 2. From your workspace root run `yarn` and then `yarn lerna bootstrap`
-3. Ensure you have a PostgreSQL instance running with SSL enabled and an empty database for test
+3. Ensure you have a PostgreSQL instance running with SSL enabled and an empty database for tests
 4. Ensure you have the proper environment variables configured for connecting to the instance
 5. Run `yarn test` to run all the tests
 
diff --git a/packages/pg-pool/README.md b/packages/pg-pool/README.md
index 84ea67b65..3de0435a6 100644
--- a/packages/pg-pool/README.md
+++ b/packages/pg-pool/README.md
@@ -333,15 +333,15 @@
 __please note:__ in node `<=0.12.x` the pool will throw if you do not provide a promise constructor in one of the two ways mentioned above. In node `>=4.0.0` the pool will use the native promise implementation by default; however, the two methods above still allow you to "bring your own."
 
 ## maxUses and read-replica autoscaling (e.g. AWS Aurora)
 
-The maxUses config option can help an application instance rebalance load against a replica set that has been auto-scaled after the point in time which the connection pool is full of healthy connections.
+The maxUses config option can help an application instance rebalance load against a replica set that has been auto-scaled after the connection pool is already full of healthy connections.
 
 The mechanism here is that a connection is considered "expended" after it has been acquired and released `maxUses` number of times. Depending on the load on your system, this means there will be an approximate time in which any given connection will live, thus creating a window for rebalancing.
 
 Imagine a scenario where you have 10 app instances providing an API running against a replica cluster of 3 that are accessed via a round-robin DNS entry. Each instance runs a connection pool size of 20. With an ambient load of 50 requests per second, the connection pool will likely fill up in a few minutes with healthy connections.
 
-If you have weekly bursts of traffic which peak at 1,000 requests per second, you might want to grow your replicas to 10 during this period. Without setting `maxUses`, the new replicas will not be adopted by the app servers without an intervention -- namely, restarting each in turn in order to build up a new connection pool that is balanced. Adding additional app server instances will help to some extent because they will adopt all the replicas in an even way, but the initial app servers will continue to focus additional load on the original replicas.
+If you have weekly bursts of traffic which peak at 1,000 requests per second, you might want to grow your replicas to 10 during this period. Without setting `maxUses`, the new replicas will not be adopted by the app servers without an intervention -- namely, restarting each in turn in order to build up new connection pools that are balanced against all the replicas. Adding additional app server instances will help to some extent because they will adopt all the replicas in an even way, but the initial app servers will continue to focus additional load on the original replicas.
 
-However, setting `maxUses` to 7500, will ensure that over a period of 30 minutes or so the new replicas will be adopted as the pre-existing connections are closed and replaced with new ones, this creating a short window for eventual balance.
+This is where the `maxUses` configuration option comes into play. Setting `maxUses` to 7500 will ensure that over a period of 30 minutes or so the new replicas will be adopted as the pre-existing connections are closed and replaced with new ones, thus creating a window for eventual balance.
 
 You'll want to test based on your own scenarios, but one way to make a first guess at `maxUses` is to identify an acceptable window for rebalancing and then solve for the value:
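To make the closing formula concrete, here is a small worked example. It is an editor's illustration rather than anything shipped by the patches: `estimateMaxUses` is a hypothetical helper, and the inputs are simply the scenario numbers from the pg-pool README section above.

```js
// Hypothetical helper (illustration only): first guess at maxUses, using the
// rebalancing formula from the pg-pool README in the patches above.
function estimateMaxUses({ rebalanceWindowSeconds, totalRequestsPerSecond, numAppInstances, poolSize }) {
  // maxUses = rebalanceWindowSeconds * totalRequestsPerSecond / numAppInstances / poolSize
  return Math.round((rebalanceWindowSeconds * totalRequestsPerSecond) / numAppInstances / poolSize)
}

const maxUses = estimateMaxUses({
  rebalanceWindowSeconds: 1800, // aim to rebalance over ~30 minutes
  totalRequestsPerSecond: 1000, // peak burst traffic across the fleet
  numAppInstances: 10,
  poolSize: 20, // per-instance pool size from the scenario
})

console.log(maxUses) // 9000; rounding down (e.g. to 7500) gives a slightly faster turnover
```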