fix: properly handle asynchronous read from stream
alexander-fenster committed May 19, 2023
1 parent 3f2385b commit bf950d0
Showing 4 changed files with 29 additions and 40 deletions.
src/chunktransformer.ts (34 changes: 20 additions & 14 deletions)
@@ -80,6 +80,7 @@ export enum RowStateEnum {
 export class ChunkTransformer extends Transform {
   options: TransformOptions;
   _destroyed: boolean;
+  _userCanceled: boolean;
   lastRowKey?: Value;
   state?: number;
   row?: Row;
@@ -91,10 +92,15 @@ export class ChunkTransformer extends Transform {
     super(options);
     this.options = options;
     this._destroyed = false;
+    this._userCanceled = false;
     this.lastRowKey = undefined;
     this.reset();
   }
 
+  get canceled() {
+    return this._userCanceled;
+  }
+
   /**
    * called at end of the stream.
    * @public
@@ -129,10 +135,10 @@
    * @public
    *
    * @param {object} data readrows response containing array of chunks.
-   * @param {object} [enc] encoding options.
+   * @param {object} [_encoding] encoding options.
    * @param {callback} next callback will be called once data is processed, with error if any error in processing
    */
-  _transform(data: Data, enc: string, next: Function): void {
+  _transform(data: Data, _encoding: string, next: Function): void {
     for (const chunk of data.chunks!) {
       switch (this.state) {
         case RowStateEnum.NEW_ROW:
@@ -147,17 +153,6 @@
         default:
           break;
       }
-      if (this._destroyed) {
-        return;
-      }
     }
-    if (data.lastScannedRowKey && data.lastScannedRowKey.length > 0) {
-      this.lastRowKey = Mutation.convertFromBytes(
-        data.lastScannedRowKey as Bytes,
-        {
-          userOptions: this.options,
-        }
-      );
-    }
     next();
   }
@@ -226,7 +221,14 @@
       chunk.familyName ||
       chunk.qualifier ||
       (chunk.value && chunk.value.length !== 0) ||
-      chunk.timestampMicros! > 0;
+      // if it's a number
+      (typeof chunk.timestampMicros === 'number' &&
+        chunk.timestampMicros! > 0) ||
+      // if it's an instance of Long
+      (typeof chunk.timestampMicros === 'object' &&
+        'compare' in chunk.timestampMicros &&
+        typeof chunk.timestampMicros.compare === 'function' &&
+        chunk.timestampMicros.compare(0) === 1);
     if (chunk.resetRow && containsData) {
       this.destroy(
         new TransformError({
@@ -452,4 +454,8 @@
     }
     this.moveToNextState(chunk);
   }
+
+  cancel(): void {
+    this._userCanceled = true;
+  }
 }
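
For context on the containsData change above: a protobuf int64 field such as timestampMicros can be decoded either as a plain JavaScript number or as a Long instance (from the `long` package, as used by protobufjs), and a Long does not compare against 0 the way a number does. The diff duck-types the Long case via its compare method rather than importing Long. A minimal sketch of the equivalent check, assuming an explicit `long` import (which the commit itself avoids):

import Long from 'long';

// timestampMicros may arrive as a number or as a Long, depending on how
// the ReadRows response was decoded.
function timestampIsPositive(timestampMicros?: number | Long): boolean {
  if (typeof timestampMicros === 'number') {
    return timestampMicros > 0;
  }
  if (Long.isLong(timestampMicros)) {
    // Long#compare returns 1 when the receiver is strictly greater than
    // its argument, so this mirrors `timestampMicros > 0`.
    return timestampMicros.compare(0) === 1;
  }
  return false;
}

console.log(timestampIsPositive(12)); // true
console.log(timestampIsPositive(Long.fromNumber(0))); // false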
src/table.ts (11 changes: 7 additions & 4 deletions)
@@ -745,10 +745,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       filter = Filter.parse(options.filter);
     }
 
+    let chunkTransformer: ChunkTransformer;
+    let rowStream: Duplex;
+
     const userStream = new PassThrough({objectMode: true});
     const end = userStream.end.bind(userStream);
     userStream.end = () => {
       rowStream?.unpipe(userStream);
+      if (chunkTransformer) {
+        chunkTransformer.cancel();
+      }
       if (activeRequestStream) {
         activeRequestStream.abort();
       }
@@ -758,9 +764,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       return end();
     };
 
-    let chunkTransformer: ChunkTransformer;
-    let rowStream: Duplex;
-
     const makeNewRequest = () => {
       // Avoid cancelling an expired timer if user
       // cancelled the stream in the middle of a retry
@@ -882,7 +885,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       const toRowStream = new Transform({
         transform: (rowData, _, next) => {
           if (
-            chunkTransformer._destroyed ||
+            chunkTransformer.canceled ||
             // eslint-disable-next-line @typescript-eslint/no-explicit-any
             (userStream as any)._writableState.ended
           ) {
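
The hoisted declarations and the new cancel() call give userStream.end a deterministic teardown order: unpipe the consumer, flag the chunk transformer as canceled, then abort the outstanding request. A stripped-down sketch of that pattern (names simplified; not the actual client code):

import {PassThrough, Transform} from 'stream';

let canceled = false;
const userStream = new PassThrough({objectMode: true});

// Stand-in for toRowStream: drops rows that arrive after cancellation,
// mirroring the `chunkTransformer.canceled` check above.
const toRowStream = new Transform({
  objectMode: true,
  transform(row, _encoding, next) {
    if (canceled) {
      return next(); // swallow late rows instead of forwarding them
    }
    next(null, row);
  },
});

const end = userStream.end.bind(userStream);
userStream.end = () => {
  toRowStream.unpipe(userStream); // detach the consumer first
  canceled = true; // then tell upstream stages to stop emitting
  return end();
};

toRowStream.pipe(userStream);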
test/chunktransformer.ts (14 changes: 0 additions & 14 deletions)
@@ -997,20 +997,6 @@ describe('Bigtable/ChunkTransformer', () => {
       const err = callback.getCall(0).args[0];
       assert(!err, 'did not expect error');
     });
-    it('should return when stream is destroyed', () => {
-      chunkTransformer._destroyed = true;
-      const chunks = [{key: 'key'}];
-      chunkTransformer._transform({chunks}, {}, callback);
-      assert(!callback.called, 'unexpected call to next');
-    });
-    it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => {
-      chunkTransformer._transform(
-        {chunks: [], lastScannedRowKey: 'foo'},
-        {},
-        callback
-      );
-      assert.deepStrictEqual(chunkTransformer.lastRowKey, 'foo');
-    });
   });
   describe('reset', () => {
     it('should reset initial state', () => {
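
Both removed tests pinned behavior this commit deletes from _transform: the early return on _destroyed, and updating lastRowKey from lastScannedRowKey before the row is committed. A hypothetical replacement covering the new cancellation flag, written in this suite's style but not part of the commit:

// Hypothetical test, assuming the suite's existing chunkTransformer fixture.
describe('cancel', () => {
  it('should flip the canceled flag read by toRowStream', () => {
    assert.strictEqual(chunkTransformer.canceled, false);
    chunkTransformer.cancel();
    assert.strictEqual(chunkTransformer.canceled, true);
  });
});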
test/streams.ts (10 changes: 2 additions & 8 deletions)
@@ -122,8 +122,7 @@ describe('Bigtable/Streams', () => {
     pipeline(readStream, transform, passThrough, () => {});
   });
 
-  // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed
-  it.skip('should create read stream and read asynchronously using Transform stream', done => {
+  it('should create read stream and read asynchronously using Transform stream', done => {
     // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
     const keyFrom = 0;
     const keyTo = 1000;
@@ -218,12 +217,7 @@ describe('Bigtable/Streams', () => {
     });
   });
 
-  // TODO(@alexander-fenster): enable after the resumption logic is fixed.
-  // Currently, lastRowKey is updated in _transform (chunktransfomer.ts:155)
-  // https://github.com/googleapis/nodejs-bigtable/blob/436e77807e87e13f80ac2bc2c43813b09090000f/src/chunktransformer.ts#L155
-  // before the record is committed, which makes it omit this record when
-  // the call is resumed. I believe lastRowKey should only be set in commit().
-  it.skip('should silently resume after server or network error', done => {
+  it('should silently resume after server or network error', done => {
     // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
     const keyFrom = 0;
     const keyTo = 1000;
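
The two re-enabled tests both hinge on a consumer that completes asynchronously, so the read stream must survive backpressure without losing rows. A self-contained sketch of that shape, with Readable.from standing in for the Bigtable read stream the real tests drive through the client:

import {pipeline, PassThrough, Readable, Transform} from 'stream';

// 1000 rows, matching the count the tests use to reliably create backpressure.
const readStream = Readable.from(
  Array.from({length: 1000}, (_, i) => ({key: `key${i}`}))
);

const rowsRead: Array<{key: string}> = [];

const transform = new Transform({
  objectMode: true,
  transform: (row, _encoding, callback) => {
    // Completing the callback asynchronously makes the consumer slower than
    // the producer, which is the condition that previously lost data.
    setImmediate(() => callback(null, row));
  },
});

const passThrough = new PassThrough({objectMode: true});
passThrough.on('data', row => rowsRead.push(row));

pipeline(readStream, transform, passThrough, err => {
  if (!err) {
    console.log(`read ${rowsRead.length} rows`); // expect 1000
  }
});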
