Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pg-stream] Async iterable #2050

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 41 additions & 32 deletions packages/pg-query-stream/index.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
'use strict'
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable

const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
constructor (text, values, options) {
super(Object.assign({ objectMode: true }, options))
this.cursor = new Cursor(text, values, options)
constructor(text, values, { rowMode = undefined, types = undefined, batchSize = 100 } = {}) {
// https://nodejs.org/api/stream.html#stream_new_stream_readable_options
super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
this.cursor = new Cursor(text, values, { rowMode, types })

this._reading = false
this._closed = false
this.batchSize = (options || {}).batchSize || 100
this._destroyCallback = undefined

// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
Expand All @@ -19,40 +21,47 @@ class PgQueryStream extends Readable {
this.handleError = this.cursor.handleError.bind(this.cursor)
}

submit (connection) {
submit(connection) {
this.cursor.submit(connection)
}

close (callback) {
this._closed = true
const cb = callback || (() => this.emit('close'))
this.cursor.close(cb)
// Backwards compatibility.
// A stream should be 'closed' using destroy().
close(callback) {
if (this.destroyed) {
if (callback) setImmediate(callback)
} else {
if (callback) this.once('close', callback)
this.destroy()
}
}

_read (size) {
if (this._reading || this._closed) {
return false
_destroy(_err, callback) {
if (this._reading) {
this._destroyCallback = callback
} else {
this.cursor.close(callback)
}
}

// https://nodejs.org/api/stream.html#stream_readable_read_size_1
_read(size) {
// Prevent _destroy() from closing while reading
this._reading = true
const readAmount = Math.max(size, this.batchSize)
this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
}
if (err) {
return this.emit('error', err)
}
// if we get a 0 length array we've read to the end of the cursor
if (!rows.length) {
this._closed = true
setImmediate(() => this.emit('close'))
return this.push(null)
}

// push each row into the stream
this.cursor.read(size, (err, rows, result) => {
this._reading = false
for (var i = 0; i < rows.length; i++) {
this.push(rows[i])

if (this.destroyed) {
// Destroyed while reading
this.cursor.close(this._destroyCallback)
this._destroyCallback = undefined
} else if (err) {
// https://nodejs.org/api/stream.html#stream_errors_while_reading
this.destroy(err)
} else {
for (const row of rows) this.push(row)
if (rows.length < size) this.push(null)
}
})
}
Expand Down