
Here we go... #1

Merged 1 commit on Oct 2, 2017
45 changes: 23 additions & 22 deletions test/many-streams.test.js
@@ -131,40 +131,39 @@ async function makeTableP({gbqClient, gbqDatasetName, gbqTableName}) {
}

function createReadStream() {
var results = Array(streamLength).fill(stringifyObject(testData))

return new Readable({

objectMode : true,
highWaterMark: streamLength,

read(_size) {

const thisStream = this
this.push(results.pop())
Contributor Author

Instead of:

  • For every pull from this stream, emit 200 rows at once

I changed it to:

  • For every pull from this stream, emit 1 row

Then you don't have to get into the flow-control aspects of Node stream internals with highWaterMark; we let the requesting stream pull as it needs and get one item at a time.

Owner

That is way better, thank you.

Obviously it's not important in this case, but do you have any idea if reading one item at a time would have performance implications in other projects?

Contributor Author

I can't say definitively, but in general I would trust Node's internals to handle as much as possible. So switching to the pattern of pull (1) -> get (1) where possible should show performance improvements. If you aren't seeing good performance, play with highWaterMark (16 KB default for byte streams / 16 objects default for objectMode), but still return one thing at a time from read (see the sketch after this thread).

(If you find that to be bad advice, let me know!)

Owner

Thank you, this sounds like good advice and I'll write readable streams this way in the future.
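
For reference, a minimal sketch of the one-item-per-read pattern discussed in this thread, reassembled from the added lines in the hunk above (streamLength, stringifyObject, and testData are defined elsewhere in this test file; the commented-out highWaterMark line only marks where the tuning knob mentioned above would go):

const { Readable } = require('stream')

function createReadStream() {
  // Build the full backlog up front; read() then hands rows out one at a time.
  const results = Array(streamLength).fill(stringifyObject(testData))

  return new Readable({
    objectMode: true,
    // highWaterMark: 64, // optional tuning knob; the objectMode default is 16 objects
    read(_size) {
      // Emit exactly one row per pull from the consumer.
      this.push(results.pop())
      // Once the backlog is empty, signal end-of-stream.
      if (!results.length) {
        this.push(null)
      }
    },
  })
}

The difference from the earlier version is that read() no longer loops over the whole range itself; it trusts the consumer and Node's internal buffering to call read() again whenever more data is wanted.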


ramda.range(0, streamLength).map(() => {

let obj = testData

if (randomizeData) {
obj = ramda.clone(testData)
randomizeM(obj)
}

const str = stringifyObject(obj)

thisStream.push(str)
})
thisStream.push(null)
if (!results.length) {
this.push(null)
}
},
})
}

var allOutputRows = 0
function streamsToGcloudPromiseP(readStream, writeStream) {

return new Promise((resolve, reject) => {

writeStream.on('error', reject)
writeStream.on('complete', resolve)
writeStream.on('complete', job => {
job.on('error', reject)
job.on('complete', job => {
Contributor Author

This isn't necessary, but by using the job's events we can skip the optimistic delay later in the test and instead know for certain that the rows were inserted (a consolidated sketch of this wiring follows the thread below).

This also gives us what turned out to be the key component of this issue: logging the job object's state. Most of the time, everything works as expected. However, an occasional test run returns:

  Rejected promise returned by test. Reason:

  ErrorClass {
    code: 503,
    errors: [
      {
        domain: 'global',
        message: 'Error encountered during execution. Retrying may solve the problem.',
        reason: 'backendError',
      },
    ],
    message: 'Error encountered during execution. Retrying may solve the problem.',
    response: undefined,
  }

  Object.parseHttpRespBody (node_modules/@google-cloud/common/src/util.js:192:30)
  Object.handleResp (node_modules/@google-cloud/common/src/util.js:132:18)
  Request._callback (node_modules/@google-cloud/common/src/util.js:255:14)
  Request.self.callback (node_modules/request/request.js:186:22)
  Request.<anonymous> (node_modules/request/request.js:1163:10)
  IncomingMessage.<anonymous> (node_modules/request/request.js:1085:12)
  Test.__dirname [as fn] (test/many-streams.test.js:189:38)

I'm going to take this finding back to the original issue and discuss what we should do about it.

Owner

Perfect, thank you. I didn't quite understand how to use job from the documentation, but this makes much more sense.
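
For reference, a minimal sketch of the job-event wiring discussed in this thread, reassembled from the added lines (it assumes writeStream is a BigQuery table write stream that emits 'complete' with the load job, which is how the code above uses it):

// Resolve only when the BigQuery load job itself completes, rather than when
// the upload stream finishes, so the test no longer needs a fixed delay.
function streamsToGcloudPromiseP(readStream, writeStream) {
  return new Promise((resolve, reject) => {
    // Errors raised while uploading the stream.
    writeStream.on('error', reject)

    // The write stream hands back the load job once the upload is accepted.
    writeStream.on('complete', job => {
      // Errors reported by the job itself (e.g. the intermittent 503 above).
      job.on('error', reject)

      // 'complete' passes the job's final metadata: the rows are now queryable.
      job.on('complete', metadata => {
        console.log(metadata.status.state, metadata.statistics.load.outputRows)
        resolve()
      })
    })

    readStream.pipe(writeStream)
  })
}

Attaching reject to both the stream's and the job's 'error' events is presumably what lets the intermittent 503 above surface as a rejected promise in the test output.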

try {
allOutputRows += parseInt(job.statistics.load.outputRows, 10)
} catch (e) {
console.log(e)
console.log(job)
}
console.log(job.status.state, job.statistics.load.outputRows)
resolve()
})
})

readStream.pipe(writeStream)
})
@@ -197,10 +196,12 @@ test('run and verify result', async (t) => {
{concurrency: streamConcurrency}
)

console.log(`writes complete, waiting ${queryLatencyMs / 1000}s for data to process...`)
console.log('All output rows', allOutputRows)

// console.log(`writes complete, waiting ${queryLatencyMs / 1000}s for data to process...`)

// give BQ time to process the data
await bluebird.delay(queryLatencyMs)
// await bluebird.delay(queryLatencyMs)

const queryString = `SELECT COUNT(bucket) FROM [${projectId}:${bQDatasetName}.${bQTableName}]`
