diff --git a/src/table.ts b/src/table.ts index b7e42bc08..492e3094f 100644 --- a/src/table.ts +++ b/src/table.ts @@ -743,6 +743,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const hasLimit = rowsLimit !== 0; let rowsRead = 0; let numConsecutiveErrors = 0; + let numRequestsMade = 0; let retryTimer: NodeJS.Timeout | null; rowKeys = options.keys || []; @@ -917,6 +918,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); reqOpts.rowsLimit = rowsLimit - rowsRead; } + options.gaxOptions = populateAttemptHeader( + numRequestsMade, + options.gaxOptions + ); + const requestStream = this.bigtable.request({ client: 'BigtableClient', method: 'readRows', @@ -970,6 +976,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); return; } numConsecutiveErrors++; + numRequestsMade++; if ( numConsecutiveErrors <= maxRetries && (RETRYABLE_STATUS_CODES.has(error.code) || isRstStreamError(error)) @@ -1614,6 +1621,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }, }; + options.gaxOptions = populateAttemptHeader( + numRequestsMade, + options.gaxOptions + ); + this.bigtable .request({ client: 'BigtableClient', @@ -2072,6 +2084,14 @@ function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) { return Math.min(calculatedNextRetryDelay, config.maxRetryDelayMillis); } +function populateAttemptHeader(attempt: number, gaxOpts?: CallOptions) { + gaxOpts = gaxOpts || {}; + gaxOpts.otherArgs = gaxOpts.otherArgs || {}; + gaxOpts.otherArgs.headers = gaxOpts.otherArgs.headers || {}; + gaxOpts.otherArgs.headers['bigtable-attempt'] = attempt; + return gaxOpts; +} + export interface GoogleInnerError { reason?: string; message?: string; diff --git a/test/table.ts b/test/table.ts index d426d0999..f21596716 100644 --- a/test/table.ts +++ b/test/table.ts @@ -27,7 +27,7 @@ import {Family} from '../src/family.js'; import {Mutation} from '../src/mutation.js'; import {Row} from '../src/row.js'; import * as tblTypes from '../src/table'; -import {Bigtable} from '../src'; +import {Bigtable, RequestOptions} from '../src'; import {EventEmitter} from 'events'; const sandbox = sinon.createSandbox(); @@ -544,7 +544,9 @@ describe('Bigtable/Table', () => { assert.strictEqual(config.method, 'readRows'); assert.strictEqual(config.reqOpts.tableName, TABLE_NAME); assert.strictEqual(config.reqOpts.appProfileId, undefined); - assert.strictEqual(config.gaxOpts, undefined); + assert.deepStrictEqual(config.gaxOpts, { + otherArgs: {headers: {'bigtable-attempt': 0}}, + }); done(); }; table.createReadStream(); @@ -2570,6 +2572,7 @@ describe('Bigtable/Table', () => { let fakeStatuses: any; // eslint-disable-next-line @typescript-eslint/no-explicit-any let entryRequests: any; + const requestArgs: RequestOptions[] = []; beforeEach(() => { entryRequests = []; @@ -2599,6 +2602,7 @@ describe('Bigtable/Table', () => { ]; // eslint-disable-next-line @typescript-eslint/no-explicit-any table.bigtable.request = (config: any) => { + requestArgs.push(JSON.parse(JSON.stringify(config))); entryRequests.push(config.reqOpts.entries); const stream = new PassThrough({ objectMode: true, @@ -2612,6 +2616,25 @@ describe('Bigtable/Table', () => { }; }); + it('should send attempt header', done => { + table.mutate(entries, () => { + assert.strictEqual(requestArgs.length, 2); + assert.strictEqual( + (requestArgs[0].gaxOpts as any)['otherArgs']['headers'][ + 'bigtable-attempt' + ], + 0 + ); + assert.strictEqual( + (requestArgs[1].gaxOpts as any)['otherArgs']['headers'][ + 'bigtable-attempt' + ], + 1 + ); + done(); + }); + }); + it('should succeed after a retry', done => { table.maxRetries = 1; table.mutate(entries, done); @@ -2630,6 +2653,7 @@ describe('Bigtable/Table', () => { describe('rpc level retries', () => { let emitters: EventEmitter[] | null; // = [((stream: Writable) => { stream.push([{ key: 'a' }]); + let requestArgs: RequestOptions[] = []; // eslint-disable-next-line @typescript-eslint/no-explicit-any let entryRequests: any; @@ -2637,10 +2661,12 @@ describe('Bigtable/Table', () => { beforeEach(() => { emitters = null; // This needs to be assigned in each test case. + requestArgs = []; entryRequests = []; // eslint-disable-next-line @typescript-eslint/no-explicit-any table.bigtable.request = (config: any) => { + requestArgs.push(JSON.parse(JSON.stringify(config))); entryRequests.push(config.reqOpts.entries); const stream = new PassThrough({ objectMode: true, @@ -2707,6 +2733,36 @@ describe('Bigtable/Table', () => { done(); }); }); + + it('should send attempt header', done => { + const error = new Error('retryable') as ServiceError; + error.code = 14; // Unavailable + emitters = [ + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.end(); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(requestArgs.length, 2); + assert.strictEqual( + (requestArgs[0].gaxOpts as any)['otherArgs']['headers'][ + 'bigtable-attempt' + ], + 0 + ); + assert.strictEqual( + (requestArgs[1].gaxOpts as any)['otherArgs']['headers'][ + 'bigtable-attempt' + ], + 1 + ); + done(); + }); + }); }); });