Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle bad message ordering - make it catchable. Fixes 3174 #3289

Merged
merged 11 commits into from
Sep 17, 2024
7 changes: 3 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- run: yarn install --frozen-lockfile
- run: yarn lint
build:
timeout-minutes: 10
timeout-minutes: 15
needs: lint
services:
postgres:
Expand All @@ -44,8 +44,8 @@ jobs:
- '22'
os:
- ubuntu-latest
name: Node.js ${{ matrix.node }} (${{ matrix.os }})
runs-on: ${{ matrix.os }}
name: Node.js ${{ matrix.node }}
runs-on: ubuntu-latest
env:
PGUSER: postgres
PGPASSWORD: postgres
Expand All @@ -71,5 +71,4 @@ jobs:
node-version: ${{ matrix.node }}
cache: yarn
- run: yarn install --frozen-lockfile
# TODO(bmc): get ssl tests working in ci
- run: yarn test
13 changes: 1 addition & 12 deletions packages/pg-native/test/many-connections.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var bytes = require('crypto').pseudoRandomBytes
describe('many connections', function () {
describe('async', function () {
var test = function (count, times) {
it('connecting ' + count + ' clients ' + times, function (done) {
it(`connecting ${count} clients ${times} times`, function (done) {
this.timeout(200000)

var connectClient = function (n, cb) {
Expand Down Expand Up @@ -38,20 +38,9 @@ describe('many connections', function () {
}

test(1, 1)
test(1, 1)
test(1, 1)
test(5, 5)
test(5, 5)
test(5, 5)
test(5, 5)
test(10, 10)
test(10, 10)
test(10, 10)
test(20, 20)
test(20, 20)
test(20, 20)
test(30, 10)
test(30, 10)
test(30, 10)
})
})
6 changes: 5 additions & 1 deletion packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,11 @@ class Client extends EventEmitter {
this.activeQuery.handleCommandComplete(msg, this.connection)
}

_handleParseComplete(msg) {
_handleParseComplete() {
if (this.activeQuery == null) {
this.emit('error', new Error('Received parseComplete when not in parsing state'))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this use _handleErrorEvent, since the connection is in an unpredictable state afterwards?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see how using _handleErrorEvent might be a good idea here too, I think what @brianc communicated to me about this PR is that he thinks the issue is always related to an unexpected message arriving at a client which is in the idle state and so if that is the case it might not be required to do the full _handleErrorEvent but I can see how maybe doing full _handleErrorEvent is more robust.

return
}
// if a prepared statement has a name and properly parses
// we track that its already been executed so we don't parse
// it again on the same client
Expand Down
57 changes: 21 additions & 36 deletions packages/pg/script/create-test-tables.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,26 @@ var people = [
{ name: 'Zanzabar', age: 260 },
]

var con = new pg.Client({
user: args.user,
password: args.password,
host: args.host,
port: args.port,
database: args.database,
})

con.connect((err) => {
if (err) {
throw err
}

con.query(
'DROP TABLE IF EXISTS person;' + ' CREATE TABLE person (id serial, name varchar(10), age integer)',
(err) => {
if (err) {
throw err
}

console.log('Created table person')
console.log('Filling it with people')

con.query(
'INSERT INTO person (name, age) VALUES' +
people.map((person) => ` ('${person.name}', ${person.age})`).join(','),
(err, result) => {
if (err) {
throw err
}

console.log(`Inserted ${result.rowCount} people`)
con.end()
}
)
}
async function run() {
var con = new pg.Client({
user: args.user,
password: args.password,
host: args.host,
port: args.port,
database: args.database,
})
console.log('creating test dataset')
await con.connect()
await con.query('DROP TABLE IF EXISTS person')
await con.query('CREATE TABLE person (id serial, name varchar(10), age integer)')
await con.query(
'INSERT INTO person (name, age) VALUES' + people.map((person) => ` ('${person.name}', ${person.age})`).join(',')
)
await con.end()
console.log('created test dataset')
}

run().catch((e) => {
console.log('setup failed', e)
process.exit(255)
})
147 changes: 147 additions & 0 deletions packages/pg/test/integration/gh-issues/3174-tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
const net = require('net')
const buffers = require('../../test-buffers')
const helper = require('../test-helper')
const assert = require('assert')
const cli = require('../../cli')

const suite = new helper.Suite()

const options = {
host: 'localhost',
port: Math.floor(Math.random() * 2000) + 2000,
connectionTimeoutMillis: 2000,
user: 'not',
database: 'existing',
}

const startMockServer = (port, timeout, callback) => {
const sockets = new Set()

const server = net.createServer((socket) => {
sockets.add(socket)
socket.once('end', () => sockets.delete(socket))

socket.on('data', (data) => {
// deny request for SSL
if (data.length === 8) {
socket.write(Buffer.from('N', 'utf8'))
return
// consider all authentication requests as good
}
// the initial message coming in has a 0 message type for authentication negotiation
if (!data[0]) {
socket.write(buffers.authenticationOk())
// send ReadyForQuery `timeout` ms after authentication
socket.write(buffers.readyForQuery())
return
// respond with our canned response
}
const code = data.toString('utf8', 0, 1)
switch (code) {
// parse
case 'P':
socket.write(buffers.parseComplete())
socket.write(buffers.bindComplete())
socket.write(buffers.rowDescription())
socket.write(buffers.dataRow())
socket.write(buffers.commandComplete('FOO BAR'))
socket.write(buffers.readyForQuery())
// this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer
setImmediate(() => {
socket.write(buffers.parseComplete())
})
break
case 'Q':
socket.write(buffers.rowDescription())
socket.write(buffers.dataRow())
socket.write(buffers.commandComplete('FOO BAR'))
socket.write(buffers.readyForQuery())
// this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer
setImmediate(() => {
socket.write(buffers.parseComplete())
})
default:
// console.log('got code', code)
}
})
})

const closeServer = () => {
for (const socket of sockets) {
socket.destroy()
}
return new Promise((resolve) => {
server.close(resolve)
})
}

server.listen(port, options.host, () => callback(closeServer))
}

const delay = (ms) =>
new Promise((resolve) => {
setTimeout(resolve, ms)
})

suite.testAsync('Out of order parseComplete on simple query is catchable', async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, 0, (closeServer) => resolve(closeServer))
})
const client = new helper.Client(options)
await client.connect()

let errorHit = false
client.on('error', () => {
errorHit = true
})

await client.query('SELECT NOW')
await delay(50)
await client.query('SELECT NOW')
await delay(50)
await client.query('SELECT NOW')
await delay(50)
await client.end()
assert(cli.native || errorHit)

await closeServer()
})

suite.testAsync('Out of order parseComplete on extended query is catchable', async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, 0, (closeServer) => resolve(closeServer))
})
const client = new helper.Client(options)
await client.connect()

let errorHit = false
client.on('error', () => {
errorHit = true
})

await client.query('SELECT $1', ['foo'])
await delay(40)
assert(cli.native || errorHit)
await client.end()

await closeServer()
})

suite.testAsync('Out of order parseComplete on pool is catchable', async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, 0, (closeServer) => resolve(closeServer))
})
const pool = new helper.pg.Pool(options)

let errorHit = false
pool.on('error', () => {
errorHit = true
})

await pool.query('SELECT $1', ['foo'])
await delay(100)
assert(cli.native || errorHit)

await pool.end()
await closeServer()
})