fix: ensure that during resumption of a scan, rows that have not been observed by the caller are re-requested (#1444)

* fix: dropping buffered rows during a retry of a scan

createReadStream() creates a pipeline of streams that converts a stream of row chunks into a stream of logical rows. It also contains the logic to handle stream resumption when a single attempt fails.
The pipeline can be split into two parts: the persistent operation stream that the caller sees, and the transient per-attempt segment. When a retry occurs, the per-attempt segment is unpiped from the operation stream and discarded, along with any data its streams had buffered. Unfortunately, when constructing the retry request, createReadStream() uses the row key of the last buffered row, so the buffered rows that were just discarded are treated as already delivered and end up omitted from the operation stream.
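To make the failure mode concrete, here is a minimal, self-contained sketch. The names (`runAttempt`, `makeAttempt`, the `lastRowKey` property on the attempt stream) are illustrative stand-ins, not the library's actual internals:

```ts
import {PassThrough, Transform} from 'stream';

// The "attempt" stream stands in for the transient per-attempt
// sub-pipeline (request stream + chunk transformer). It tracks the last
// row key it has *produced*, which can run ahead of what the caller's
// persistent stream has actually emitted.
type AttemptStream = Transform & {lastRowKey: string};

function runAttempt(
  userStream: PassThrough,
  makeAttempt: (resumeAfterKey: string) => AttemptStream,
  resumeAfterKey = ''
): void {
  const attempt = makeAttempt(resumeAfterKey);
  attempt.pipe(userStream, {end: false});
  attempt.once('error', () => {
    // The per-attempt segment is unpiped and discarded, together with
    // any rows still sitting in its internal buffers.
    attempt.unpipe(userStream);
    // Bug: the retry resumes after the last key the *attempt* produced,
    // so the rows that were buffered (and just discarded) are never
    // re-requested.
    runAttempt(userStream, makeAttempt, attempt.lastRowKey);
  });
}
```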

This PR fixes the missing rows by referencing only the row keys that have been seen by the persistent operation stream when constructing a retry attempt. In other words, it ensures that the last-seen row key is updated only once the row has been "committed" to the persistent portion of the pipeline.
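Condensed, the change amounts to advancing the resume key inside the persistent, user-facing stream itself; this mirrors the userStream changes in the src/table.ts diff below:

```ts
import {PassThrough} from 'stream';

let lastRowKey = '';
const userStream = new PassThrough({
  objectMode: true,
  // Buffering is disabled on both sides so that a row reaching this
  // transform really means the caller can observe it.
  readableHighWaterMark: 0,
  writableHighWaterMark: 0,
  transform(row, _encoding, callback) {
    // The resume key advances only once the row is "committed" to the
    // persistent portion of the pipeline; a retry resumes after this
    // key, so rows discarded with the per-attempt segment are
    // re-requested instead of skipped.
    lastRowKey = row.id;
    callback(null, row);
  },
});
```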

* Add a test that sends rows back

# Conflicts:
#	system-test/read-rows.ts

* Create an instance with dummy server.

# Conflicts:
#	system-test/read-rows.ts

* Add test parameters for sending back the right chunks

* Omit server start

* Run a test against the server in the old code

* Add logging to the chunk transformer

* Add logging to indicate that the server received requests

* chunk transformer logs and other logs

* Add a log

* Don’t change the old server

* Add another guard against the logs

* Add setImmediate everywhere that it needs to be

* Remove the logging

* Remove more logging

* Adjust header

* Add the high watermarks back in

* Remove the at accessor

* Eliminate the watermark adjustments

* Introduce the watermarks back in

* Reduce the number of watermark removals to 1.

* Reverted the streamEvents pipeline

* Add some comments for introducing the new watermarks

* Remove comments and console logs. Add TODO.

* Add TODO

* refactor the test with a helper method

* Fix linting issue

* Adding a comment about the mock

* readable comment change

* Update test/readrows.ts

Co-authored-by: Leah E. Cole <[email protected]>

* It uses 150 rows not 1000 rows

* Add a TODO for making more specific typing

* Add some TODOs for better factoring

* Add interface: server writable stream

Use it to replace any

---------

Co-authored-by: Igor Berntein <[email protected]>
Co-authored-by: Leah E. Cole <[email protected]>
3 people authored Jul 11, 2024
1 parent c86d456 commit 2d8de32
Showing 5 changed files with 418 additions and 15 deletions.
14 changes: 10 additions & 4 deletions src/table.ts
@@ -726,7 +726,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
let filter: {} | null;
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;

let numConsecutiveErrors = 0;
let numRequestsMade = 0;
let retryTimer: NodeJS.Timeout | null;
@@ -749,14 +749,22 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
let rowStream: Duplex;

let userCanceled = false;
// The key of the last row that was emitted by the per attempt pipeline
// Note: this must be updated from the operation level userStream to avoid referencing buffered rows that will be
// discarded in the per attempt subpipeline (rowStream)
let lastRowKey = '';
let rowsRead = 0;
const userStream = new PassThrough({
objectMode: true,
readableHighWaterMark: 0,
readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early.
writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key.
transform(row, _encoding, callback) {
if (userCanceled) {
callback();
return;
}
lastRowKey = row.id;
rowsRead++;
callback(null, row);
},
});
@@ -796,7 +804,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
// cancelled the stream in the middle of a retry
retryTimer = null;

const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
chunkTransformer = new ChunkTransformer({decode: options.decode} as any);

@@ -918,7 +925,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
) {
return next();
}
rowsRead++;
const row = this.row(rowData.key);
row.data = rowData.data;
next(null, row);
1 change: 0 additions & 1 deletion src/util/mock-servers/mock-server.ts
@@ -37,7 +37,6 @@ export class MockServer {
`localhost:${this.port}`,
grpc.ServerCredentials.createInsecure(),
() => {
server.start();
callback ? callback(portString) : undefined;
}
);
56 changes: 54 additions & 2 deletions test/readrows.ts
@@ -13,7 +13,7 @@
// limitations under the License.

import {before, describe, it} from 'mocha';
import {Bigtable, Row, Table} from '../src';
import {Bigtable, protos, Row, Table} from '../src';
import * as assert from 'assert';
import {Transform, PassThrough, pipeline} from 'stream';

@@ -22,7 +22,18 @@ import {MockServer} from '../src/util/mock-servers/mock-server';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';
import {MockService} from '../src/util/mock-servers/mock-service';
import {debugLog, readRowsImpl} from './utils/readRowsImpl';
import {UntypedHandleCall} from '@grpc/grpc-js';
import {ServerWritableStream, UntypedHandleCall} from '@grpc/grpc-js';
import {readRowsImpl2} from './utils/readRowsImpl2';

type PromiseVoid = Promise<void>;
interface ServerImplementationInterface {
(
server: ServerWritableStream<
protos.google.bigtable.v2.IReadRowsRequest,
protos.google.bigtable.v2.IReadRowsResponse
>
): PromiseVoid;
}

describe('Bigtable/ReadRows', () => {
let server: MockServer;
@@ -317,6 +328,47 @@ describe('Bigtable/ReadRows', () => {
});
});

it('should return row data in the right order', done => {
// 150 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = undefined;
const keyTo = undefined;
// the server will error after sending this chunk (not row)
const errorAfterChunkNo = 100;
const dataResults = [];

// TODO: Do not use `any` here, make it a more specific type and address downstream implications on the mock server.
service.setService({
ReadRows: readRowsImpl2(
keyFrom,
keyTo,
errorAfterChunkNo
) as ServerImplementationInterface,
});
const sleep = (ms: number) => {
return new Promise(resolve => setTimeout(resolve, ms));
};
(async () => {
try {
const stream = table.createReadStream({
start: '00000000',
end: '00000150',
});

for await (const row of stream) {
dataResults.push(row.id);
await sleep(50);
}
const expectedResults = Array.from(Array(150).keys())
.map(i => '00000000' + i.toString())
.map(i => i.slice(-8));
assert.deepStrictEqual(dataResults, expectedResults);
done();
} catch (error) {
done(error);
}
})();
});

after(async () => {
server.shutdown(() => {});
});
30 changes: 22 additions & 8 deletions test/table.ts
@@ -1118,6 +1118,20 @@ describe('Bigtable/Table', () => {
let reqOptsCalls: any[];
let setTimeoutSpy: sinon.SinonSpy;

/*
setImmediate is required here to correctly mock events as they will
come in from the request function. It is required for tests to pass,
but it is not a problem that it is required because we never expect
a single Node event to emit data and then emit an error. That is,
a mock without setImmediate around the last error represents a scenario
that will never happen.
*/
function emitRetriableError(stream: Duplex) {
setImmediate(() => {
stream.emit('error', makeRetryableError());
});
}

beforeEach(() => {
FakeChunkTransformer.prototype._transform = function (
rows: Row[],
@@ -1262,7 +1276,7 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
((stream: any) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
@@ -1285,7 +1299,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Writable) => {
stream.end();
@@ -1310,7 +1324,7 @@
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.push([{key: 'b'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Duplex) => {
stream.push([{key: 'c'}]);
@@ -1343,7 +1357,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
((stream: Duplex) => {
stream.end([{key: 'c'}]);
@@ -1362,7 +1376,7 @@
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.push([{key: 'b'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

@@ -1381,7 +1395,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'a'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

@@ -1395,7 +1409,7 @@
emitters = [
((stream: Duplex) => {
stream.push([{key: 'c'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

@@ -1418,7 +1432,7 @@
((stream: Duplex) => {
stream.push([{key: 'a1'}]);
stream.push([{key: 'd'}]);
stream.emit('error', makeRetryableError());
emitRetriableError(stream);
}) as {} as EventEmitter,
];

