diff --git a/src/chunktransformer.ts b/src/chunktransformer.ts index 986ceaf6f..8c70a83c1 100644 --- a/src/chunktransformer.ts +++ b/src/chunktransformer.ts @@ -80,6 +80,7 @@ export enum RowStateEnum { export class ChunkTransformer extends Transform { options: TransformOptions; _destroyed: boolean; + _userCanceled: boolean; lastRowKey?: Value; state?: number; row?: Row; @@ -91,10 +92,15 @@ export class ChunkTransformer extends Transform { super(options); this.options = options; this._destroyed = false; + this._userCanceled = false; this.lastRowKey = undefined; this.reset(); } + get canceled() { + return this._userCanceled; + } + /** * called at end of the stream. * @public @@ -129,10 +135,10 @@ export class ChunkTransformer extends Transform { * @public * * @param {object} data readrows response containing array of chunks. - * @param {object} [enc] encoding options. + * @param {object} [_encoding] encoding options. * @param {callback} next callback will be called once data is processed, with error if any error in processing */ - _transform(data: Data, enc: string, next: Function): void { + _transform(data: Data, _encoding: string, next: Function): void { for (const chunk of data.chunks!) { switch (this.state) { case RowStateEnum.NEW_ROW: @@ -147,17 +153,6 @@ export class ChunkTransformer extends Transform { default: break; } - if (this._destroyed) { - return; - } - } - if (data.lastScannedRowKey && data.lastScannedRowKey.length > 0) { - this.lastRowKey = Mutation.convertFromBytes( - data.lastScannedRowKey as Bytes, - { - userOptions: this.options, - } - ); } next(); } @@ -226,7 +221,14 @@ export class ChunkTransformer extends Transform { chunk.familyName || chunk.qualifier || (chunk.value && chunk.value.length !== 0) || - chunk.timestampMicros! > 0; + // if it's a number + (typeof chunk.timestampMicros === 'number' && + chunk.timestampMicros! > 0) || + // if it's an instance of Long + (typeof chunk.timestampMicros === 'object' && + 'compare' in chunk.timestampMicros && + typeof chunk.timestampMicros.compare === 'function' && + chunk.timestampMicros.compare(0) === 1); if (chunk.resetRow && containsData) { this.destroy( new TransformError({ @@ -452,4 +454,8 @@ export class ChunkTransformer extends Transform { } this.moveToNextState(chunk); } + + cancel(): void { + this._userCanceled = true; + } } diff --git a/src/table.ts b/src/table.ts index bb94470af..c17bebf0d 100644 --- a/src/table.ts +++ b/src/table.ts @@ -745,10 +745,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); filter = Filter.parse(options.filter); } + let chunkTransformer: ChunkTransformer; + let rowStream: Duplex; + const userStream = new PassThrough({objectMode: true}); const end = userStream.end.bind(userStream); userStream.end = () => { rowStream?.unpipe(userStream); + if (chunkTransformer) { + chunkTransformer.cancel(); + } if (activeRequestStream) { activeRequestStream.abort(); } @@ -758,9 +764,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return end(); }; - let chunkTransformer: ChunkTransformer; - let rowStream: Duplex; - const makeNewRequest = () => { // Avoid cancelling an expired timer if user // cancelled the stream in the middle of a retry @@ -882,7 +885,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const toRowStream = new Transform({ transform: (rowData, _, next) => { if ( - chunkTransformer._destroyed || + chunkTransformer.canceled || // eslint-disable-next-line @typescript-eslint/no-explicit-any (userStream as any)._writableState.ended ) { diff --git a/test/chunktransformer.ts b/test/chunktransformer.ts index 4696d3765..a978b0a64 100644 --- a/test/chunktransformer.ts +++ b/test/chunktransformer.ts @@ -997,20 +997,6 @@ describe('Bigtable/ChunkTransformer', () => { const err = callback.getCall(0).args[0]; assert(!err, 'did not expect error'); }); - it('should return when stream is destroyed', () => { - chunkTransformer._destroyed = true; - const chunks = [{key: 'key'}]; - chunkTransformer._transform({chunks}, {}, callback); - assert(!callback.called, 'unexpected call to next'); - }); - it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => { - chunkTransformer._transform( - {chunks: [], lastScannedRowKey: 'foo'}, - {}, - callback - ); - assert.deepStrictEqual(chunkTransformer.lastRowKey, 'foo'); - }); }); describe('reset', () => { it('should reset initial state', () => { diff --git a/test/streams.ts b/test/streams.ts index cc18050c6..31522b5f0 100644 --- a/test/streams.ts +++ b/test/streams.ts @@ -122,8 +122,7 @@ describe('Bigtable/Streams', () => { pipeline(readStream, transform, passThrough, () => {}); }); - // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed - it.skip('should create read stream and read asynchronously using Transform stream', done => { + it('should create read stream and read asynchronously using Transform stream', done => { // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure const keyFrom = 0; const keyTo = 1000; @@ -218,12 +217,7 @@ describe('Bigtable/Streams', () => { }); }); - // TODO(@alexander-fenster): enable after the resumption logic is fixed. - // Currently, lastRowKey is updated in _transform (chunktransfomer.ts:155) - // https://github.com/googleapis/nodejs-bigtable/blob/436e77807e87e13f80ac2bc2c43813b09090000f/src/chunktransformer.ts#L155 - // before the record is committed, which makes it omit this record when - // the call is resumed. I believe lastRowKey should only be set in commit(). - it.skip('should silently resume after server or network error', done => { + it('should silently resume after server or network error', done => { // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure const keyFrom = 0; const keyTo = 1000;