-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pg-query-stream module #2035
Conversation
Consider a system where one component is scheduling tasks that yield streams, and passing them to (unknown) clients for consumption. It would be useful for the scheduler to know that the query underlying the stream is completed (so it can continue on to it's next task) without having to wait for the consumer to finish reading all results.
closes #2
Emit 'close' events when query completes
pg-cursor no longer returns the empty array 'done' signal to the callback until the cursor recieves a readyForQuery message. This means pg-query-stream will not emit 'close' or 'end' events until the server is __truly__ ready for the next query. This fixes some race-conditions where some queries are triggered off of the `end` event of the query-stream closes #3
- appears that timestamp queries emit a lot of `rows` with length == 0 - `self.once('end')` is added each of these times - assertion on listener count shows that more than 10 listeners are applied
- moves 'end' event listener to constructor, only listen once - ensures all existing tests still green
maxListeners on timestamp queries
This includes fixes in [email protected]. I've relaxed semver a touch so I don't have to release a new version here just for patch changes to pg-cursor.
Bumps [eslint](https://github.com/eslint/eslint) from 4.4.0 to 4.18.2. - [Release notes](https://github.com/eslint/eslint/releases) - [Changelog](https://github.com/eslint/eslint/blob/master/CHANGELOG.md) - [Commits](eslint/eslint@v4.4.0...v4.18.2) Signed-off-by: dependabot[bot] <[email protected]>
* Bump version of pg-cursor This includes fixes in [email protected]. I've relaxed semver a touch so I don't have to release a new version here just for patch changes to pg-cursor. * Pass options to pg-cursor fixes #55
Delete accidental addition
084b1a2
to
ef2f2d2
Compare
looks like there's a flaky test on node 8 I'll need to take a look at |
oh wasn't a flaky test it was just running tests from all packages in |
Thanks for the review! |
const query = client.query(stream) | ||
const iteratorRows = [] | ||
for await (const row of query) { | ||
iteratorRows.push(row) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should make:
- a test that breaks in the first iteration
- a test that triggers two breaking loops
- a test that
await new Promise(r => setTimeout(r))
in the loop body
These tests will fail due to the poor implementation of pg-query
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to brianc/node-pg-query-stream#52 (comment) for an alternative implementation
You really shouldn't have just copied the code from brianc/node-pg-query-stream as it is an invalid implementation of the Readable interface. Using the current implementation breaks the pg connection when used with async iterables. |
Ohhh yeah I wanted to keep the operations discrete: first merge the old repo as is just to keep it contained in a single step, then implement your changes, add tests, release a new version, then convert to typescript, etc....versus doing it all in 1 big bang as it's harder to follow that way for me. Thanks for reminding me though, I'll get to fixing the stream right now! what do you mean by
|
There were some subtle behaviors with the stream being implemented incorrectly & not working as expected with async iteration. I've modified the code based on #2050 and comments in #2035 to have better test coverage of async iterables and update the internals significantly to more closely match the readable stream interface. Note: this is a __breaking__ (semver major) change to this package as the close event behavior is changed slightly, and `highWaterMark` is no longer supported. It shouldn't impact most usage, but breaking regardless.
for await (const r of getNewStream()) break
for await (const r of getNewStream()) break With each stream running a query that results in more than (I am on mobile, sorry for the poor example) |
no worries! appreciate the help! its my wifes birthday today so prolly wont
have time to finish this until tomorrow but i put up a PR and i think i
covered that case with a test!
…On Sat, Dec 28, 2019 at 11:29 AM Matthieu Sieben ***@***.***> wrote:
for await(const r of new stream) breakfor await (const r of new stream) break
With each stream running a query that results in more than batchSize
entries.
(I am on mobile, sorry for the poor example)
—
You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub
<#2035?email_source=notifications&email_token=AAAMHIOHHR2FA2XPRW6OFETQ26EHZA5CNFSM4J6BJF6KYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEHYOQLQ#issuecomment-569436206>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AAAMHIM5R74DOR4KU6KDPLDQ26EHZANCNFSM4J6BJF6A>
.
|
* Fix pg-query-stream There were some subtle behaviors with the stream being implemented incorrectly & not working as expected with async iteration. I've modified the code based on #2050 and comments in #2035 to have better test coverage of async iterables and update the internals significantly to more closely match the readable stream interface. Note: this is a __breaking__ (semver major) change to this package as the close event behavior is changed slightly, and `highWaterMark` is no longer supported. It shouldn't impact most usage, but breaking regardless. * Remove a bunch of additional code * Add test for destroy + error propagation * Add failing test for destroying unsubmitted stream * Do not throw an uncatchable error when closing an unused cursor
This uses
git subtree
as awesomely recommended by @charmander to pull in the history of thepg-query-stream
into this module. As was done with #2030 I haven't used lerna to manage sub-module dependencies between this & pg-cursor yet. I'll do that in a follow on PR.important: need to use merge commit button here to merge this, not squash.