From 55d86baaf554247c0c3d063fc790a37569f3e77b Mon Sep 17 00:00:00 2001 From: Alexander Fenster Date: Tue, 30 May 2023 11:08:49 -0700 Subject: [PATCH] fix: properly handle asynchronous read from stream (#1284) * test: test ReadRows logic with local gRPC server * test: PR feedback * test: fix race condition in initialization * test: PR feedback, renaming a variable for readability * fix: properly handle asynchronous read from stream * test: skip failing Windows test * test: increase timeout on Windows * fix: PR feedback * test: only set lastScannedRowKey for completed rows * fix: bring back the lastScannedRowKey logic * test: pick latest changes from main branch * fix: add transform method to userStream, handle cancellation in it * fix: PR feedback --------- Co-authored-by: danieljbruce --- src/chunktransformer.ts | 16 ++++++++++--- src/table.ts | 49 ++++++++++++++++++++++++++++++++-------- test/chunktransformer.ts | 6 ----- test/readrows.ts | 5 ++-- 4 files changed, 54 insertions(+), 22 deletions(-) diff --git a/src/chunktransformer.ts b/src/chunktransformer.ts index 986ceaf6f..4aec3d9ba 100644 --- a/src/chunktransformer.ts +++ b/src/chunktransformer.ts @@ -129,10 +129,10 @@ export class ChunkTransformer extends Transform { * @public * * @param {object} data readrows response containing array of chunks. - * @param {object} [enc] encoding options. + * @param {object} [_encoding] encoding options. * @param {callback} next callback will be called once data is processed, with error if any error in processing */ - _transform(data: Data, enc: string, next: Function): void { + _transform(data: Data, _encoding: string, next: Function): void { for (const chunk of data.chunks!) { switch (this.state) { case RowStateEnum.NEW_ROW: @@ -148,6 +148,7 @@ export class ChunkTransformer extends Transform { break; } if (this._destroyed) { + next(); return; } } @@ -226,7 +227,16 @@ export class ChunkTransformer extends Transform { chunk.familyName || chunk.qualifier || (chunk.value && chunk.value.length !== 0) || - chunk.timestampMicros! > 0; + // timestampMicros is an int64 in the protobuf definition, + // which can be either a number or an instance of Long. + // If it's a number... + (typeof chunk.timestampMicros === 'number' && + chunk.timestampMicros! > 0) || + // If it's an instance of Long... + (typeof chunk.timestampMicros === 'object' && + 'compare' in chunk.timestampMicros && + typeof chunk.timestampMicros.compare === 'function' && + chunk.timestampMicros.compare(0) === 1); if (chunk.resetRow && containsData) { this.destroy( new TransformError({ diff --git a/src/table.ts b/src/table.ts index bb94470af..425d640e1 100644 --- a/src/table.ts +++ b/src/table.ts @@ -745,22 +745,51 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); filter = Filter.parse(options.filter); } - const userStream = new PassThrough({objectMode: true}); - const end = userStream.end.bind(userStream); - userStream.end = () => { + let chunkTransformer: ChunkTransformer; + let rowStream: Duplex; + + let userCanceled = false; + const userStream = new PassThrough({ + objectMode: true, + transform(row, _encoding, callback) { + if (userCanceled) { + callback(); + return; + } + callback(null, row); + }, + }); + + // The caller should be able to call userStream.end() to stop receiving + // more rows and cancel the stream prematurely. But also, the 'end' event + // will be emitted if the stream ended normally. To tell these two + // situations apart, we'll save the "original" end() function, and + // will call it on rowStream.on('end'). + const originalEnd = userStream.end.bind(userStream); + + // Taking care of this extra listener when piping and unpiping userStream: + const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => { + rowStream.pipe(userStream, {end: false}); + rowStream.on('end', originalEnd); + }; + const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => { rowStream?.unpipe(userStream); + rowStream?.removeListener('end', originalEnd); + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => { + rowStreamUnpipe(rowStream, userStream); + userCanceled = true; if (activeRequestStream) { activeRequestStream.abort(); } if (retryTimer) { clearTimeout(retryTimer); } - return end(); + return originalEnd(chunk, encoding, cb); }; - let chunkTransformer: ChunkTransformer; - let rowStream: Duplex; - const makeNewRequest = () => { // Avoid cancelling an expired timer if user // cancelled the stream in the middle of a retry @@ -882,7 +911,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const toRowStream = new Transform({ transform: (rowData, _, next) => { if ( - chunkTransformer._destroyed || + userCanceled || // eslint-disable-next-line @typescript-eslint/no-explicit-any (userStream as any)._writableState.ended ) { @@ -913,7 +942,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); rowStream .on('error', (error: ServiceError) => { - rowStream.unpipe(userStream); + rowStreamUnpipe(rowStream, userStream); activeRequestStream = null; if (IGNORED_STATUS_CODES.has(error.code)) { // We ignore the `cancelled` "error", since we are the ones who cause @@ -947,7 +976,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); .on('end', () => { activeRequestStream = null; }); - rowStream.pipe(userStream); + rowStreamPipe(rowStream, userStream); }; makeNewRequest(); diff --git a/test/chunktransformer.ts b/test/chunktransformer.ts index 4696d3765..d58a2d434 100644 --- a/test/chunktransformer.ts +++ b/test/chunktransformer.ts @@ -997,12 +997,6 @@ describe('Bigtable/ChunkTransformer', () => { const err = callback.getCall(0).args[0]; assert(!err, 'did not expect error'); }); - it('should return when stream is destroyed', () => { - chunkTransformer._destroyed = true; - const chunks = [{key: 'key'}]; - chunkTransformer._transform({chunks}, {}, callback); - assert(!callback.called, 'unexpected call to next'); - }); it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => { chunkTransformer._transform( {chunks: [], lastScannedRowKey: 'foo'}, diff --git a/test/readrows.ts b/test/readrows.ts index 3da2cb9cf..b02f3a12e 100644 --- a/test/readrows.ts +++ b/test/readrows.ts @@ -122,8 +122,7 @@ describe('Bigtable/ReadRows', () => { pipeline(readStream, transform, passThrough, () => {}); }); - // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed - it.skip('should create read stream and read asynchronously using Transform stream', function (done) { + it('should create read stream and read asynchronously using Transform stream', function (done) { if (process.platform === 'win32') { this.timeout(60000); // it runs much slower on Windows! } @@ -222,7 +221,7 @@ describe('Bigtable/ReadRows', () => { }); }); - // TODO(@alexander-fenster): enable after it's fixed + // TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed it.skip('should be able to stop reading from the read stream when reading asynchronously', function (done) { if (process.platform === 'win32') { this.timeout(60000); // it runs much slower on Windows!