fix: properly handle asynchronous read from stream #1284

Merged: 18 commits merged on May 30, 2023. (Changes shown from 9 commits.)
34 changes: 20 additions & 14 deletions src/chunktransformer.ts
@@ -80,6 +80,7 @@ export enum RowStateEnum {
export class ChunkTransformer extends Transform {
options: TransformOptions;
_destroyed: boolean;
_userCanceled: boolean;
lastRowKey?: Value;
state?: number;
row?: Row;
@@ -91,10 +92,15 @@ export class ChunkTransformer extends Transform {
super(options);
this.options = options;
this._destroyed = false;
this._userCanceled = false;
this.lastRowKey = undefined;
this.reset();
}

get canceled() {
return this._userCanceled;
}

Contributor: It seems like we expose ChunkTransformer as part of our public API, which I think is incorrect to begin with. To minimize further leakage of implementation details, can we mark this method as internal?

/**
* called at end of the stream.
* @public
@@ -129,10 +135,10 @@ export class ChunkTransformer extends Transform {
* @public
*
* @param {object} data readrows response containing array of chunks.
* @param {object} [enc] encoding options.
* @param {object} [_encoding] encoding options.
* @param {callback} next callback will be called once data is processed, with error if any error in processing
*/
_transform(data: Data, enc: string, next: Function): void {
_transform(data: Data, _encoding: string, next: Function): void {
for (const chunk of data.chunks!) {
switch (this.state) {
case RowStateEnum.NEW_ROW:
@@ -147,17 +153,6 @@ export class ChunkTransformer extends Transform {
default:
break;
}
if (this._destroyed) {
return;
}
}
if (data.lastScannedRowKey && data.lastScannedRowKey.length > 0) {
this.lastRowKey = Mutation.convertFromBytes(
data.lastScannedRowKey as Bytes,
{
userOptions: this.options,
}
);
}
Contributor: I think this breaks the request resumption logic in makeNewRequest:

const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';

Contributor (author): lastRowKey is set when committing; it should not be set here, because the row is still incomplete at this moment.

Contributor: This is my fault; I forgot to mention one other detail of the ReadRows protocol. There are two ways that a ReadRows resumption request can be built:

  1. Based on the last received row key of a committed row chunk. (This one you know about.)
  2. As a heartbeat sent via an empty ReadRowsResponse. This is used when a caller has very restrictive filters that cause the scan to omit entire tablets; the protocol allows the server to emit a heartbeat with the last scanned (as opposed to returned) row key.

This is not currently enabled on the server side, but it is specified by the protocol.

Contributor (author): Fixed this.
next();
}
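The two resumption sources described in the thread above can be sketched as follows (field and function names are illustrative, not the library's actual makeNewRequest state). A retry can resume either from the key of the last committed row or from a heartbeat's last scanned row key:

```typescript
// Sketch of the two resumption sources discussed in the review thread.
interface ResumptionState {
  lastCommittedRowKey?: string; // key of the last fully committed row
  lastScannedRowKey?: string; // key from a server heartbeat (empty ReadRowsResponse)
}

function resumptionKey(state: ResumptionState): string {
  // A heartbeat key, when present, reflects progress past the last
  // committed row, so it is taken as the later restart point of the two.
  return state.lastScannedRowKey ?? state.lastCommittedRowKey ?? '';
}
```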
@@ -226,7 +221,14 @@ export class ChunkTransformer extends Transform {
chunk.familyName ||
chunk.qualifier ||
(chunk.value && chunk.value.length !== 0) ||
chunk.timestampMicros! > 0;
// if it's a number
(typeof chunk.timestampMicros === 'number' &&
chunk.timestampMicros! > 0) ||
// if it's an instance of Long
(typeof chunk.timestampMicros === 'object' &&
'compare' in chunk.timestampMicros &&
typeof chunk.timestampMicros.compare === 'function' &&
chunk.timestampMicros.compare(0) === 1);
Contributor: This is a bit strange; chunk is a protobuf, so shouldn't it have a stable type?

Contributor (author, @alexander-fenster, May 22, 2023): This is a specific of how protobufjs handles 64-bit integers. In JavaScript, all numbers are 64-bit IEEE 754 floats, so they can reliably represent integers only up to 2⁵³ − 1. protobufjs therefore accepts, and may emit, an object of type Long (from https://www.npmjs.com/package/long). As far as I understand, in this case it is always a number in practice, but in my clean workspace VS Code shows an error here, so it should be safe to fix.

Contributor: Please leave a comment in the code explaining this.
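Following the author's explanation, the number-or-Long check from the diff can be isolated as a commented sketch (a standalone illustration; the real Long class comes from the `long` npm package, here reduced to the one method the check uses):

```typescript
// protobufjs may surface an int64 field either as a plain JS number or as a
// Long object, because JS numbers are IEEE 754 doubles and can represent
// integers exactly only up to 2**53 - 1.
interface LongLike {
  // Long.prototype.compare(other) returns 1, 0, or -1 (greater, equal, less).
  compare(other: number): number;
}

function isPositiveTimestamp(t: number | LongLike | null | undefined): boolean {
  if (typeof t === 'number') {
    return t > 0;
  }
  if (t && typeof t.compare === 'function') {
    return t.compare(0) === 1;
  }
  return false;
}
```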

if (chunk.resetRow && containsData) {
this.destroy(
new TransformError({
@@ -452,4 +454,8 @@ export class ChunkTransformer extends Transform {
}
this.moveToNextState(chunk);
}

cancel(): void {
this._userCanceled = true;
}
}
11 changes: 7 additions & 4 deletions src/table.ts
@@ -745,10 +745,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
filter = Filter.parse(options.filter);
}

let chunkTransformer: ChunkTransformer;
let rowStream: Duplex;

const userStream = new PassThrough({objectMode: true});
const end = userStream.end.bind(userStream);
userStream.end = () => {
rowStream?.unpipe(userStream);
if (chunkTransformer) {
chunkTransformer.cancel();
}
if (activeRequestStream) {
activeRequestStream.abort();
}
@@ -758,9 +764,6 @@
return end();
};

let chunkTransformer: ChunkTransformer;
let rowStream: Duplex;

const makeNewRequest = () => {
// Avoid cancelling an expired timer if user
// cancelled the stream in the middle of a retry
@@ -882,7 +885,7 @@
const toRowStream = new Transform({
transform: (rowData, _, next) => {
if (
chunkTransformer._destroyed ||
chunkTransformer.canceled ||
Contributor: It seems like there are only two things that care about the canceled flag:

  • the closure for userStream.end()
  • the closure for toRowStream

ChunkTransformer seems like an innocent bystander here. Why not move the userCanceled flag to a local variable in createReadStream(), whose scope is shared by the two closures that care about the flag?

Contributor: Alternatively, maybe create a subclass of userStream that holds the state.

Contributor (author): Moving the flag closer to the consumer makes sense to me; I'll try to do it. I only put it here to replace the _destroyed logic, but you are right, it will make more sense downstream.

Contributor (author): Done.

// eslint-disable-next-line @typescript-eslint/no-explicit-any
(userStream as any)._writableState.ended
) {
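The refactor agreed on in the thread above can be sketched as a closure-local flag (simplified and hypothetical; not the library's actual createReadStream implementation):

```typescript
import {PassThrough, Transform} from 'stream';

// Sketch: the cancellation flag lives in createReadStream()'s scope and is
// shared only by the two closures that care about it.
function createReadStreamSketch() {
  let userCanceled = false;

  const userStream = new PassThrough({objectMode: true});
  const originalEnd = userStream.end.bind(userStream);
  // Closure 1: ending the user stream marks the read as canceled.
  userStream.end = ((...args: unknown[]) => {
    userCanceled = true;
    return (originalEnd as (...a: unknown[]) => unknown)(...args);
  }) as typeof userStream.end;

  // Closure 2: stop forwarding rows once the user has canceled.
  const toRowStream = new Transform({
    objectMode: true,
    transform(rowData, _enc, next) {
      if (userCanceled) {
        next(); // drop the row
        return;
      }
      next(null, rowData);
    },
  });

  return {userStream, toRowStream, isCanceled: () => userCanceled};
}
```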
14 changes: 0 additions & 14 deletions test/chunktransformer.ts
@@ -997,20 -997,6 @@ describe('Bigtable/ChunkTransformer', () => {
const err = callback.getCall(0).args[0];
assert(!err, 'did not expect error');
});
it('should return when stream is destroyed', () => {
chunkTransformer._destroyed = true;
const chunks = [{key: 'key'}];
chunkTransformer._transform({chunks}, {}, callback);
assert(!callback.called, 'unexpected call to next');
});
it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => {
chunkTransformer._transform(
{chunks: [], lastScannedRowKey: 'foo'},
{},
callback
);
assert.deepStrictEqual(chunkTransformer.lastRowKey, 'foo');
});
});
describe('reset', () => {
it('should reset initial state', () => {
7 changes: 6 additions & 1 deletion test/mutation.ts
@@ -145,7 +145,12 @@ describe('Bigtable/Mutation', () => {
assert.strictEqual(decoded.toString(), message);
});

it('should not create a new Buffer needlessly', () => {
it('should not create a new Buffer needlessly', function () {
if (process.platform === 'win32') {
// stubbing Buffer.from does not work on Windows since sinon 15.1.0
// TODO(@alexander-fenster): investigate and report or fix
this.skip();
}
const message = 'Hello!';
const encoded = Buffer.from(message);
const stub = sandbox.stub(Buffer, 'from');