Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: major performance issues with bytea performance #2240 #2241

Merged
merged 11 commits into from
Jul 7, 2020
Merged
70 changes: 51 additions & 19 deletions packages/pg-protocol/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ const enum MessageCodes {
export type MessageCallback = (msg: BackendMessage) => void

export class Parser {
private remainingBuffer: Buffer = emptyBuffer
private buffer: Buffer = emptyBuffer
private bufferLength: number = 0
private bufferOffset: number = 0
private reader = new BufferReader()
private mode: Mode

Expand All @@ -86,35 +88,65 @@ export class Parser {
}

public parse(buffer: Buffer, callback: MessageCallback) {
let combinedBuffer = buffer
if (this.remainingBuffer.byteLength) {
combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength)
this.remainingBuffer.copy(combinedBuffer)
buffer.copy(combinedBuffer, this.remainingBuffer.byteLength)
}
let offset = 0
while (offset + HEADER_LENGTH <= combinedBuffer.byteLength) {
this.mergeBuffer(buffer)
const bufferFullLength = this.bufferOffset + this.bufferLength
let offset = this.bufferOffset
while (offset + HEADER_LENGTH <= bufferFullLength) {
// code is 1 byte long - it identifies the message type
const code = combinedBuffer[offset]

const code = this.buffer[offset]
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH)

const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
const fullMessageLength = CODE_LENGTH + length

if (fullMessageLength + offset <= combinedBuffer.byteLength) {
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer)
if (fullMessageLength + offset <= bufferFullLength) {
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer)
callback(message)
offset += fullMessageLength
} else {
break
}
}
if (offset === bufferFullLength) {
// No more use for the buffer
this.buffer = emptyBuffer
this.bufferLength = 0
this.bufferOffset = 0
} else {
// Adjust the cursors of remainingBuffer
this.bufferLength = bufferFullLength - offset
this.bufferOffset = offset
}
}

if (offset === combinedBuffer.byteLength) {
this.remainingBuffer = emptyBuffer
private mergeBuffer(buffer: Buffer): void {
if (this.bufferLength > 0) {
const newLength = this.bufferLength + buffer.byteLength
const newFullLength = newLength + this.bufferOffset
if (newFullLength > this.buffer.byteLength) {
// We can't concat the new buffer with the remaining one
let newBuffer: Buffer
if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
// We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
newBuffer = this.buffer
} else {
// Allocate a new larger buffer
let newBufferLength = this.buffer.byteLength * 2
while (newLength >= newBufferLength) {
newBufferLength *= 2
}
newBuffer = Buffer.allocUnsafe(newBufferLength)
}
// Move the remaining buffer to the new one
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
this.buffer = newBuffer
this.bufferOffset = 0
}
// Concat the new buffer with the remaining one
buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
this.bufferLength = newLength
} else {
this.remainingBuffer = combinedBuffer.slice(offset)
this.buffer = buffer
this.bufferOffset = 0
this.bufferLength = buffer.byteLength
}
}

Expand Down
21 changes: 18 additions & 3 deletions packages/pg/bench.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const pg = require('./lib')
const pool = new pg.Pool()

const params = {
text:
Expand All @@ -17,7 +16,7 @@ const seq = {
}

const exec = async (client, q) => {
const result = await client.query({
await client.query({
text: q.text,
values: q.values,
rowMode: 'array',
Expand All @@ -39,7 +38,9 @@ const bench = async (client, q, time) => {
const run = async () => {
const client = new pg.Client()
await client.connect()
console.log('start')
await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)')
await bench(client, params, 1000)
console.log('warmup done')
const seconds = 5
Expand All @@ -61,7 +62,21 @@ const run = async () => {
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 5799 qps')
console.log()

console.log('')
console.log('Warming up bytea test')
await client.query({
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
values: ['test', Buffer.allocUnsafe(104857600)],
})
console.log('bytea warmup done')
const start = Date.now()
const results = await client.query('SELECT * FROM buf')
const time = Date.now() - start
console.log('bytea time:', time, 'ms')
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')

await client.end()
await client.end()
}
Expand Down