Here we go... #1
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,40 +131,39 @@ async function makeTableP({gbqClient, gbqDatasetName, gbqTableName}) { | |
} | ||
|
||
function createReadStream() { | ||
var results = Array(streamLength).fill(stringifyObject(testData)) | ||
|
||
return new Readable({ | ||
|
||
objectMode : true, | ||
highWaterMark: streamLength, | ||
|
||
read(_size) { | ||
|
||
const thisStream = this | ||
this.push(results.pop()) | ||
|
||
ramda.range(0, streamLength).map(() => { | ||
|
||
let obj = testData | ||
|
||
if (randomizeData) { | ||
obj = ramda.clone(testData) | ||
randomizeM(obj) | ||
} | ||
|
||
const str = stringifyObject(obj) | ||
|
||
thisStream.push(str) | ||
}) | ||
thisStream.push(null) | ||
if (!results.length) { | ||
this.push(null) | ||
} | ||
}, | ||
}) | ||
} | ||
|
||
var allOutputRows = 0 | ||
function streamsToGcloudPromiseP(readStream, writeStream) { | ||
|
||
return new Promise((resolve, reject) => { | ||
|
||
writeStream.on('error', reject) | ||
writeStream.on('complete', resolve) | ||
writeStream.on('complete', job => { | ||
job.on('error', reject) | ||
job.on('complete', job => { | ||
This isn't necessary, but using the This also gives us what turned out to be the key component of this issue, the logging of the
I'm going to take this finding back to the original issue and discuss what we should do about it.
Perfect, thank you. I didn't quite understand how to use |
||
try { | ||
allOutputRows += parseInt(job.statistics.load.outputRows, 10) | ||
} catch (e) { | ||
console.log(e) | ||
console.log(job) | ||
} | ||
console.log(job.status.state, job.statistics.load.outputRows) | ||
resolve() | ||
}) | ||
}) | ||
|
||
readStream.pipe(writeStream) | ||
}) | ||
|
@@ -197,10 +196,12 @@ test('run and verify result', async (t) => { | |
{concurrency: streamConcurrency} | ||
) | ||
|
||
console.log(`writes complete, waiting ${queryLatencyMs / 1000}s for data to process...`) | ||
console.log('All output rows', allOutputRows) | ||
|
||
// console.log(`writes complete, waiting ${queryLatencyMs / 1000}s for data to process...`) | ||
|
||
// give BQ time to process the data | ||
await bluebird.delay(queryLatencyMs) | ||
// await bluebird.delay(queryLatencyMs) | ||
|
||
const queryString = `SELECT COUNT(bucket) FROM [${projectId}:${bQDatasetName}.${bQTableName}]` | ||
|
||
|
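The change to streamsToGcloudPromiseP in the hunk above waits for the load job itself to finish, not just for the write stream to report 'complete'. A minimal sketch of that two-step wait follows. It assumes, as the diff does, that the write stream's 'complete' event hands back a job object that emits its own 'error' and 'complete' events. The names pipeUntilJobCompleteP and createFakeWriteStream are hypothetical, and the fake write stream is only a stand-in so the sketch runs without BigQuery; it is not the library's API.

```javascript
const { EventEmitter } = require('events')
const { Readable, Writable } = require('stream')

// Resolves with whatever the job's 'complete' event carries, and rejects on
// the first error from either the write stream or the job.
function pipeUntilJobCompleteP(readStream, writeStream) {
  return new Promise((resolve, reject) => {
    writeStream.on('error', reject)

    // Assumption taken from the diff: 'complete' delivers a job emitter.
    writeStream.on('complete', (job) => {
      job.on('error', reject)
      job.on('complete', resolve)
    })

    readStream.pipe(writeStream)
  })
}

// Hypothetical stand-in for the real write stream, for illustration only.
// It counts rows and reports them in the same shape the diff reads
// (status.state and statistics.load.outputRows as a string).
function createFakeWriteStream() {
  let rowsWritten = 0

  const writeStream = new Writable({
    objectMode: true,
    write(_row, _encoding, callback) {
      rowsWritten += 1
      callback()
    },
  })

  writeStream.on('finish', () => {
    const job = new EventEmitter()
    writeStream.emit('complete', job)

    // Finish the fake job on a later tick, after listeners are attached.
    setImmediate(() => {
      job.emit('complete', {
        status: { state: 'DONE' },
        statistics: { load: { outputRows: String(rowsWritten) } },
      })
    })
  })

  return writeStream
}

// Usage: log the final state and row count, as the diff does.
pipeUntilJobCompleteP(Readable.from(['a', 'b', 'c']), createFakeWriteStream())
  .then((job) => console.log(job.status.state, job.statistics.load.outputRows))
```

Resolving only from the inner 'complete' handler means a caller that awaits the promise sees the row count the job reports, which is what made the discrepancy visible in this PR.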
Instead of:
I changed it to:
Then, you don't have to get into the flow control aspect of Node stream internals with highWaterMark; we let the requesting stream pull as it needs and get one thing at a time.
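For reference, here is a minimal, self-contained sketch of the one-item-per-read() pattern described in this comment. It is not the exact code from the diff: createArrayReadStream and collect are hypothetical names, and it uses shift() on a copy of the input so items come out in order, where the diff uses pop().

```javascript
const { Readable } = require('stream')

// Hypothetical helper: an object-mode stream over a fixed list of rows.
// Rows must not be null, since pushing null signals end-of-stream.
function createArrayReadStream(rows) {
  // Copy so the caller's array is not mutated by shift().
  const pending = rows.slice()

  return new Readable({
    objectMode: true,

    read(_size) {
      // Exactly one push per read() call: the next row, or null when done.
      this.push(pending.length ? pending.shift() : null)
    },
  })
}

// Hypothetical helper: gathers everything the stream emits.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const seen = []
    stream.on('data', (row) => seen.push(row))
    stream.on('end', () => resolve(seen))
    stream.on('error', reject)
  })
}

// Usage: the consumer pulls, and the stream hands over one row per pull.
collect(createArrayReadStream(['row 1', 'row 2', 'row 3']))
  .then((rows) => console.log(rows.length, 'rows read'))
```

No highWaterMark is set here, so Node's default buffering applies and the stream only produces data when something downstream asks for it.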
That is way better, thank you.
Obviously it's not important in this case, but do you have any idea if reading one item at a time would have performance implications in other projects?
I can't say definitively, but in general, I would trust Node's internals to handle as much as possible. So switching to the pattern of pull (1) -> get (1) where possible should show performance improvements. If you aren't seeing good performance, play with highWaterMark (16 KB default / 16 objects default for objectMode), but still return one thing at a time from read(). (If you find that to be bad advice, let me know!)
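To make that advice concrete, here is a rough sketch of tuning highWaterMark while read() still pushes one item per call. The helper names createCountingStream and drain are hypothetical, and the read-call counter exists only to make the pull pattern observable. It measures nothing about performance.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: same one-push-per-read() stream, with a tunable
// highWaterMark and a counter for how often Node calls read().
function createCountingStream(rows, highWaterMark) {
  const pending = rows.slice()
  const stats = { readCalls: 0 }

  const stream = new Readable({
    objectMode: true,
    // In objectMode this is a number of objects, not a number of bytes.
    highWaterMark,

    read(_size) {
      stats.readCalls += 1
      // Still one push per call; Node decides how often to call read().
      this.push(pending.length ? pending.shift() : null)
    },
  })

  return { stream, stats }
}

// Hypothetical helper: consumes the stream with async iteration.
async function drain(stream) {
  const seen = []
  for await (const row of stream) {
    seen.push(row)
  }
  return seen
}

// Usage: the buffer limit changes, the body of read() does not.
const { stream, stats } = createCountingStream([1, 2, 3, 4, 5], 2)
drain(stream).then((rows) => {
  console.log('rows:', rows.length, 'read() calls:', stats.readCalls)
})
```

The point of the design is that highWaterMark only changes how far ahead Node buffers. The read() implementation stays the same, so the setting can be adjusted without touching the stream logic.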
Thank you, this sounds like good advice and I'll write readable streams this way in the future.