
fix: ensure that during resumption of a scan, rows that have not been observed by the caller are re-requested #1444

Merged
Changes from 30 commits

Commits (35)
9e2c037
fix: dropping buffered rows during a retry of a scan
igorbernstein2 Jul 9, 2024
2a4d928
Add a test that sends rows back
danieljbruce Jul 2, 2024
d8e2e91
Create an instance with dummy server.
danieljbruce Jul 2, 2024
d803684
Add test parameters for sending back the right chu
danieljbruce Jul 2, 2024
a8bebb1
Omit server start
danieljbruce Jul 3, 2024
fdc5c0d
Run a test against the server in the old code
danieljbruce Jul 3, 2024
80f4fca
Add logging to the chunk transformer
danieljbruce Jul 3, 2024
580d65f
Add logging to indicate that the server received r
danieljbruce Jul 3, 2024
b08288a
chunk transformer logs and other logs
danieljbruce Jul 3, 2024
e4d9b86
Add a log
danieljbruce Jul 8, 2024
9d41781
Don’t change the old server
danieljbruce Jul 8, 2024
2d9b603
Add another guard against the logs
danieljbruce Jul 8, 2024
5bad7c0
Add setImmediate everywhere that it needs to be
danieljbruce Jul 10, 2024
e61343d
Remove the logging
danieljbruce Jul 10, 2024
138d105
Remove more logging
danieljbruce Jul 10, 2024
f35fc3e
Adjust header
danieljbruce Jul 10, 2024
bf74092
Add the high watermarks back in
danieljbruce Jul 10, 2024
ff13036
Remove the at accessor
danieljbruce Jul 10, 2024
87c57a9
Eliminate the watermark adjustments
danieljbruce Jul 10, 2024
09d9155
Introduce the watermarks back in
danieljbruce Jul 10, 2024
89c7c53
Reduce the number of watermark removals to 1.
danieljbruce Jul 10, 2024
f60e790
Reverted the streamEvents pipeline
danieljbruce Jul 11, 2024
d07aa9e
Add some comments for introducing the new waterma
danieljbruce Jul 11, 2024
537760b
Remove comments and console logs. Add TODO.
danieljbruce Jul 11, 2024
96f98d7
Add TODO
danieljbruce Jul 11, 2024
7b03b84
refactor the test with a helper method
danieljbruce Jul 11, 2024
cffcd57
Fix linting issue
danieljbruce Jul 11, 2024
52675d1
Adding a comment about the mock
danieljbruce Jul 11, 2024
c62ed72
readable comment change
danieljbruce Jul 11, 2024
d78d119
Update test/readrows.ts
danieljbruce Jul 11, 2024
8c0726c
It uses 150 rows not 1000 rows
danieljbruce Jul 11, 2024
eba2195
Add a TODO for making more specific typing
danieljbruce Jul 11, 2024
0a86df7
Add some TODOs for better factoring
danieljbruce Jul 11, 2024
85901e5
Merge branch 'fix-missing-rows-with-test-and-fix-for-node-14-plus-wat…
danieljbruce Jul 11, 2024
88236d1
Add interface: server writable stream
danieljbruce Jul 11, 2024
14 changes: 10 additions & 4 deletions src/table.ts
@@ -726,7 +726,7 @@
let filter: {} | null;
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;

let numConsecutiveErrors = 0;
let numRequestsMade = 0;
let retryTimer: NodeJS.Timeout | null;
@@ -749,14 +749,22 @@
let rowStream: Duplex;

let userCanceled = false;
// The key of the last row that was emitted by the per attempt pipeline
// Note: this must be updated from the operation level userStream to avoid referencing buffered rows that will be
// discarded in the per attempt subpipeline (rowStream)
let lastRowKey = '';
let rowsRead = 0;
const userStream = new PassThrough({
objectMode: true,
readableHighWaterMark: 0,
readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early.
writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key.
transform(row, _encoding, callback) {
if (userCanceled) {
callback();
return;
}
lastRowKey = row.id;
rowsRead++;
callback(null, row);
},
});
@@ -796,7 +804,6 @@
// cancelled the stream in the middle of a retry
retryTimer = null;

const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
chunkTransformer = new ChunkTransformer({decode: options.decode} as any);

@@ -810,7 +817,7 @@
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {

[GitHub Actions lint warnings on line 820 in src/table.ts: '_' is defined but never used; Unexpected any. Specify a different type]
return false;
},
};
@@ -918,7 +925,6 @@
) {
return next();
}
rowsRead++;
const row = this.row(rowData.key);
row.data = rowData.data;
next(null, row);
@@ -969,7 +975,7 @@
userStream.emit('error', error);
}
})
.on('data', _ => {

[GitHub Actions lint warning on line 978 in src/table.ts: '_' is defined but never used]
// Reset error count after a successful read so the backoff
// time won't keep increasing when a stream had multiple errors
numConsecutiveErrors = 0;
@@ -1592,7 +1598,7 @@
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {

[GitHub Actions lint warnings on line 1601 in src/table.ts: '_' is defined but never used; Unexpected any. Specify a different type]
return false;
},
};
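The core idea of the src/table.ts change can be sketched in isolation. The following is a minimal, self-contained sketch (using a hypothetical `{id}` row shape, not the client's actual Row objects): a PassThrough with both watermarks set to 0, so `lastRowKey` only advances for rows the caller has actually received, never for rows sitting in an internal buffer that a retry would silently drop.

```typescript
import {PassThrough} from 'stream';

// Sketch of the PR's userStream idea: with both watermarks at 0, a row only
// passes through _transform when the consumer is ready for it, so lastRowKey
// never points at a buffered row that a retry would discard.
let lastRowKey = '';
let rowsRead = 0;

const userStream = new PassThrough({
  objectMode: true,
  readableHighWaterMark: 0, // no read-side buffering
  writableHighWaterMark: 0, // no write-side buffering
  transform(row: {id: string}, _encoding, callback) {
    lastRowKey = row.id; // this row is being delivered, not buffered
    rowsRead++;
    callback(null, row);
  },
});

userStream.write({id: 'row-a'});
const first = userStream.read(); // consume so the zero-watermark stream accepts more
userStream.write({id: 'row-b'});
const second = userStream.read();

console.log(first.id, second.id, lastRowKey, rowsRead);
```

Note that with `readableHighWaterMark: 0` the stream holds back the next `_transform` call until the previous row is read, which is exactly the property the fix relies on.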
1 change: 0 additions & 1 deletion src/util/mock-servers/mock-server.ts
@@ -37,7 +37,6 @@ export class MockServer {
`localhost:${this.port}`,
grpc.ServerCredentials.createInsecure(),
() => {
server.start();
callback ? callback(portString) : undefined;
}
);
37 changes: 37 additions & 0 deletions test/readrows.ts
@@ -23,6 +23,7 @@ import {BigtableClientMockService} from '../src/util/mock-servers/service-implem
import {MockService} from '../src/util/mock-servers/mock-service';
import {debugLog, readRowsImpl} from './utils/readRowsImpl';
import {UntypedHandleCall} from '@grpc/grpc-js';
import {readRowsImpl2} from './utils/readRowsImpl2';

describe('Bigtable/ReadRows', () => {
let server: MockServer;
@@ -317,6 +318,42 @@ describe('Bigtable/ReadRows', () => {
});
});

it('should return row data in the right order', done => {
// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
Review comment (Contributor): Is this comment about backpressure still true given that the highwatermark is set to 0? If not, please remove; otherwise, feel free to resolve.

Reply (Contributor Author): It's still true, because we want to create the scenario where there is backpressure in the chunk transformer and the other streams, in order to reproduce the issue that occurred before the fix, when those buffered rows were thrown away. Note that this fix only applies a highwatermark of 0 to the user stream. However, this comment still needs an adjustment from 1000 to 150 :)

const keyFrom = undefined;
const keyTo = undefined;
// the server will error after sending this chunk (not row)
const errorAfterChunkNo = 100;
const dataResults = [];

service.setService({
ReadRows: readRowsImpl2(keyFrom, keyTo, errorAfterChunkNo) as any,
});
const sleep = (ms: number) => {
return new Promise(resolve => setTimeout(resolve, ms));
};
(async () => {
try {
const stream = table.createReadStream({
start: '00000000',
end: '00000150',
});

for await (const row of stream) {
dataResults.push(row.id);
await sleep(50);
}
const expectedResults = Array.from(Array(150).keys())
.map(i => '00000000' + i.toString())
.map(i => i.slice(-8));
assert.deepStrictEqual(dataResults, expectedResults);
done();
} catch (error) {
done(error);
}
})();
});

after(async () => {
server.shutdown(() => {});
});
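The new test's consumption pattern generalizes: a slow `for await` loop over an object-mode stream exerts backpressure upstream while the rows must still arrive complete and in order. A stand-alone sketch of that pattern (using `Readable.from` in place of the mock Bigtable server, and five keys instead of 150):

```typescript
import {Readable} from 'stream';

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

// Zero-padded keys like the test's '00000000'..'00000149' range, but only five.
const keys = Array.from(Array(5).keys()).map(i => ('00000000' + i).slice(-8));

// Stand-in for the mocked ReadRows stream.
const stream = Readable.from(keys.map(id => ({id})));

const results: string[] = [];
for await (const row of stream) {
  results.push(row.id); // rows must arrive complete and in order
  await sleep(5); // slow consumer: exerts backpressure on the producer
}
console.log(results.join(','));
```

The `('00000000' + i).slice(-8)` trick is the same zero-padding the test uses to build its expected key list.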
30 changes: 22 additions & 8 deletions test/table.ts
@@ -1118,6 +1118,20 @@ describe('Bigtable/Table', () => {
let reqOptsCalls: any[];
let setTimeoutSpy: sinon.SinonSpy;

/*
setImmediate is required here to correctly mock events as they will
come in from the request function. It is required for tests to pass,
but it is not a problem that it is required because we never expect
a single Node event to emit data and then emit an error. That is,
a mock without setImmediate around the last error represents a scenario
that will never happen.
*/
function emitRetriableError(stream: Duplex) {
setImmediate(() => {
stream.emit('error', makeRetryableError());
});
}

beforeEach(() => {
FakeChunkTransformer.prototype._transform = function (
rows: Row[],
@@ -1262,7 +1276,7 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
((stream: any) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
@@ -1285,7 +1299,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
@@ -1310,7 +1324,7 @@
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.push([{key: 'b'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Duplex) => {
stream.push([{key: 'c'}]);
@@ -1343,7 +1357,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Duplex) => {
stream.end([{key: 'c'}]);
@@ -1362,7 +1376,7 @@
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.push([{key: 'b'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

@@ -1381,7 +1395,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

@@ -1395,7 +1409,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'c'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

@@ -1418,7 +1432,7 @@
((stream: Duplex) => {
stream.push([{key: 'a1'}]);
stream.push([{key: 'd'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

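The reasoning behind the `emitRetriableError` helper can be demonstrated directly. In the sketch below (with a plain `Error` standing in for the suite's `makeRetryableError`), deferring the 'error' emission with `setImmediate` lets a previously pushed chunk reach the 'data' listener first, matching the ordering a real server stream would produce; emitting the error in the same synchronous tick would deliver it before the data.

```typescript
import {PassThrough} from 'stream';

const events: string[] = [];
const stream = new PassThrough({objectMode: true});

stream.on('data', chunk => events.push(`data:${chunk.key}`));
stream.on('error', () => events.push('error'));

stream.push({key: 'a'});
// Emitting the error synchronously here would fire before the pushed chunk
// is delivered; setImmediate defers it past the pending 'data' emission.
setImmediate(() => stream.emit('error', new Error('retryable')));

// Give both the data flow and the deferred error time to run.
await new Promise(resolve => setTimeout(resolve, 50));
console.log(events.join(','));
```

This is why the mocked emitters in these tests wrap only the error, not the pushes: a single Node event source never emits data and an error in the same tick.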