diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 4044dae1c..a00dabec9 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -74,7 +74,9 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void export class Parser { - private remainingBuffer: Buffer = emptyBuffer + private buffer: Buffer = emptyBuffer + private bufferLength: number = 0 + private bufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -86,35 +88,65 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { - let combinedBuffer = buffer - if (this.remainingBuffer.byteLength) { - combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength) - this.remainingBuffer.copy(combinedBuffer) - buffer.copy(combinedBuffer, this.remainingBuffer.byteLength) - } - let offset = 0 - while (offset + HEADER_LENGTH <= combinedBuffer.byteLength) { + this.mergeBuffer(buffer) + const bufferFullLength = this.bufferOffset + this.bufferLength + let offset = this.bufferOffset + while (offset + HEADER_LENGTH <= bufferFullLength) { // code is 1 byte long - it identifies the message type - const code = combinedBuffer[offset] - + const code = this.buffer[offset] // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) - + const length = this.buffer.readUInt32BE(offset + CODE_LENGTH) const fullMessageLength = CODE_LENGTH + length - - if (fullMessageLength + offset <= combinedBuffer.byteLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) + if (fullMessageLength + offset <= bufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer) callback(message) offset += fullMessageLength } else { break } } + if (offset === bufferFullLength) { + // No more use for the buffer + this.buffer = emptyBuffer + this.bufferLength = 0 + this.bufferOffset = 0 + } else { + // Adjust the cursors of remainingBuffer + this.bufferLength = bufferFullLength - offset + this.bufferOffset = offset + } + } - if (offset === combinedBuffer.byteLength) { - this.remainingBuffer = emptyBuffer + private mergeBuffer(buffer: Buffer): void { + if (this.bufferLength > 0) { + const newLength = this.bufferLength + buffer.byteLength + const newFullLength = newLength + this.bufferOffset + if (newFullLength > this.buffer.byteLength) { + // We can't concat the new buffer with the remaining one + let newBuffer: Buffer + if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) { + // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer + newBuffer = this.buffer + } else { + // Allocate a new larger buffer + let newBufferLength = this.buffer.byteLength * 2 + while (newLength >= newBufferLength) { + newBufferLength *= 2 + } + newBuffer = Buffer.allocUnsafe(newBufferLength) + } + // Move the remaining buffer to the new one + this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength) + this.buffer = newBuffer + this.bufferOffset = 0 + } + // Concat the new buffer with the remaining one + buffer.copy(this.buffer, this.bufferOffset + this.bufferLength) + this.bufferLength = newLength } else { - this.remainingBuffer = combinedBuffer.slice(offset) + this.buffer = buffer + this.bufferOffset = 0 + this.bufferLength = buffer.byteLength } } diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 80c07dc19..1c1aa641d 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -1,5 +1,4 @@ const pg = require('./lib') -const pool = new pg.Pool() const params = { text: @@ -17,7 +16,7 @@ const seq = { } const exec = async (client, q) => { - const result = await client.query({ + await client.query({ text: q.text, values: q.values, rowMode: 'array', @@ -39,7 +38,9 @@ const bench = async (client, q, time) => { const run = async () => { const client = new pg.Client() await client.connect() + console.log('start') await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)') + await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)') await bench(client, params, 1000) console.log('warmup done') const seconds = 5 @@ -61,7 +62,21 @@ const run = async () => { console.log('insert queries:', queries) console.log('qps', queries / seconds) console.log('on my laptop best so far seen 5799 qps') - console.log() + + console.log('') + console.log('Warming up bytea test') + await client.query({ + text: 'INSERT INTO buf(name, data) VALUES ($1, $2)', + values: ['test', Buffer.allocUnsafe(104857600)], + }) + console.log('bytea warmup done') + const start = Date.now() + const results = await client.query('SELECT * FROM buf') + const time = Date.now() - start + console.log('bytea time:', time, 'ms') + console.log('bytea length:', results.rows[0].data.byteLength, 'bytes') + console.log('on my laptop best so far seen 1107ms and 104857600 bytes') + await client.end() await client.end() }