From 485b33d88832e87708fd1884756924b3e0b806ef Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Mon, 25 Mar 2024 16:02:32 -0400 Subject: [PATCH 01/26] feat: add wrapper for reading table data using Storage API --- package.json | 5 +- src/index.ts | 7 + src/managedwriter/index.ts | 2 +- src/managedwriter/stream_connection.ts | 2 +- src/reader/index.ts | 30 ++++ src/reader/read_client.ts | 178 +++++++++++++++++++ src/reader/read_stream.ts | 234 +++++++++++++++++++++++++ src/reader/table_reader.ts | 158 +++++++++++++++++ src/{managedwriter => util}/logger.ts | 0 9 files changed, 612 insertions(+), 4 deletions(-) create mode 100644 src/reader/index.ts create mode 100644 src/reader/read_client.ts create mode 100644 src/reader/read_stream.ts create mode 100644 src/reader/table_reader.ts rename src/{managedwriter => util}/logger.ts (100%) diff --git a/package.json b/package.json index a3bd859b..f81ca5da 100644 --- a/package.json +++ b/package.json @@ -27,9 +27,10 @@ "precompile": "gts clean" }, "dependencies": { + "apache-arrow": "^15.0.2", "extend": "^3.0.2", - "google-gax": "^4.3.1", - "google-auth-library": "^9.6.3" + "google-auth-library": "^9.6.3", + "google-gax": "^4.3.1" }, "peerDependencies": { "protobufjs": "^7.2.4" diff --git a/src/index.ts b/src/index.ts index 8733d044..513e326c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,6 +19,7 @@ import * as v1 from './v1'; import * as v1beta1 from './v1beta1'; import * as managedwriter from './managedwriter'; +import * as reader from './reader'; const BigQueryReadClient = v1.BigQueryReadClient; type BigQueryReadClient = v1.BigQueryReadClient; const BigQueryWriteClient = v1.BigQueryWriteClient; @@ -27,6 +28,8 @@ const BigQueryStorageClient = v1beta1.BigQueryStorageClient; type BigQueryStorageClient = v1beta1.BigQueryStorageClient; const WriterClient = managedwriter.WriterClient; type WriterClient = managedwriter.WriterClient; +const ReadClient = reader.ReadClient; +type ReadClient = reader.ReadClient; export { v1, BigQueryReadClient, @@ -35,6 +38,8 @@ export { BigQueryWriteClient, managedwriter, WriterClient, + reader, + ReadClient, }; // For compatibility with JavaScript libraries we need to provide this default export: // tslint:disable-next-line no-default-export @@ -44,6 +49,8 @@ export default { BigQueryWriteClient, managedwriter, WriterClient, + reader, + ReadClient, }; import * as protos from '../protos/protos'; export {protos}; diff --git a/src/managedwriter/index.ts b/src/managedwriter/index.ts index 0ce72a8e..fd33b53d 100644 --- a/src/managedwriter/index.ts +++ b/src/managedwriter/index.ts @@ -34,4 +34,4 @@ export { PendingStream, } from './stream_types'; export {parseStorageErrors} from './error'; -export {setLogFunction} from './logger'; +export {setLogFunction} from '../util/logger'; diff --git a/src/managedwriter/stream_connection.ts b/src/managedwriter/stream_connection.ts index f0c91d7a..3dceff4e 100644 --- a/src/managedwriter/stream_connection.ts +++ b/src/managedwriter/stream_connection.ts @@ -18,7 +18,7 @@ import * as protos from '../../protos/protos'; import {WriterClient} from './writer_client'; import {PendingWrite} from './pending_write'; -import {logger} from './logger'; +import {logger} from '../util/logger'; import {parseStorageErrors} from './error'; type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema; diff --git a/src/reader/index.ts b/src/reader/index.ts new file mode 100644 index 00000000..8f66dd6d --- /dev/null +++ b/src/reader/index.ts @@ -0,0 +1,30 @@ +// Copyright 2024 Google LLC +// 
+// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Package reader provides an EXPERIMENTAL thick client around the + * BigQuery storage API's BigQueryReadClient. + * More information about this new read client may also be found in + * the public documentation: https://cloud.google.com/bigquery/docs/read-api + * + * It is EXPERIMENTAL and subject to change or removal without notice. This is primarily to signal that this + * package may still make breaking changes to existing methods and functionality. + * + * @namespace reader + */ + +export {ReadClient} from './read_client'; +export {TableReader} from './table_reader'; +export {ReadStream} from './read_stream'; +export {setLogFunction} from '../util/logger'; diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts new file mode 100644 index 00000000..35550566 --- /dev/null +++ b/src/reader/read_client.ts @@ -0,0 +1,178 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as gax from 'google-gax'; +import type {CallOptions, ClientOptions} from 'google-gax'; +import * as protos from '../../protos/protos'; + +import {BigQueryReadClient} from '../v1'; +import bigquery from '@google-cloud/bigquery/build/src/types'; +import {Table} from '@google-cloud/bigquery'; +import {ReadStream} from './read_stream'; +import {TableReader} from './table_reader'; + +type CreateReadSessionRequest = + protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest; +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; + +/** + * BigQuery Read API Client. + * The Read API can be used to read data to BigQuery. + * + * This class provides the ability to make remote calls to the backing service through method + * calls that map to API methods. + * + * For supplementary information about the Read API, see: + * https://cloud.google.com/bigquery/docs/read-api + * + * @class + * @memberof reader + */ +export class ReadClient { + private _client: BigQueryReadClient; + private _open: boolean; + + constructor(opts?: ClientOptions) { + const baseOptions = { + 'grpc.keepalive_time_ms': 30 * 1000, + 'grpc.keepalive_timeout_ms': 10 * 1000, + }; + this._client = new BigQueryReadClient({ + ...baseOptions, + ...opts, + }); + this._open = false; + } + + /** + * Initialize the client. + * Performs asynchronous operations (such as authentication) and prepares the client. 
+ * This function will be called automatically when any class method is called for the + * first time, but if you need to initialize it before calling an actual method, + * feel free to call initialize() directly. + * + * You can await on this method if you want to make sure the client is initialized. + * + * @returns {Promise} A promise that resolves when auth is complete. + */ + initialize = async (): Promise => { + await this._client.initialize(); + this._open = true; + }; + + getClient = (): BigQueryReadClient => { + return this._client; + }; + + setClient = (client: BigQueryReadClient): void => { + this._client = client; + }; + + /** + * Check if client is open and ready to send requests. + */ + isOpen(): boolean { + return this._open; + } + + /** + * Creates a write stream to the given table. + * Additionally, every table has a special stream named DefaultStream + * to which data can be written. This stream doesn't need to be created using + * createWriteStream. It is a stream that can be used simultaneously by any + * number of clients. Data written to this stream is considered committed as + * soon as an acknowledgement is received. + * + * @param {Object} request + * The request object that will be sent. + * @param {bigquery.ITableReference} request.table + * Reference to the table to which the stream belongs, in the format + * of `projects/{project}/datasets/{dataset}/tables/{table}`. + * @returns {Promise}} - The promise which resolves to the streamId. + */ + async createReadSession(request: { + table: bigquery.ITableReference; + }): Promise { + await this.initialize(); + const {table} = request; + const maxWorkerCount = 1; + const maxStreamCount = 0; + const createReq: CreateReadSessionRequest = { + parent: `projects/${table.projectId}`, + readSession: { + table: `projects/${table.projectId}/datasets/${table.datasetId}/tables/${table.tableId}`, + dataFormat: 'ARROW', + }, + preferredMinStreamCount: maxWorkerCount, + maxStreamCount: maxStreamCount, + }; + console.log('[read client] create session req', createReq); + const [response] = await this._client.createReadSession(createReq); + if (typeof [response] === undefined) { + throw new gax.GoogleError(`${response}`); + } + try { + return response; + } catch { + throw new Error('Stream connection failed'); + } + } + + /** + * Creates a write stream to the given table. + * Additionally, every table has a special stream named DefaultStream + * to which data can be written. This stream doesn't need to be created using + * createWriteStream. It is a stream that can be used simultaneously by any + * number of clients. Data written to this stream is considered committed as + * soon as an acknowledgement is received. + * + * @param {Object} request + * The request object that will be sent. + * @param {string} request.streamName + * Required. The type of stream to create. + * @param {string} request.offset + * Required. Reference to the table to which the stream belongs, in the format + * of `projects/{project}/datasets/{dataset}/tables/{table}`. + * @returns {Promise}} - The promise which resolves to the streamId. 
+ */ + async createReadStream( + request: { + streamName: string; + session: ReadSession, + }, + options?: CallOptions + ): Promise { + await this.initialize(); + const {streamName, session} = request; + try { + const stream = new ReadStream(streamName, session, this, options); + return stream; + } catch (err) { + throw new Error('read stream connection failed:' + err); + } + } + + async createTableReader(request: {table: Table}): Promise { + await this.initialize(); + const reader = new TableReader(this, { + table: request.table, + }); + return reader; + } + + close() { + this._client.close(); + this._open = false; + } +} diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts new file mode 100644 index 00000000..b4b9a289 --- /dev/null +++ b/src/reader/read_stream.ts @@ -0,0 +1,234 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as gax from 'google-gax'; +import { + Schema, + tableFromIPC, + RecordBatchReader, + Uint8, + AsyncMessageReader, + MessageReader, +} from 'apache-arrow'; +import * as protos from '../../protos/protos'; + +import {ReadClient} from './read_client'; +import {logger} from '../util/logger'; +import {Readable} from 'stream'; +import {TableRow} from '@google-cloud/bigquery'; + +type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema; +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; +type ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; + +export type RemoveListener = { + off: () => void; +}; + +/** + * ReadStream is responsible for reading data from a GRPC read stream + * connection against the Storage Read API readRows method. 
+ * + * @class + * @extends EventEmitter + * @memberof reader + */ +export class ReadStream extends Readable { + private _streamName: string; + private _offset: number; + private _maxRows: number; + private _readClient: ReadClient; + private _session: ReadSession; + private _arrowSchema: Schema; + private _connection?: gax.CancellableStream | null; + private _callOptions?: gax.CallOptions; + + constructor( + streamName: string, + session: ReadSession, + readClient: ReadClient, + options?: gax.CallOptions + ) { + super({ + objectMode: true, + }); + this._streamName = streamName; + this._session = session; + this._offset = 0; + this._maxRows = 10; + this._readClient = readClient; + this._callOptions = options; + this.open(); + + const buf = Buffer.from( + this._session.arrowSchema?.serializedSchema as Uint8Array + ); + const schemaReader = new MessageReader(buf); + const schema = schemaReader.readSchema(false); + this.trace('schema', schema); + this._arrowSchema = schema!; + } + + _read(size?: number | undefined) { + if (this.readableLength === 0) { + this.trace('read called with zero rows', this._connection?.isPaused()); + if (this._connection && this._connection.isPaused()) { + this._connection.resume(); + this._connection.read(); + } + return null; + } + this.trace('read called with existing rows', this.isPaused()); + return undefined; + } + + open() { + if (this.isOpen()) { + this.close(); + } + const client = this._readClient.getClient(); + const connection = client.readRows( + { + readStream: this._streamName, + offset: this._offset, + }, + this._callOptions + ); + this._connection = connection; + this._connection.on('data', this.handleData); + this._connection.on('error', this.handleError); + this._connection.on('close', () => { + this.trace('connection closed'); + }); + this._connection.on('pause', () => { + this.trace('connection paused'); + }); + this._connection.on('resume', () => { + this.trace('connection resumed'); + this.resume(); + }); + this._connection.on('end', () => { + this.trace('connection ended'); + this.push(null); + this.close(); + }); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private trace(msg: string, ...otherArgs: any[]) { + logger( + 'read_stream', + `[streamName: ${this._streamName}]`, + msg, + ...otherArgs + ); + } + + private handleError = (err: gax.GoogleError) => { + this.trace('on error', err, JSON.stringify(err)); + if (this.shouldRetry(err)) { + this.reconnect(); + return; + } + this.emit('error', err); + }; + + private shouldRetry(err: gax.GoogleError): boolean { + const reconnectionErrorCodes = [ + gax.Status.ABORTED, + gax.Status.CANCELLED, + gax.Status.DEADLINE_EXCEEDED, + gax.Status.FAILED_PRECONDITION, + gax.Status.INTERNAL, + gax.Status.UNAVAILABLE, + ]; + return !!err.code && reconnectionErrorCodes.includes(err.code); + } + + private handleData = (response: ReadRowsResponse) => { + //this.trace('data arrived', response); + if ( + response.arrowRecordBatch && + response.arrowRecordBatch.serializedRecordBatch && + response.rowCount + ) { + const rowCount = parseInt(response.rowCount as string, 10); + const batch = response.arrowRecordBatch; + this.trace( + 'found ', + rowCount, + ' rows serialized in ', + batch.serializedRecordBatch?.length, + 'bytes', + this.readableFlowing + ); + + this._offset += rowCount; + + // TODO: parse arrow data + const row: TableRow = { + f: [ + {v: 'test'}, + {v: batch.serializedRecordBatch?.length + ' bytes'}, + {v: rowCount}, + ], + }; + this.resume(); + this.push(row); + if 
(this.readableLength > this._maxRows) { + this._connection?.pause(); + } + } + }; + + /** + * Get the name of the read stream associated with this connection. + */ + getStreamName = (): string => { + return this._streamName; + }; + + getRowsStream(): Readable { + return this; + } + + /** + * Check if connection is open and ready to read data. + */ + isOpen(): boolean { + return !!this._connection; + } + + /** + * Reconnect and re-open readRows channel. + */ + reconnect() { + this.trace('reconnect called'); + this.close(); + this.open(); + } + + /** + * Close the read stream connection. + */ + close() { + if (!this._connection) { + return; + } + this._connection.end(); + this._connection.removeAllListeners(); + this._connection = null; + } +} diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts new file mode 100644 index 00000000..1b6ed095 --- /dev/null +++ b/src/reader/table_reader.ts @@ -0,0 +1,158 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ReadStream} from './read_stream'; +import bigquery from '@google-cloud/bigquery/build/src/types'; +import { + BigQuery, + Table, + GetRowsOptions, + RowsResponse, + TableRow, +} from '@google-cloud/bigquery'; +import {ReadClient} from './read_client'; +import {Readable} from 'stream'; +import {logger} from '../util/logger'; + +/** + * A BigQuery Storage API Reader that can be used to reader data into BigQuery Table + * using the Storage API. + * + * @class + * @memberof reader + */ +export class TableReader { + //private _readSession: ReadSession; + private _readClient: ReadClient; + private _readStreams: ReadStream[]; + private _table: Table; + + /** + * Creates a new Reader instance. + * + * @param {Object} params - The parameters for the JSONWriter. + * @param {bigquery.ITableReference} params.table - The stream connection + * to the BigQuery streaming insert operation. + */ + constructor( + readClient: ReadClient, + params: { + bigQuery?: BigQuery; + tableRef?: bigquery.ITableReference; + table?: Table; + } + ) { + const {table, tableRef, bigQuery} = params; + if (tableRef && bigQuery) { + this._table = bigQuery + .dataset(tableRef.datasetId!, { + projectId: tableRef.projectId, + }) + .table(tableRef.tableId!); + } else if (table) { + this._table = table; + } else { + throw new Error('missing table'); + } + this._readClient = readClient; + this._readStreams = []; + //this._readSession = new ReadSession(); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private trace(msg: string, ...otherArgs: any[]) { + logger( + 'table_reader', + `[streamName: ${this._table.id}]`, + msg, + ...otherArgs + ); + } + + /** + * @callback RowsCallback + * @param {?Error} err Request error, if any. + * @param {array} rows The rows. + * @param {object} apiResponse The full API response. + */ + /** + * @typedef {array} RowsResponse + * @property {array} 0 The rows. 
+ */ + async getRows(options?: GetRowsOptions): Promise { + this.trace('getRows', options); + const session = await this._readClient.createReadSession({ + table: { + projectId: this._table.dataset.projectId, + datasetId: this._table.dataset.id, + tableId: this._table.id, + }, + }); + this.trace('session created', session.name, session.streams); + + const [md] = (await this._table.getMetadata({ + view: 'BASIC', + })) as bigquery.ITable[]; + + this._readStreams = []; + for (const readStream of session.streams || []) { + const r = await this._readClient.createReadStream( + { + streamName: readStream.name!, + session, + }, + options + ); + this._readStreams.push(r); + } + + async function* mergeStreams(readables: Readable[]) { + for (const readable of readables) { + for await (const chunk of readable) { + yield chunk; + } + } + } + const joined = Readable.from(mergeStreams(this._readStreams)); + + this.trace('joined streams', joined); + + return new Promise((resolve, reject) => { + const rows: TableRow[] = []; + joined.on('readable', () => { + let data; + while ((data = joined.read()) !== null) { + this.trace('row arrived', data); + rows.push(data); + } + }); + joined.on('error', err => { + this.trace('reject called on joined stream', err); + reject(err); + }); + joined.on('end', () => { + this.trace('resolve called on joined stream'); + const parsed = BigQuery.mergeSchemaWithRows_(md.schema!, rows, { + wrapIntegers: options?.wrapIntegers || false, + parseJSON: options?.parseJSON, + }); + resolve([parsed, null, {}]); + }); + }); + } + + close() { + // this._readSession.close(); + } +} diff --git a/src/managedwriter/logger.ts b/src/util/logger.ts similarity index 100% rename from src/managedwriter/logger.ts rename to src/util/logger.ts From 39d33704b9d344bd3b325c5da76b47e844cfe343 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 28 Mar 2024 16:59:46 -0400 Subject: [PATCH 02/26] feat: parse arrow record batches and convert to TableRow --- samples/read_rows.js | 75 +++++++++++++++++++++++++++++++++++++++ src/reader/read_client.ts | 2 +- src/reader/read_stream.ts | 50 +++++++++++--------------- 3 files changed, 96 insertions(+), 31 deletions(-) create mode 100644 samples/read_rows.js diff --git a/samples/read_rows.js b/samples/read_rows.js new file mode 100644 index 00000000..11fa3876 --- /dev/null +++ b/samples/read_rows.js @@ -0,0 +1,75 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +'use strict'; + +function main( + sqlQuery = 'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null' +) { + // [START bigquerystorage_read_table] + const {reader} = require('@google-cloud/bigquery-storage'); + const {ReadClient} = reader; + const {BigQuery} = require('@google-cloud/bigquery'); + + async function readRows() { + const readClient = new ReadClient(); + const bigquery = new BigQuery(); + + try { + sqlQuery = + 'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null LIMIT 300000'; + //'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null LIMIT 1000'; + + const [job] = await bigquery.createQueryJob({ + query: sqlQuery, + location: 'US', + }); + console.log('job info: ', job.id); + const [metadata] = await job.getMetadata(); + console.log('job metadata: ', metadata.configuration); + const qconfig = metadata.configuration.query; + const dstTableRef = qconfig.destinationTable; + const table = bigquery + .dataset(dstTableRef.datasetId, { + projectId: dstTableRef.projectId, + }) + .table(dstTableRef.tableId); + + console.log('table', table.dataset.projectId, table.dataset.id, table.id); + const treader = await readClient.createTableReader({table}); + const [rows] = await treader.getRows(); + rows.forEach(row => { + const url = row['url']; + const owner = row['owner']; + const forks = row['forks']; + console.log(`url: ${url}, owner: ${owner}, ${forks} forks`); + }); + console.log('Query Results:', rows.length); + } catch (err) { + console.log(err); + } finally { + console.log('ended'); + readClient.close(); + } + } + // [END bigquerystorage_read_table] + readRows().then(() => { + console.log('Done'); + }); +} +process.on('unhandledRejection', err => { + console.error(err.message); + process.exitCode = 1; +}); +main(...process.argv.slice(2)); diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index 35550566..25a43c14 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -149,7 +149,7 @@ export class ReadClient { async createReadStream( request: { streamName: string; - session: ReadSession, + session: ReadSession; }, options?: CallOptions ): Promise { diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index b4b9a289..feb060c3 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -13,22 +13,13 @@ // limitations under the License. 
import * as gax from 'google-gax'; -import { - Schema, - tableFromIPC, - RecordBatchReader, - Uint8, - AsyncMessageReader, - MessageReader, -} from 'apache-arrow'; +import {RecordBatchReader} from 'apache-arrow'; import * as protos from '../../protos/protos'; import {ReadClient} from './read_client'; import {logger} from '../util/logger'; import {Readable} from 'stream'; -import {TableRow} from '@google-cloud/bigquery'; -type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; @@ -51,7 +42,6 @@ export class ReadStream extends Readable { private _maxRows: number; private _readClient: ReadClient; private _session: ReadSession; - private _arrowSchema: Schema; private _connection?: gax.CancellableStream | null; private _callOptions?: gax.CallOptions; @@ -71,17 +61,9 @@ export class ReadStream extends Readable { this._readClient = readClient; this._callOptions = options; this.open(); - - const buf = Buffer.from( - this._session.arrowSchema?.serializedSchema as Uint8Array - ); - const schemaReader = new MessageReader(buf); - const schema = schemaReader.readSchema(false); - this.trace('schema', schema); - this._arrowSchema = schema!; } - _read(size?: number | undefined) { + _read(_?: number | undefined) { if (this.readableLength === 0) { this.trace('read called with zero rows', this._connection?.isPaused()); if (this._connection && this._connection.isPaused()) { @@ -177,19 +159,27 @@ export class ReadStream extends Readable { this._offset += rowCount; - // TODO: parse arrow data - const row: TableRow = { - f: [ - {v: 'test'}, - {v: batch.serializedRecordBatch?.length + ' bytes'}, - {v: rowCount}, - ], - }; + const buf = Buffer.concat([ + this._session.arrowSchema?.serializedSchema as Uint8Array, + batch.serializedRecordBatch as Uint8Array, + ]); + const r = RecordBatchReader.from(buf); + for (const recordBatch of r.readAll()) { + for (const row of recordBatch) { + this.push({ + f: row.toArray().map(fieldValue => { + return { + v: fieldValue, + }; + }), + }); + } + } this.resume(); - this.push(row); + /* TODO: backpressure ? 
if (this.readableLength > this._maxRows) { this._connection?.pause(); - } + }*/ } }; From 1ef1c9b0d1550e965fb842a4ee58db50adfad2b5 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 2 Apr 2024 15:34:04 -0400 Subject: [PATCH 03/26] fix: set arrow to v14 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f81ca5da..a5268f0b 100644 --- a/package.json +++ b/package.json @@ -27,7 +27,7 @@ "precompile": "gts clean" }, "dependencies": { - "apache-arrow": "^15.0.2", + "apache-arrow": "^14.0.2", "extend": "^3.0.2", "google-auth-library": "^9.6.3", "google-gax": "^4.3.1" From 4bf1511712a69f4b383c9ad72927e314a35fc435 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 3 Apr 2024 15:38:37 -0400 Subject: [PATCH 04/26] feat: add bigquery as dep --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index a5268f0b..37e573f7 100644 --- a/package.json +++ b/package.json @@ -30,13 +30,13 @@ "apache-arrow": "^14.0.2", "extend": "^3.0.2", "google-auth-library": "^9.6.3", - "google-gax": "^4.3.1" + "google-gax": "^4.3.1", + "@google-cloud/bigquery": "^7.5.2" }, "peerDependencies": { "protobufjs": "^7.2.4" }, "devDependencies": { - "@google-cloud/bigquery": "^7.0.0", "@types/extend": "^3.0.4", "@types/mocha": "^9.0.0", "@types/node": "^20.0.0", From 45a4afae2e0364fb2b4a48e7fdb7998d5c4772ab Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 18 Apr 2024 12:19:04 -0400 Subject: [PATCH 05/26] feat: remove dep on @google-cloud/bigquery --- package.json | 4 +- samples/read_rows.js | 7 +- src/reader/read_client.ts | 51 +++++++++--- src/reader/table_reader.ts | 162 +++++++++++++++++++------------------ 4 files changed, 127 insertions(+), 97 deletions(-) diff --git a/package.json b/package.json index 37e573f7..f1c2e3aa 100644 --- a/package.json +++ b/package.json @@ -30,13 +30,13 @@ "apache-arrow": "^14.0.2", "extend": "^3.0.2", "google-auth-library": "^9.6.3", - "google-gax": "^4.3.1", - "@google-cloud/bigquery": "^7.5.2" + "google-gax": "^4.3.1" }, "peerDependencies": { "protobufjs": "^7.2.4" }, "devDependencies": { + "@google-cloud/bigquery": "^7.5.2", "@types/extend": "^3.0.4", "@types/mocha": "^9.0.0", "@types/node": "^20.0.0", diff --git a/samples/read_rows.js b/samples/read_rows.js index 11fa3876..8c16e7e9 100644 --- a/samples/read_rows.js +++ b/samples/read_rows.js @@ -29,7 +29,6 @@ function main( try { sqlQuery = 'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null LIMIT 300000'; - //'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null LIMIT 1000'; const [job] = await bigquery.createQueryJob({ query: sqlQuery, @@ -45,10 +44,14 @@ function main( projectId: dstTableRef.projectId, }) .table(dstTableRef.tableId); + const [md] = await table.getMetadata({ + view: 'BASIC', + }); console.log('table', table.dataset.projectId, table.dataset.id, table.id); const treader = await readClient.createTableReader({table}); - const [rows] = await treader.getRows(); + const [rawRows] = await treader.getRows(); + const rows = BigQuery.mergeSchemaWithRows_(md.schema, rawRows, {}); rows.forEach(row => { const url = row['url']; const owner = row['owner']; diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index 25a43c14..25dcfe05 100644 --- 
a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -17,14 +17,28 @@ import type {CallOptions, ClientOptions} from 'google-gax'; import * as protos from '../../protos/protos'; import {BigQueryReadClient} from '../v1'; -import bigquery from '@google-cloud/bigquery/build/src/types'; -import {Table} from '@google-cloud/bigquery'; import {ReadStream} from './read_stream'; import {TableReader} from './table_reader'; type CreateReadSessionRequest = protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; +type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; + +export type TableReference = { + /** + * Required. The ID of the dataset containing this table. + */ + datasetId?: string; + /** + * Required. The ID of the project containing this table. + */ + projectId?: string; + /** + * Required. The ID of the table. The ID can contain Unicode characters in category L (letter), M (mark), N (number), Pc (connector, including underscore), Pd (dash), and Zs (space). For more information, see [General Category](https://wikipedia.org/wiki/Unicode_character_property#General_Category). The maximum length is 1,024 characters. Certain operations allow suffixing of the table ID with a partition decorator, such as `sample_table$20190123`. + */ + tableId?: string; +}; /** * BigQuery Read API Client. @@ -96,28 +110,39 @@ export class ReadClient { * * @param {Object} request * The request object that will be sent. - * @param {bigquery.ITableReference} request.table + * @param {string} request.parent + * Parent table that all the streams should belong to, in the form + * of `projects/{project}`. + * @param {string} request.table + * Parent table that all the streams should belong to, in the form + * of `projects/{project}/datasets/{dataset}/tables/{table}`. + * @param {TableReference} request.table * Reference to the table to which the stream belongs, in the format * of `projects/{project}/datasets/{dataset}/tables/{table}`. * @returns {Promise}} - The promise which resolves to the streamId. 
*/ async createReadSession(request: { - table: bigquery.ITableReference; + parent: string; + table: string; + dataFormat: DataFormat; + selectedFields?: string[]; }): Promise { await this.initialize(); - const {table} = request; + const {table, parent, dataFormat, selectedFields} = request; const maxWorkerCount = 1; const maxStreamCount = 0; const createReq: CreateReadSessionRequest = { - parent: `projects/${table.projectId}`, + parent, readSession: { - table: `projects/${table.projectId}/datasets/${table.datasetId}/tables/${table.tableId}`, - dataFormat: 'ARROW', + table, + dataFormat, + readOptions: { + selectedFields: selectedFields, + }, }, preferredMinStreamCount: maxWorkerCount, maxStreamCount: maxStreamCount, }; - console.log('[read client] create session req', createReq); const [response] = await this._client.createReadSession(createReq); if (typeof [response] === undefined) { throw new gax.GoogleError(`${response}`); @@ -163,11 +188,11 @@ export class ReadClient { } } - async createTableReader(request: {table: Table}): Promise { + async createTableReader(params: { + table: TableReference; + }): Promise { await this.initialize(); - const reader = new TableReader(this, { - table: request.table, - }); + const reader = new TableReader(this, params.table); return reader; } diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index 1b6ed095..dd430923 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -13,17 +13,50 @@ // limitations under the License. import {ReadStream} from './read_stream'; -import bigquery from '@google-cloud/bigquery/build/src/types'; -import { - BigQuery, - Table, - GetRowsOptions, - RowsResponse, - TableRow, -} from '@google-cloud/bigquery'; -import {ReadClient} from './read_client'; +import * as protos from '../../protos/protos'; +import {TableReference, ReadClient} from './read_client'; import {Readable} from 'stream'; import {logger} from '../util/logger'; +import {ResourceStream} from '@google-cloud/paginator'; + +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; +type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; +const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; +interface ListParams { + /** + * Row limit of the table. + */ + maxResults?: number; + /** + * Subset of fields to return, supports select into sub fields. Example: selected_fields = "a,e.d.f"; + */ + selectedFields?: string; +} +interface TableCell { + v?: any; +} +interface TableRow { + /** + * Represents a single row in the result set, consisting of one or more fields. + */ + f?: Array; +} +interface TableDataList { + /** + * Rows of results. + */ + rows?: Array; + /** + * Total rows of the entire table. In order to show default value 0 we have to present it as string. + */ + totalRows?: string; +} + +type GetRowsOptions = ListParams & { + autoPaginate?: boolean; + maxApiCalls?: number; +}; +type RowsResponse = any[] | [any[], GetRowsOptions | null, TableDataList]; /** * A BigQuery Storage API Reader that can be used to reader data into BigQuery Table @@ -33,77 +66,49 @@ import {logger} from '../util/logger'; * @memberof reader */ export class TableReader { - //private _readSession: ReadSession; private _readClient: ReadClient; private _readStreams: ReadStream[]; - private _table: Table; + private _table: TableReference; /** * Creates a new Reader instance. * * @param {Object} params - The parameters for the JSONWriter. 
- * @param {bigquery.ITableReference} params.table - The stream connection + * @param {TableReference} params.table - The stream connection * to the BigQuery streaming insert operation. */ - constructor( - readClient: ReadClient, - params: { - bigQuery?: BigQuery; - tableRef?: bigquery.ITableReference; - table?: Table; - } - ) { - const {table, tableRef, bigQuery} = params; - if (tableRef && bigQuery) { - this._table = bigQuery - .dataset(tableRef.datasetId!, { - projectId: tableRef.projectId, - }) - .table(tableRef.tableId!); - } else if (table) { - this._table = table; - } else { - throw new Error('missing table'); - } + constructor(readClient: ReadClient, table: TableReference) { + this._table = table; this._readClient = readClient; this._readStreams = []; - //this._readSession = new ReadSession(); } // eslint-disable-next-line @typescript-eslint/no-explicit-any private trace(msg: string, ...otherArgs: any[]) { logger( 'table_reader', - `[streamName: ${this._table.id}]`, + `[table: ${this._table.tableId}]`, msg, ...otherArgs ); } - /** - * @callback RowsCallback - * @param {?Error} err Request error, if any. - * @param {array} rows The rows. - * @param {object} apiResponse The full API response. - */ - /** - * @typedef {array} RowsResponse - * @property {array} 0 The rows. - */ - async getRows(options?: GetRowsOptions): Promise { - this.trace('getRows', options); + async getRowsStream( + options?: GetRowsOptions + ): Promise<[ResourceStream, ReadSession]> { + this.trace('getRowsStream', options); const session = await this._readClient.createReadSession({ - table: { - projectId: this._table.dataset.projectId, - datasetId: this._table.dataset.id, - tableId: this._table.id, - }, + parent: `projects/${this._table.projectId}`, + table: `projects/${this._table.projectId}/datasets/${this._table.datasetId}/tables/${this._table.tableId}`, + dataFormat: DataFormat.ARROW, + selectedFields: options?.selectedFields?.split(','), }); - this.trace('session created', session.name, session.streams); - - const [md] = (await this._table.getMetadata({ - view: 'BASIC', - })) as bigquery.ITable[]; + this.trace( + 'session created', + session.name, + session.streams, + session.estimatedRowCount + ); this._readStreams = []; for (const readStream of session.streams || []) { @@ -125,34 +130,31 @@ export class TableReader { } } const joined = Readable.from(mergeStreams(this._readStreams)); - this.trace('joined streams', joined); + const stream = joined as ResourceStream; + return [stream, session]; + } - return new Promise((resolve, reject) => { - const rows: TableRow[] = []; - joined.on('readable', () => { - let data; - while ((data = joined.read()) !== null) { - this.trace('row arrived', data); - rows.push(data); - } - }); - joined.on('error', err => { - this.trace('reject called on joined stream', err); - reject(err); - }); - joined.on('end', () => { - this.trace('resolve called on joined stream'); - const parsed = BigQuery.mergeSchemaWithRows_(md.schema!, rows, { - wrapIntegers: options?.wrapIntegers || false, - parseJSON: options?.parseJSON, - }); - resolve([parsed, null, {}]); - }); - }); + /** + * @callback RowsCallback + * @param {?Error} err Request error, if any. + * @param {array} rows The rows. + * @param {object} apiResponse The full API response. + */ + /** + * @typedef {array} RowsResponse + * @property {array} 0 The rows. 
+ */ + async getRows(options?: GetRowsOptions): Promise { + this.trace('getRows', options); + const [stream] = await this.getRowsStream(options); + const rows = await stream.toArray(); + return rows; } close() { - // this._readSession.close(); + this._readStreams.forEach(rs => { + rs.close(); + }); } } From aa57c03232206b27f782de52cdfeb65d893b7f99 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 18 Apr 2024 12:24:58 -0400 Subject: [PATCH 06/26] fix: Stream.toArray not available on node < 17 --- src/reader/table_reader.ts | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index dd430923..ada53832 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -147,9 +147,25 @@ export class TableReader { */ async getRows(options?: GetRowsOptions): Promise { this.trace('getRows', options); - const [stream] = await this.getRowsStream(options); - const rows = await stream.toArray(); - return rows; + const [stream, session] = await this.getRowsStream(options); + return new Promise((resolve, reject) => { + const rows: TableRow[] = []; + stream.on('readable', () => { + let data; + while ((data = stream.read()) !== null) { + this.trace('row arrived', data); + rows.push(data); + } + }); + stream.on('error', err => { + this.trace('reject called on joined stream', err); + reject(err); + }); + stream.on('end', () => { + this.trace('resolve called on joined stream'); + resolve([rows, session]); + }); + }); } close() { From 72afe004fb367dd1e64d2e30780e6eb7779020a0 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 18 Apr 2024 12:30:25 -0400 Subject: [PATCH 07/26] fix: add paginator dep --- package.json | 1 + src/reader/table_reader.ts | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index f1c2e3aa..e6ca9a04 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "precompile": "gts clean" }, "dependencies": { + "@google-cloud/paginator": "^5.0.0", "apache-arrow": "^14.0.2", "extend": "^3.0.2", "google-auth-library": "^9.6.3", diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index ada53832..644ed91f 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -56,7 +56,7 @@ type GetRowsOptions = ListParams & { autoPaginate?: boolean; maxApiCalls?: number; }; -type RowsResponse = any[] | [any[], GetRowsOptions | null, TableDataList]; +type RowsResponse = any[] | [any[], ReadSession | null, TableDataList]; /** * A BigQuery Storage API Reader that can be used to reader data into BigQuery Table @@ -163,7 +163,7 @@ export class TableReader { }); stream.on('end', () => { this.trace('resolve called on joined stream'); - resolve([rows, session]); + resolve([rows, session, { rows, totalRows: session.estimatedRowCount }]); }); }); } From 7a5847aade3efd8181a9ae74d89e9dd73264d014 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 18 Apr 2024 13:09:27 -0400 Subject: [PATCH 08/26] fix: lint issues --- src/reader/table_reader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index 644ed91f..4ce37f25 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -163,7 +163,7 @@ export class TableReader { }); stream.on('end', () => { this.trace('resolve called on joined stream'); - resolve([rows, session, { rows, totalRows: session.estimatedRowCount }]); + resolve([rows, session, {rows, totalRows: 
session.estimatedRowCount}]); }); }); } From 6b9871128b7d8668919b5fdc01ff16afb261a0df Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 29 May 2024 15:30:11 -0400 Subject: [PATCH 09/26] feat: move to stream transform instead of implementing Readable --- src/reader/read_client.ts | 48 +++++++------- src/reader/read_stream.ts | 129 ++++++++++++++++++------------------- src/reader/table_reader.ts | 12 ++-- 3 files changed, 93 insertions(+), 96 deletions(-) diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index 25dcfe05..4dc792f4 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -61,6 +61,7 @@ export class ReadClient { const baseOptions = { 'grpc.keepalive_time_ms': 30 * 1000, 'grpc.keepalive_timeout_ms': 10 * 1000, + 'grpc.use_local_subchannel_pool': 0, }; this._client = new BigQueryReadClient({ ...baseOptions, @@ -101,12 +102,25 @@ export class ReadClient { } /** - * Creates a write stream to the given table. - * Additionally, every table has a special stream named DefaultStream - * to which data can be written. This stream doesn't need to be created using - * createWriteStream. It is a stream that can be used simultaneously by any - * number of clients. Data written to this stream is considered committed as - * soon as an acknowledgement is received. + * Creates a new read session. A read session divides the contents of a + * BigQuery table into one or more streams, which can then be used to read + * data from the table. The read session also specifies properties of the + * data to be read, such as a list of columns or a push-down filter describing + * the rows to be returned. + * + * A particular row can be read by at most one stream. When the caller has + * reached the end of each stream in the session, then all the data in the + * table has been read. + * + * Data is assigned to each stream such that roughly the same number of + * rows can be read from each stream. Because the server-side unit for + * assigning data is collections of rows, the API does not guarantee that + * each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some + * filters can lead to lopsided assignments. + * + * Read sessions automatically expire 6 hours after they are created and do + * not require manual clean-up by the caller. * * @param {Object} request * The request object that will be sent. @@ -147,29 +161,19 @@ export class ReadClient { if (typeof [response] === undefined) { throw new gax.GoogleError(`${response}`); } - try { - return response; - } catch { - throw new Error('Stream connection failed'); - } + return response; } /** - * Creates a write stream to the given table. - * Additionally, every table has a special stream named DefaultStream - * to which data can be written. This stream doesn't need to be created using - * createWriteStream. It is a stream that can be used simultaneously by any - * number of clients. Data written to this stream is considered committed as - * soon as an acknowledgement is received. + * Creates a ReadStream to the given stream name and ReadSession. * * @param {Object} request * The request object that will be sent. * @param {string} request.streamName - * Required. The type of stream to create. - * @param {string} request.offset - * Required. Reference to the table to which the stream belongs, in the format - * of `projects/{project}/datasets/{dataset}/tables/{table}`. - * @returns {Promise}} - The promise which resolves to the streamId. 
+ * Required. The id/name of read stream to read from. + * @param {string} request.session + * Required. Reference to the ReadSession. See `createReadSession`. + * @returns {Promise}} - The promise which resolves to the `ReadStream`. */ async createReadStream( request: { diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index feb060c3..92d27bfd 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -18,7 +18,7 @@ import * as protos from '../../protos/protos'; import {ReadClient} from './read_client'; import {logger} from '../util/logger'; -import {Readable} from 'stream'; +import {Readable, Transform} from 'stream'; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; type ReadRowsResponse = @@ -28,6 +28,12 @@ export type RemoveListener = { off: () => void; }; +interface TableRow { + f?: Array<{ + v?: any; + }>; +} + /** * ReadStream is responsible for reading data from a GRPC read stream * connection against the Storage Read API readRows method. @@ -36,12 +42,12 @@ export type RemoveListener = { * @extends EventEmitter * @memberof reader */ -export class ReadStream extends Readable { +export class ReadStream { private _streamName: string; private _offset: number; - private _maxRows: number; private _readClient: ReadClient; private _session: ReadSession; + private _readStream?: Readable; private _connection?: gax.CancellableStream | null; private _callOptions?: gax.CallOptions; @@ -51,31 +57,14 @@ export class ReadStream extends Readable { readClient: ReadClient, options?: gax.CallOptions ) { - super({ - objectMode: true, - }); this._streamName = streamName; this._session = session; this._offset = 0; - this._maxRows = 10; this._readClient = readClient; this._callOptions = options; this.open(); } - _read(_?: number | undefined) { - if (this.readableLength === 0) { - this.trace('read called with zero rows', this._connection?.isPaused()); - if (this._connection && this._connection.isPaused()) { - this._connection.resume(); - this._connection.read(); - } - return null; - } - this.trace('read called with existing rows', this.isPaused()); - return undefined; - } - open() { if (this.isOpen()) { this.close(); @@ -89,7 +78,16 @@ export class ReadStream extends Readable { this._callOptions ); this._connection = connection; - this._connection.on('data', this.handleData); + const parseTransform = new Transform({ + objectMode: true, + highWaterMark: 100, + transform: (response: ReadRowsResponse, _, callback) => { + const rows = this.parseReadRowsResponse(response); + rows.forEach(r => parseTransform.push(r)); + callback(null); + }, + }); + this._readStream = this._connection.pipe(parseTransform); this._connection.on('error', this.handleError); this._connection.on('close', () => { this.trace('connection closed'); @@ -97,13 +95,11 @@ export class ReadStream extends Readable { this._connection.on('pause', () => { this.trace('connection paused'); }); - this._connection.on('resume', () => { + this._connection.on('resume', async () => { this.trace('connection resumed'); - this.resume(); }); this._connection.on('end', () => { this.trace('connection ended'); - this.push(null); this.close(); }); } @@ -124,7 +120,7 @@ export class ReadStream extends Readable { this.reconnect(); return; } - this.emit('error', err); + this._readStream?.emit('error', err); }; private shouldRetry(err: gax.GoogleError): boolean { @@ -139,49 +135,48 @@ export class ReadStream extends Readable { return !!err.code && reconnectionErrorCodes.includes(err.code); } - private handleData 
= (response: ReadRowsResponse) => { - //this.trace('data arrived', response); - if ( - response.arrowRecordBatch && - response.arrowRecordBatch.serializedRecordBatch && - response.rowCount - ) { - const rowCount = parseInt(response.rowCount as string, 10); - const batch = response.arrowRecordBatch; - this.trace( - 'found ', - rowCount, - ' rows serialized in ', - batch.serializedRecordBatch?.length, - 'bytes', - this.readableFlowing - ); - - this._offset += rowCount; - - const buf = Buffer.concat([ - this._session.arrowSchema?.serializedSchema as Uint8Array, - batch.serializedRecordBatch as Uint8Array, - ]); - const r = RecordBatchReader.from(buf); - for (const recordBatch of r.readAll()) { - for (const row of recordBatch) { - this.push({ - f: row.toArray().map(fieldValue => { - return { - v: fieldValue, - }; - }), - }); - } + private parseReadRowsResponse(response: ReadRowsResponse): TableRow[] { + if (!response.arrowRecordBatch || !response.rowCount) { + return []; + } + if (!response.arrowRecordBatch.serializedRecordBatch) { + return []; + } + + const rowCount = parseInt(response.rowCount as string, 10); + const batch = response.arrowRecordBatch; + this.trace( + 'found ', + rowCount, + ' rows serialized in ', + batch.serializedRecordBatch?.length, + 'bytes' + ); + + this._offset += rowCount; + + const buf = Buffer.concat([ + this._session.arrowSchema?.serializedSchema as Uint8Array, + batch.serializedRecordBatch as Uint8Array, + ]); + const r = RecordBatchReader.from(buf); + const batches = r.readAll(); + const rows = []; + for (const recordBatch of batches) { + const rrows = []; + for (const row of recordBatch) { + rrows.push({ + f: row.toArray().map(fieldValue => { + return { + v: fieldValue, + }; + }), + }); } - this.resume(); - /* TODO: backpressure ? - if (this.readableLength > this._maxRows) { - this._connection?.pause(); - }*/ + rows.push(...rrows); } - }; + return rows; + } /** * Get the name of the read stream associated with this connection. 
@@ -191,7 +186,7 @@ export class ReadStream extends Readable { }; getRowsStream(): Readable { - return this; + return this._readStream!; } /** diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index 4ce37f25..2c88b385 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -129,7 +129,9 @@ export class TableReader { } } } - const joined = Readable.from(mergeStreams(this._readStreams)); + const joined = Readable.from( + mergeStreams(this._readStreams.map(r => r.getRowsStream())) + ); this.trace('joined streams', joined); const stream = joined as ResourceStream; return [stream, session]; @@ -150,12 +152,8 @@ export class TableReader { const [stream, session] = await this.getRowsStream(options); return new Promise((resolve, reject) => { const rows: TableRow[] = []; - stream.on('readable', () => { - let data; - while ((data = stream.read()) !== null) { - this.trace('row arrived', data); - rows.push(data); - } + stream.on('data', (data: TableRow) => { + rows.push(data); }); stream.on('error', err => { this.trace('reject called on joined stream', err); From 98546f3550fd112ed9d952ec3aaa40ee66b4af3e Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 14 Jun 2024 12:02:00 -0400 Subject: [PATCH 10/26] feat: modular arrow streams and transforms --- src/reader/arrow_reader.ts | 94 +++++++++++++++++++++++ src/reader/arrow_transform.ts | 130 ++++++++++++++++++++++++++++++++ src/reader/read_client.ts | 11 ++- src/reader/read_session.ts | 136 ++++++++++++++++++++++++++++++++++ src/reader/read_stream.ts | 62 +++------------- src/reader/table_reader.ts | 97 ++++++++---------------- 6 files changed, 412 insertions(+), 118 deletions(-) create mode 100644 src/reader/arrow_reader.ts create mode 100644 src/reader/arrow_transform.ts create mode 100644 src/reader/read_session.ts diff --git a/src/reader/arrow_reader.ts b/src/reader/arrow_reader.ts new file mode 100644 index 00000000..8d3c5b43 --- /dev/null +++ b/src/reader/arrow_reader.ts @@ -0,0 +1,94 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ResourceStream} from '@google-cloud/paginator'; +import {RecordBatch} from 'apache-arrow'; + +import * as protos from '../../protos/protos'; +import {TableReference, ReadClient} from './read_client'; +import {logger} from '../util/logger'; +import { + ArrowRawTransform, + ArrowRecordBatchTransform, + ArrowRecordReaderTransform, +} from './arrow_transform'; +import {ReadSession, GetStreamOptions} from './read_session'; + +type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; +type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; +const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; + +/** + * A BigQuery Storage API Reader that can be used to read data + * from BigQuery Tables using the Storage API. 
+ * + * @class + * @memberof reader + */ +export class ArrowTableReader { + private _table: TableReference; + private _session: ReadSession; + + /** + * Creates a new Reader instance. + * + * @param {Object} params - The parameters for the JSONWriter. + * @param {TableReference} params.table - The stream connection + * to the BigQuery streaming insert operation. + */ + constructor(readClient: ReadClient, table: TableReference) { + this._table = table; + this._session = new ReadSession(readClient, table, DataFormat.ARROW); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private trace(msg: string, ...otherArgs: any[]) { + logger( + 'arrow_table_reader', + `[table: ${this._table.tableId}]`, + msg, + ...otherArgs + ); + } + + getSessionInfo(): ReadSessionInfo | undefined | null { + return this._session.getSessionInfo(); + } + + async getRawStream( + options?: GetStreamOptions + ): Promise> { + this.trace('getRawStream', options); + + const stream = await this._session.getStream(options); + + return stream.pipe(new ArrowRawTransform()) as ResourceStream; + } + + async getRecordBatchStream( + options?: GetStreamOptions + ): Promise> { + this.trace('getRecordBatchStream', options); + const stream = await this._session.getStream(options); + const info = this._session.getSessionInfo(); + return stream + .pipe(new ArrowRawTransform()) + .pipe(new ArrowRecordReaderTransform(info!)) + .pipe(new ArrowRecordBatchTransform()) as ResourceStream; + } + + close() { + this._session.close(); + } +} diff --git a/src/reader/arrow_transform.ts b/src/reader/arrow_transform.ts new file mode 100644 index 00000000..0066ad1d --- /dev/null +++ b/src/reader/arrow_transform.ts @@ -0,0 +1,130 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +import {Transform, TransformCallback} from 'stream'; +import { + RecordBatchReader, + RecordBatch, + RecordBatchStreamReader, +} from 'apache-arrow'; +import * as protos from '../../protos/protos'; + +type ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; +type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; + +export class ArrowRawTransform extends Transform { + constructor() { + super({ + readableObjectMode: false, + writableObjectMode: true, + }); + } + + _transform( + response: ReadRowsResponse, + _: BufferEncoding, + callback: TransformCallback + ): void { + if ( + !( + response.arrowRecordBatch && + response.arrowRecordBatch.serializedRecordBatch + ) + ) { + callback(null); + return; + } + callback(null, response.arrowRecordBatch?.serializedRecordBatch); + } +} +export class ArrowRecordReaderTransform extends Transform { + private session: ReadSession; + + constructor(session: ReadSession) { + super({ + objectMode: true, + }); + this.session = session; + } + + _transform( + serializedRecordBatch: Uint8Array, + _: BufferEncoding, + callback: TransformCallback + ): void { + const buf = Buffer.concat([ + this.session.arrowSchema?.serializedSchema as Uint8Array, + serializedRecordBatch, + ]); + const reader = RecordBatchReader.from(buf); + callback(null, reader); + } +} + +export class ArrowRecordBatchTransform extends Transform { + constructor() { + super({ + objectMode: true, + }); + } + + _transform( + reader: RecordBatchStreamReader, + _: BufferEncoding, + callback: TransformCallback + ): void { + const batches = reader.readAll(); + for (const row of batches) { + this.push(row); + } + callback(null); + } +} + +export class ArrowRecordBatchTableRowTransform extends Transform { + constructor() { + super({ + objectMode: true, + }); + } + + _transform( + batch: RecordBatch, + _: BufferEncoding, + callback: TransformCallback + ): void { + //console.log('transform ArrowRecordBatchTableRowTransform', batch) + const rows = new Array(batch.numRows); + for (let i = 0; i < batch.numRows; i++) { + rows[i] = { + f: new Array(batch.numCols), + }; + } + for (let j = 0; j < batch.numCols; j++) { + const column = batch.selectAt([j]); + for (let i = 0; i < batch.numRows; i++) { + const fieldData = column.get(i); + const fieldValue = fieldData?.toArray()[0]; + rows[i].f[j] = { + v: fieldValue, + }; + } + } + for (let i = 0; i < batch.numRows; i++) { + this.push(rows[i]); + } + callback(null); + } +} diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index 4dc792f4..b687f0f5 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -14,11 +14,12 @@ import * as gax from 'google-gax'; import type {CallOptions, ClientOptions} from 'google-gax'; -import * as protos from '../../protos/protos'; +import * as protos from '../../protos/protos'; import {BigQueryReadClient} from '../v1'; import {ReadStream} from './read_stream'; import {TableReader} from './table_reader'; +import {ArrowTableReader} from './arrow_reader'; type CreateReadSessionRequest = protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest; @@ -200,6 +201,14 @@ export class ReadClient { return reader; } + async createArrowTableReader(params: { + table: TableReference; + }): Promise { + await this.initialize(); + const reader = new ArrowTableReader(this, params.table); + return reader; + } + close() { this._client.close(); this._open = false; diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts new file mode 100644 index 
00000000..4a51c9a9 --- /dev/null +++ b/src/reader/read_session.ts @@ -0,0 +1,136 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ResourceStream} from '@google-cloud/paginator'; +import {Readable} from 'stream'; + +import {ReadStream} from './read_stream'; +import * as protos from '../../protos/protos'; +import {TableReference, ReadClient} from './read_client'; +import {logger} from '../util/logger'; + +type ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; +type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; +const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession; +type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; +const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; + +export type GetStreamOptions = { + /** + * Row limit of the table. + */ + maxResults?: number; + /** + * Subset of fields to return, supports select into sub fields. Example: selected_fields = "a,e.d.f"; + */ + selectedFields?: string; + autoPaginate?: boolean; + maxApiCalls?: number; +}; + +export class ReadSession { + private _info: ReadSessionInfo | null; + private _table: TableReference; + private _format: DataFormat; + private _readStreams: ReadStream[]; + private _readClient: ReadClient; + + constructor( + readClient: ReadClient, + table: TableReference, + format: DataFormat + ) { + this._info = null; + this._format = format; + this._table = table; + this._readClient = readClient; + this._readStreams = []; + } + + getSessionInfo(): ReadSessionInfo | null { + return this._info; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private trace(msg: string, ...otherArgs: any[]) { + logger('session', `[session: ${this._info?.name}]`, msg, ...otherArgs); + } + + private async getOrCreateSession( + options?: GetStreamOptions + ): Promise { + if (this._info) { + return this._info; + } + const session = await this._readClient.createReadSession({ + parent: `projects/${this._table.projectId}`, + table: `projects/${this._table.projectId}/datasets/${this._table.datasetId}/tables/${this._table.tableId}`, + dataFormat: this._format, + selectedFields: options?.selectedFields?.split(','), + }); + this.trace( + 'session created', + session.name, + session.streams, + session.estimatedRowCount + ); + this._info = session; + return session; + } + + async getStream( + options?: GetStreamOptions + ): Promise> { + this.trace('getStream', options); + + const session = await this.getOrCreateSession(options); + this._readStreams = []; + for (const readStream of session.streams || []) { + const r = await this._readClient.createReadStream( + { + streamName: readStream.name!, + session, + }, + options + ); + this._readStreams.push(r); + } + + async function* mergeStreams(readables: Readable[]) { + for (const readable of readables) { + for await (const chunk of readable) { + yield chunk; + } + } + } + const joined = Readable.from( + mergeStreams( + 
this._readStreams.map(r => { + const stream = r.getRowsStream(); + return stream; + }) + ) + ); + this.trace('joined streams', joined); + const stream = joined as ResourceStream; + return stream; + } + + close() { + this._readStreams.forEach(rs => { + rs.close(); + }); + } +} diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index 92d27bfd..a940dd2d 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -13,7 +13,6 @@ // limitations under the License. import * as gax from 'google-gax'; -import {RecordBatchReader} from 'apache-arrow'; import * as protos from '../../protos/protos'; import {ReadClient} from './read_client'; @@ -28,12 +27,6 @@ export type RemoveListener = { off: () => void; }; -interface TableRow { - f?: Array<{ - v?: any; - }>; -} - /** * ReadStream is responsible for reading data from a GRPC read stream * connection against the Storage Read API readRows method. @@ -78,16 +71,14 @@ export class ReadStream { this._callOptions ); this._connection = connection; - const parseTransform = new Transform({ + const passthrough = new Transform({ objectMode: true, - highWaterMark: 100, transform: (response: ReadRowsResponse, _, callback) => { - const rows = this.parseReadRowsResponse(response); - rows.forEach(r => parseTransform.push(r)); - callback(null); + this.processReadRowsResponse(response); + callback(null, response); }, }); - this._readStream = this._connection.pipe(parseTransform); + this._readStream = this._connection.pipe(passthrough); this._connection.on('error', this.handleError); this._connection.on('close', () => { this.trace('connection closed'); @@ -135,47 +126,12 @@ export class ReadStream { return !!err.code && reconnectionErrorCodes.includes(err.code); } - private parseReadRowsResponse(response: ReadRowsResponse): TableRow[] { - if (!response.arrowRecordBatch || !response.rowCount) { - return []; - } - if (!response.arrowRecordBatch.serializedRecordBatch) { - return []; + private processReadRowsResponse(response: ReadRowsResponse) { + if (!response.rowCount) { + return; } - const rowCount = parseInt(response.rowCount as string, 10); - const batch = response.arrowRecordBatch; - this.trace( - 'found ', - rowCount, - ' rows serialized in ', - batch.serializedRecordBatch?.length, - 'bytes' - ); - this._offset += rowCount; - - const buf = Buffer.concat([ - this._session.arrowSchema?.serializedSchema as Uint8Array, - batch.serializedRecordBatch as Uint8Array, - ]); - const r = RecordBatchReader.from(buf); - const batches = r.readAll(); - const rows = []; - for (const recordBatch of batches) { - const rrows = []; - for (const row of recordBatch) { - rrows.push({ - f: row.toArray().map(fieldValue => { - return { - v: fieldValue, - }; - }), - }); - } - rows.push(...rrows); - } - return rows; } /** @@ -185,6 +141,10 @@ export class ReadStream { return this._streamName; }; + getReadSession(): ReadSession { + return this._session; + } + getRowsStream(): Readable { return this._readStream!; } diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index 2c88b385..12e3ad84 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -12,26 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
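
The merge above is deliberately sequential: each ReadStream's Readable is drained to completion before the next one starts, so rows arrive stream by stream rather than interleaved. The same pattern in isolation, with plain in-memory streams (a standalone sketch; `concatStreams` is our name for it, the library's generator is called `mergeStreams`):

    import {Readable} from 'stream';

    // Drain each source in order, yielding every chunk into one combined
    // Readable; for-await keeps back-pressure on the current source.
    async function* concatStreams(sources: Readable[]) {
      for (const source of sources) {
        for await (const chunk of source) {
          yield chunk;
        }
      }
    }

    const merged = Readable.from(
      concatStreams([Readable.from(['a', 'b']), Readable.from(['c', 'd'])])
    );
    merged.on('data', chunk => console.log(chunk)); // a b c d
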
-import {ReadStream} from './read_stream';
+import {ResourceStream} from '@google-cloud/paginator';
+
 import * as protos from '../../protos/protos';
 import {TableReference, ReadClient} from './read_client';
-import {Readable} from 'stream';
 import {logger} from '../util/logger';
-import {ResourceStream} from '@google-cloud/paginator';
+import {ArrowRecordBatchTableRowTransform} from './arrow_transform';
+import {ArrowTableReader} from './arrow_reader';
+
+type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;
 
-type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
-type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat;
-const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat;
-interface ListParams {
-  /**
-   * Row limit of the table.
-   */
-  maxResults?: number;
-  /**
-   * Subset of fields to return, supports select into sub fields. Example: selected_fields = "a,e.d.f";
-   */
-  selectedFields?: string;
-}
 interface TableCell {
   v?: any;
 }
@@ -52,11 +42,19 @@ interface TableDataList {
   totalRows?: string;
 }
 
-type GetRowsOptions = ListParams & {
+type GetRowsOptions = {
+  /**
+   * Row limit of the table.
+   */
+  maxResults?: number;
+  /**
+   * Subset of fields to return, supports select into sub fields. Example: selected_fields = "a,e.d.f";
+   */
+  selectedFields?: string;
   autoPaginate?: boolean;
   maxApiCalls?: number;
 };
-type RowsResponse = any[] | [any[], ReadSession | null, TableDataList];
+type RowsResponse = any[] | [any[], ReadSessionInfo | null, TableDataList];
 
 /**
  * A BigQuery Storage API Reader that can be used to read data from a BigQuery Table
@@ -66,8 +64,7 @@ type RowsResponse = any[] | [any[], ReadSessionInfo | null, TableDataList];
  * @memberof reader
  */
 export class TableReader {
-  private _readClient: ReadClient;
-  private _readStreams: ReadStream[];
+  private _arrowReader: ArrowTableReader;
   private _table: TableReference;
 
   /**
@@ -79,8 +76,7 @@ export class TableReader {
    */
   constructor(readClient: ReadClient, table: TableReference) {
     this._table = table;
-    this._readClient = readClient;
-    this._readStreams = [];
+    this._arrowReader = new ArrowTableReader(readClient, table);
   }
 
   // eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -93,48 +89,18 @@ export class TableReader {
     );
   }
 
+  getSessionInfo(): ReadSessionInfo | undefined | null {
+    return this._arrowReader.getSessionInfo();
+  }
+
   async getRowsStream(
     options?: GetRowsOptions
-  ): Promise<[ResourceStream<TableRow>, ReadSession]> {
+  ): Promise<ResourceStream<TableRow>> {
     this.trace('getRowsStream', options);
-    const session = await this._readClient.createReadSession({
-      parent: `projects/${this._table.projectId}`,
-      table: `projects/${this._table.projectId}/datasets/${this._table.datasetId}/tables/${this._table.tableId}`,
-      dataFormat: DataFormat.ARROW,
-      selectedFields: options?.selectedFields?.split(','),
-    });
-    this.trace(
-      'session created',
-      session.name,
-      session.streams,
-      session.estimatedRowCount
-    );
-
-    this._readStreams = [];
-    for (const readStream of session.streams || []) {
-      const r = await this._readClient.createReadStream(
-        {
-          streamName: readStream.name!,
-          session,
-        },
-        options
-      );
-      this._readStreams.push(r);
-    }
-
-    async function* mergeStreams(readables: Readable[]) {
-      for (const readable of readables) {
-        for await (const chunk of readable) {
-          yield chunk;
-        }
-      }
-    }
-    const joined = Readable.from(
-      mergeStreams(this._readStreams.map(r => r.getRowsStream()))
-    );
-    this.trace('joined streams', joined);
-    const stream = joined as ResourceStream<TableRow>;
-    return 
[stream, session]; + const stream = await this._arrowReader.getRecordBatchStream(options); + return stream.pipe( + new ArrowRecordBatchTableRowTransform() + ) as ResourceStream; } /** @@ -149,7 +115,8 @@ export class TableReader { */ async getRows(options?: GetRowsOptions): Promise { this.trace('getRows', options); - const [stream, session] = await this.getRowsStream(options); + const stream = await this.getRowsStream(options); + const session = this.getSessionInfo(); return new Promise((resolve, reject) => { const rows: TableRow[] = []; stream.on('data', (data: TableRow) => { @@ -161,14 +128,12 @@ export class TableReader { }); stream.on('end', () => { this.trace('resolve called on joined stream'); - resolve([rows, session, {rows, totalRows: session.estimatedRowCount}]); + resolve([rows, session, {rows, totalRows: session?.estimatedRowCount}]); }); }); } close() { - this._readStreams.forEach(rs => { - rs.close(); - }); + this._arrowReader.close(); } } From bd67c85c43c8d5b9f2a8978cf93dfee3ce55c6bc Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 16 Jul 2024 20:56:55 -0400 Subject: [PATCH 11/26] docs: update doc strings --- src/reader/arrow_reader.ts | 20 +++++++++++++++----- src/reader/arrow_transform.ts | 20 +++++++++++++++++++- src/reader/read_session.ts | 17 +++++++++++++++-- src/reader/table_reader.ts | 8 +++----- 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/src/reader/arrow_reader.ts b/src/reader/arrow_reader.ts index 8d3c5b43..1b148518 100644 --- a/src/reader/arrow_reader.ts +++ b/src/reader/arrow_reader.ts @@ -31,7 +31,7 @@ const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; /** * A BigQuery Storage API Reader that can be used to read data - * from BigQuery Tables using the Storage API. + * from BigQuery Tables using the Storage API in Arrow format. * * @class * @memberof reader @@ -41,11 +41,11 @@ export class ArrowTableReader { private _session: ReadSession; /** - * Creates a new Reader instance. + * Creates a new ArrowTableReader instance. Usually created via + * ReadClient.createArrowTableReader(). * - * @param {Object} params - The parameters for the JSONWriter. - * @param {TableReference} params.table - The stream connection - * to the BigQuery streaming insert operation. + * @param {ReadClient} readClient - Storage Read Client. + * @param {TableReference} table - target table to read data from. */ constructor(readClient: ReadClient, table: TableReference) { this._table = table; @@ -66,6 +66,11 @@ export class ArrowTableReader { return this._session.getSessionInfo(); } + /** + * Get a byte stream of Arrow Record Batch. + * + * @param {GetStreamOptions} options + */ async getRawStream( options?: GetStreamOptions ): Promise> { @@ -76,6 +81,11 @@ export class ArrowTableReader { return stream.pipe(new ArrowRawTransform()) as ResourceStream; } + /** + * Get a stream of Arrow RecordBatch objects. + * + * @param {GetStreamOptions} options + */ async getRecordBatchStream( options?: GetStreamOptions ): Promise> { diff --git a/src/reader/arrow_transform.ts b/src/reader/arrow_transform.ts index 0066ad1d..c0b87c15 100644 --- a/src/reader/arrow_transform.ts +++ b/src/reader/arrow_transform.ts @@ -24,6 +24,11 @@ type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; +/** + * ArrowRawTransform implements a node stream Transform that reads + * ReadRowsResponse from BigQuery Storage Read API and convert + * a raw Arrow Record Batch. 
+ */ export class ArrowRawTransform extends Transform { constructor() { super({ @@ -49,6 +54,12 @@ export class ArrowRawTransform extends Transform { callback(null, response.arrowRecordBatch?.serializedRecordBatch); } } + +/** + * ArrowRecordReaderTransform implements a node stream Transform that reads + * a byte stream of raw Arrow Record Batch and convert to a stream of Arrow + * RecordBatchStreamReader. + */ export class ArrowRecordReaderTransform extends Transform { private session: ReadSession; @@ -73,6 +84,10 @@ export class ArrowRecordReaderTransform extends Transform { } } +/** + * ArrowRecordBatchTransform implements a node stream Transform that reads + * a RecordBatchStreamReader and convert a stream of Arrow RecordBatch. + */ export class ArrowRecordBatchTransform extends Transform { constructor() { super({ @@ -93,6 +108,10 @@ export class ArrowRecordBatchTransform extends Transform { } } +/** + * ArrowRecordBatchTableRowTransform implements a node stream Transform that reads + * an Arrow RecordBatch and convert a stream of BigQuery TableRow. + */ export class ArrowRecordBatchTableRowTransform extends Transform { constructor() { super({ @@ -105,7 +124,6 @@ export class ArrowRecordBatchTableRowTransform extends Transform { _: BufferEncoding, callback: TransformCallback ): void { - //console.log('transform ArrowRecordBatchTableRowTransform', batch) const rows = new Array(batch.numRows); for (let i = 0; i < batch.numRows; i++) { rows[i] = { diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts index 4a51c9a9..fb64c8d3 100644 --- a/src/reader/read_session.ts +++ b/src/reader/read_session.ts @@ -36,10 +36,17 @@ export type GetStreamOptions = { * Subset of fields to return, supports select into sub fields. Example: selected_fields = "a,e.d.f"; */ selectedFields?: string; - autoPaginate?: boolean; - maxApiCalls?: number; }; +/** + * A ReadSession represents a Read Session from the BigQuery + * Storage Read API. + * + * Read more on:https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#readsession + * + * @class + * @memberof reader + */ export class ReadSession { private _info: ReadSessionInfo | null; private _table: TableReference; @@ -90,6 +97,12 @@ export class ReadSession { return session; } + /** + * Get a merged stream of ReadRowsResponse from all ReadStream + * under this ReadSession. + * + * @param {GetStreamOptions} options + */ async getStream( options?: GetStreamOptions ): Promise> { diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index 12e3ad84..f7596282 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -51,8 +51,6 @@ type GetRowsOptions = { * Subset of fields to return, supports select into sub fields. 
Example: selected_fields = "a,e.d.f"; */ selectedFields?: string; - autoPaginate?: boolean; - maxApiCalls?: number; }; type RowsResponse = any[] | [any[], ReadSessionInfo | null, TableDataList]; @@ -93,10 +91,10 @@ export class TableReader { return this._arrowReader.getSessionInfo(); } - async getRowsStream( + async getRowStream( options?: GetRowsOptions ): Promise> { - this.trace('getRowsStream', options); + this.trace('getRowStream', options); const stream = await this._arrowReader.getRecordBatchStream(options); return stream.pipe( new ArrowRecordBatchTableRowTransform() @@ -115,7 +113,7 @@ export class TableReader { */ async getRows(options?: GetRowsOptions): Promise { this.trace('getRows', options); - const stream = await this.getRowsStream(options); + const stream = await this.getRowStream(options); const session = this.getSessionInfo(); return new Promise((resolve, reject) => { const rows: TableRow[] = []; From aaeb3bdea8bff9dbb9f6957ce1162588ebce6408 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 16 Jul 2024 21:07:01 -0400 Subject: [PATCH 12/26] fix: lint issues --- src/reader/arrow_reader.ts | 8 ++++---- src/reader/read_session.ts | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/reader/arrow_reader.ts b/src/reader/arrow_reader.ts index 1b148518..496d947a 100644 --- a/src/reader/arrow_reader.ts +++ b/src/reader/arrow_reader.ts @@ -68,8 +68,8 @@ export class ArrowTableReader { /** * Get a byte stream of Arrow Record Batch. - * - * @param {GetStreamOptions} options + * + * @param {GetStreamOptions} options */ async getRawStream( options?: GetStreamOptions @@ -83,8 +83,8 @@ export class ArrowTableReader { /** * Get a stream of Arrow RecordBatch objects. - * - * @param {GetStreamOptions} options + * + * @param {GetStreamOptions} options */ async getRecordBatchStream( options?: GetStreamOptions diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts index fb64c8d3..588d8704 100644 --- a/src/reader/read_session.ts +++ b/src/reader/read_session.ts @@ -41,7 +41,7 @@ export type GetStreamOptions = { /** * A ReadSession represents a Read Session from the BigQuery * Storage Read API. - * + * * Read more on:https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#readsession * * @class @@ -100,8 +100,8 @@ export class ReadSession { /** * Get a merged stream of ReadRowsResponse from all ReadStream * under this ReadSession. 
-   *
-   * @param {GetStreamOptions} options
+   *
+   * @param {GetStreamOptions} options
    */
   async getStream(
     options?: GetStreamOptions

From 7382c340a1b84a0e79888a42137ed241a3b25091 Mon Sep 17 00:00:00 2001
From: Alvaro Viebrantz
Date: Wed, 17 Jul 2024 11:53:56 -0400
Subject: [PATCH 13/26] fix: read rows sample

---
 samples/read_rows.js       |  2 +-
 src/reader/table_reader.ts | 14 ++++++--------
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/samples/read_rows.js b/samples/read_rows.js
index 8c16e7e9..dd6a8a3c 100644
--- a/samples/read_rows.js
+++ b/samples/read_rows.js
@@ -49,7 +49,7 @@ function main(
   });
   console.log('table', table.dataset.projectId, table.dataset.id, table.id);
 
-  const treader = await readClient.createTableReader({table});
+  const treader = await readClient.createTableReader({table: dstTableRef});
   const [rawRows] = await treader.getRows();
   const rows = BigQuery.mergeSchemaWithRows_(md.schema, rawRows, {});
   rows.forEach(row => {
diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts
index f7596282..80f358d8 100644
--- a/src/reader/table_reader.ts
+++ b/src/reader/table_reader.ts
@@ -102,14 +102,12 @@ export class TableReader {
   }
 
   /**
-   * @callback RowsCallback
-   * @param {?Error} err Request error, if any.
-   * @param {array} rows The rows.
-   * @param {object} apiResponse The full API response.
-   */
-  /**
-   * @typedef {array} RowsResponse
-   * @property {array} 0 The rows.
+   * Retrieves table data as rows in the same format
+   * as the tabledata.list BigQuery v2 API.
+   * Extra parameters returned contain Storage Read API specific information
+   * like the ReadSession and totalRows count.
+   *
+   * @param {GetRowsOptions} options
    */
   async getRows(options?: GetRowsOptions): Promise<RowsResponse> {
     this.trace('getRows', options);

From 63805b34cd278304247194a5b007fd72a6bfc089 Mon Sep 17 00:00:00 2001
From: Alvaro Viebrantz
Date: Wed, 17 Jul 2024 17:12:09 -0400
Subject: [PATCH 14/26] test: arrow transforms

---
 src/reader/arrow_transform.ts  |  32 +++++++-
 test/reader/arrow_transform.ts | 130 +++++++++++++++++++++++++++++++++
 2 files changed, 160 insertions(+), 2 deletions(-)
 create mode 100644 test/reader/arrow_transform.ts

diff --git a/src/reader/arrow_transform.ts b/src/reader/arrow_transform.ts
index c0b87c15..ca9e8b44 100644
--- a/src/reader/arrow_transform.ts
+++ b/src/reader/arrow_transform.ts
@@ -17,6 +17,7 @@ import {
   RecordBatchReader,
   RecordBatch,
   RecordBatchStreamReader,
+  Vector,
 } from 'apache-arrow';
 import * as protos from '../../protos/protos';
 
@@ -24,6 +25,13 @@ type ReadRowsResponse =
   protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
 type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
 
+interface TableCell {
+  v?: any;
+}
+interface TableRow {
+  f?: Array<TableCell>;
+}
+
 /**
  * ArrowRawTransform implements a node stream Transform that reads
  * ReadRowsResponse from BigQuery Storage Read API and convert
@@ -132,11 +140,12 @@ export class ArrowRecordBatchTableRowTransform extends Transform {
     }
     for (let j = 0; j < batch.numCols; j++) {
       const column = batch.selectAt([j]);
+      const columnName = column.schema.fields[0].name;
       for (let i = 0; i < batch.numRows; i++) {
         const fieldData = column.get(i);
-        const fieldValue = fieldData?.toArray()[0];
+        const fieldValue = fieldData?.toJSON()[columnName];
         rows[i].f[j] = {
-          v: fieldValue,
+          v: convertArrowValue(fieldValue),
         };
       }
     }
@@ -146,3 +155,22 @@ export class ArrowRecordBatchTableRowTransform extends Transform {
     for (let i = 0; i < batch.numRows; i++) {
       this.push(rows[i]);
     }
    callback(null);
   }
 }
+
+function convertArrowValue(fieldValue: any): any {
+  if (typeof fieldValue === 'object') 
{ + if (fieldValue instanceof Vector) { + const arr = fieldValue.toJSON(); + return arr.map((v: any) => { + return {v: convertArrowValue(v)}; + }); + } + const tableRow: TableRow = {f: []}; + Object.keys(fieldValue).forEach(key => { + tableRow.f?.push({ + v: convertArrowValue(fieldValue[key]), + }); + }); + return tableRow; + } + return fieldValue; +} diff --git a/test/reader/arrow_transform.ts b/test/reader/arrow_transform.ts new file mode 100644 index 00000000..7b10c3be --- /dev/null +++ b/test/reader/arrow_transform.ts @@ -0,0 +1,130 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import {describe, it} from 'mocha'; +import * as protos from '../../protos/protos'; +import { + RecordBatchStreamWriter, + tableFromArrays, +} from 'apache-arrow'; +import {Readable} from 'stream'; +import { + ArrowRawTransform, + ArrowRecordBatchTableRowTransform, + ArrowRecordBatchTransform, + ArrowRecordReaderTransform, +} from '../../src/reader/arrow_transform'; +import {BigQuery} from '@google-cloud/bigquery'; +import bigquery from '@google-cloud/bigquery/build/src/types'; + +type ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; +const ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.ReadRowsResponse; + +describe('Arrow Transform', () => { + it('Pipeline with all transforms', async () => { + const schema: bigquery.ITableSchema = { + fields: [ + {name: 'name', type: 'STRING'}, + {name: 'row', type: 'INTEGER'}, + {name: 'arr', type: 'INTEGER', mode: 'REPEATED'}, + { + name: 'rec', + type: 'RECORD', + fields: [ + {name: 'key', type: 'STRING'}, + {name: 'value', type: 'STRING'}, + ], + }, + { + name: 'recs', + type: 'RECORD', + mode: 'REPEATED', + fields: [{name: 'num', type: 'INTEGER'}], + }, + ], + }; + const table = tableFromArrays({ + name: ['Ada Lovelace', 'Alan Turing'], + row: [1, 2], + arr: [ + [10, 20], + [20, 30], + ], + rec: [ + {key: 'foo', value: 'bar'}, + {key: 'test', value: 'baz'}, + ], + recs: [ + [{num: 10}, {num: 20}], + [{num: 20}, {num: 30}], + ], + }); + const writer = RecordBatchStreamWriter.writeAll(table); + const serializedRecordBatch = writer.toUint8Array(true); + const serializedSchema = Uint8Array.from([]); + const response: ReadRowsResponse = { + arrowSchema: { + serializedSchema, + }, + arrowRecordBatch: { + serializedRecordBatch, + rowCount: table.numRows, + }, + }; + + const pipeline = Readable.from([response]) + .pipe(new ArrowRawTransform()) + .pipe( + new ArrowRecordReaderTransform({ + arrowSchema: { + serializedSchema, + }, + }) + ) + .pipe(new ArrowRecordBatchTransform()) + .pipe(new ArrowRecordBatchTableRowTransform()); + + const promise = new Promise(resolve => { + const rows: any[] = []; + pipeline + .on('data', data => { + rows.push(data); + }) + .on('end', () => resolve(rows)); + }); + const tableRows = await promise; + const rows = BigQuery.mergeSchemaWithRows_(schema, tableRows, { + wrapIntegers: false, + }); + assert.deepStrictEqual(rows, [ + 
{ + name: 'Ada Lovelace', + row: 1, + arr: [10, 20], + rec: {key: 'foo', value: 'bar'}, + recs: [{num: 10}, {num: 20}], + }, + { + name: 'Alan Turing', + row: 2, + arr: [20, 30], + rec: {key: 'test', value: 'baz'}, + recs: [{num: 20}, {num: 30}], + }, + ]); + }); +}); From 182d3234497b9d6131e995d4bd995a3cb7d276cc Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 18 Jul 2024 16:28:14 -0400 Subject: [PATCH 15/26] test: add reader package tests --- package.json | 4 +- src/reader/arrow_reader.ts | 11 +- src/reader/data_format.ts | 33 ++ src/reader/index.ts | 2 + src/reader/read_client.ts | 5 +- src/reader/read_session.ts | 61 ++-- src/reader/read_stream.ts | 13 +- src/reader/table_reader.ts | 5 +- system-test/install.ts | 1 + system-test/reader_client_test.ts | 485 ++++++++++++++++++++++++++++++ test/reader/arrow_transform.ts | 35 ++- 11 files changed, 591 insertions(+), 64 deletions(-) create mode 100644 src/reader/data_format.ts create mode 100644 system-test/reader_client_test.ts diff --git a/package.json b/package.json index 68470bab..b9a287c9 100644 --- a/package.json +++ b/package.json @@ -27,8 +27,8 @@ "precompile": "gts clean" }, "dependencies": { + "apache-arrow": "^17.0.0", "@google-cloud/paginator": "^5.0.0", - "apache-arrow": "^14.0.2", "extend": "^3.0.2", "google-auth-library": "^9.6.3", "google-gax": "^4.3.1" @@ -55,7 +55,7 @@ "pack-n-play": "^2.0.0", "sinon": "^18.0.0", "ts-loader": "^9.0.0", - "typescript": "^5.1.6", + "typescript": "^5.5.3", "uuid": "^9.0.0", "webpack": "^5.0.0", "webpack-cli": "^5.0.0" diff --git a/src/reader/arrow_reader.ts b/src/reader/arrow_reader.ts index 496d947a..8638b193 100644 --- a/src/reader/arrow_reader.ts +++ b/src/reader/arrow_reader.ts @@ -24,10 +24,9 @@ import { ArrowRecordReaderTransform, } from './arrow_transform'; import {ReadSession, GetStreamOptions} from './read_session'; +import {ArrowFormat} from './data_format'; type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; -type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; -const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; /** * A BigQuery Storage API Reader that can be used to read data @@ -49,7 +48,7 @@ export class ArrowTableReader { */ constructor(readClient: ReadClient, table: TableReference) { this._table = table; - this._session = new ReadSession(readClient, table, DataFormat.ARROW); + this._session = new ReadSession(readClient, table, ArrowFormat); } // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -71,13 +70,11 @@ export class ArrowTableReader { * * @param {GetStreamOptions} options */ - async getRawStream( + async getStream( options?: GetStreamOptions ): Promise> { - this.trace('getRawStream', options); - + this.trace('getStream', options); const stream = await this._session.getStream(options); - return stream.pipe(new ArrowRawTransform()) as ResourceStream; } diff --git a/src/reader/data_format.ts b/src/reader/data_format.ts new file mode 100644 index 00000000..d599c471 --- /dev/null +++ b/src/reader/data_format.ts @@ -0,0 +1,33 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as protos from '../../protos/protos'; + +export type DataFormat = + protos.google.cloud.bigquery.storage.v1.IReadSession['dataFormat']; +const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; + +/** + * Return data in Apache Arrow format. + * + * @memberof reader + */ +export const ArrowFormat: DataFormat = 'ARROW'; + +/** + * Return data in Apache Avro format. + * + * @memberof reader + */ +export const AvroFormat: DataFormat = 'AVRO'; diff --git a/src/reader/index.ts b/src/reader/index.ts index 8f66dd6d..49983942 100644 --- a/src/reader/index.ts +++ b/src/reader/index.ts @@ -26,5 +26,7 @@ export {ReadClient} from './read_client'; export {TableReader} from './table_reader'; +export {ArrowTableReader} from './arrow_reader'; export {ReadStream} from './read_stream'; +export {DataFormat, ArrowFormat, AvroFormat} from './data_format'; export {setLogFunction} from '../util/logger'; diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index b687f0f5..d13e07b6 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -20,11 +20,11 @@ import {BigQueryReadClient} from '../v1'; import {ReadStream} from './read_stream'; import {TableReader} from './table_reader'; import {ArrowTableReader} from './arrow_reader'; +import {DataFormat} from './data_format'; type CreateReadSessionRequest = protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest; type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; -type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; export type TableReference = { /** @@ -131,9 +131,6 @@ export class ReadClient { * @param {string} request.table * Parent table that all the streams should belong to, in the form * of `projects/{project}/datasets/{dataset}/tables/{table}`. - * @param {TableReference} request.table - * Reference to the table to which the stream belongs, in the format - * of `projects/{project}/datasets/{dataset}/tables/{table}`. * @returns {Promise}} - The promise which resolves to the streamId. 
*/ async createReadSession(request: { diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts index 588d8704..5f10f44a 100644 --- a/src/reader/read_session.ts +++ b/src/reader/read_session.ts @@ -18,14 +18,13 @@ import {Readable} from 'stream'; import {ReadStream} from './read_stream'; import * as protos from '../../protos/protos'; import {TableReference, ReadClient} from './read_client'; +import {DataFormat} from './data_format'; import {logger} from '../util/logger'; type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession; -type DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; -const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; export type GetStreamOptions = { /** @@ -94,6 +93,18 @@ export class ReadSession { session.estimatedRowCount ); this._info = session; + + this._readStreams = []; + for (const readStream of session.streams || []) { + const r = await this._readClient.createReadStream( + { + streamName: readStream.name!, + session, + }, + options + ); + this._readStreams.push(r); + } return session; } @@ -108,34 +119,15 @@ export class ReadSession { ): Promise> { this.trace('getStream', options); - const session = await this.getOrCreateSession(options); - this._readStreams = []; - for (const readStream of session.streams || []) { - const r = await this._readClient.createReadStream( - { - streamName: readStream.name!, - session, - }, - options - ); - this._readStreams.push(r); - } + await this.getOrCreateSession(options); - async function* mergeStreams(readables: Readable[]) { - for (const readable of readables) { - for await (const chunk of readable) { - yield chunk; - } - } - } - const joined = Readable.from( - mergeStreams( - this._readStreams.map(r => { - const stream = r.getRowsStream(); - return stream; - }) - ) + const mergedStream = mergeStreams( + this._readStreams.map(r => { + const stream = r.getRowsStream(); + return stream; + }) ); + const joined = Readable.from(mergedStream); this.trace('joined streams', joined); const stream = joined as ResourceStream; return stream; @@ -147,3 +139,16 @@ export class ReadSession { }); } } + +async function* mergeStreams(readables: Readable[]) { + for (const readable of readables) { + try { + for await (const chunk of readable) { + yield chunk; + } + } catch (err) { + console.log('mergeStream err', err); + throw err; + } + } +} diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index a940dd2d..cc5e84da 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -107,14 +107,17 @@ export class ReadStream { private handleError = (err: gax.GoogleError) => { this.trace('on error', err, JSON.stringify(err)); - if (this.shouldRetry(err)) { + if (this.isRetryableError(err)) { this.reconnect(); return; } this._readStream?.emit('error', err); }; - private shouldRetry(err: gax.GoogleError): boolean { + private isRetryableError(err?: gax.GoogleError | null): boolean { + if (!err) { + return false; + } const reconnectionErrorCodes = [ gax.Status.ABORTED, gax.Status.CANCELLED, @@ -153,7 +156,10 @@ export class ReadStream { * Check if connection is open and ready to read data. 
*/ isOpen(): boolean { - return !!this._connection; + if (this._connection) { + return !(this._connection.destroyed || this._connection.closed); + } + return false; } /** @@ -174,6 +180,7 @@ export class ReadStream { } this._connection.end(); this._connection.removeAllListeners(); + this._connection.destroy(); this._connection = null; } } diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts index 80f358d8..4c08dc51 100644 --- a/src/reader/table_reader.ts +++ b/src/reader/table_reader.ts @@ -52,7 +52,7 @@ type GetRowsOptions = { */ selectedFields?: string; }; -type RowsResponse = any[] | [any[], ReadSessionInfo | null, TableDataList]; +type RowsResponse = [any[], ReadSessionInfo | null, TableDataList]; /** * A BigQuery Storage API Reader that can be used to reader data into BigQuery Table @@ -124,7 +124,8 @@ export class TableReader { }); stream.on('end', () => { this.trace('resolve called on joined stream'); - resolve([rows, session, {rows, totalRows: session?.estimatedRowCount}]); + const totalRows = `${session?.estimatedRowCount ?? 0}`; + resolve([rows, session ?? null, {rows, totalRows}]); }); }); } diff --git a/system-test/install.ts b/system-test/install.ts index 83b83f33..3785fd2f 100644 --- a/system-test/install.ts +++ b/system-test/install.ts @@ -27,6 +27,7 @@ describe('📦 pack-n-play test', () => { packageDir: process.cwd(), sample: { description: 'TypeScript user can use the type definitions', + devDependencies: ['@types/web'], ts: readFileSync( './system-test/fixtures/sample/src/index.ts' ).toString(), diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts new file mode 100644 index 00000000..f5509e71 --- /dev/null +++ b/system-test/reader_client_test.ts @@ -0,0 +1,485 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
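
A detail the system tests below rely on: prepending the session's serialized Arrow schema to the raw serialized record batches yields a well-formed Arrow IPC stream that apache-arrow can parse directly. A small standalone sketch of that assembly (the helper name is illustrative, not a library API):

    import {tableFromIPC} from 'apache-arrow';

    // Concatenate schema bytes and record-batch bytes into one IPC stream,
    // then parse the result into an Arrow Table.
    function tableFromSchemaAndBatches(
      serializedSchema: Uint8Array,
      batches: Uint8Array[]
    ) {
      return tableFromIPC(Buffer.concat([serializedSchema, ...batches]));
    }
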
+ +import * as assert from 'assert'; +import {describe, it} from 'mocha'; +import * as gax from 'google-gax'; +import * as uuid from 'uuid'; +import * as sinon from 'sinon'; +import {BigQuery, TableRow, TableSchema} from '@google-cloud/bigquery'; +import * as protos from '../protos/protos'; +import * as protobuf from 'protobufjs'; +import {ClientOptions} from 'google-gax'; +import * as customerRecordProtoJson from '../samples/customer_record.json'; +import * as bigquerystorage from '../src'; +import * as reader from '../src/reader'; +import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; + +type ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; +const {ReadClient, TableReader, ArrowFormat, AvroFormat} = reader; + +const sandbox = sinon.createSandbox(); +afterEach(() => sandbox.restore()); + +if (process.env.NODE_ENV === 'DEBUG') { + reader.setLogFunction(console.log); +} + +const GCLOUD_TESTS_PREFIX = 'nodejs_bqstorage_system_test'; +const bigquery = new BigQuery(); +const generateUuid = () => + `${GCLOUD_TESTS_PREFIX}_${uuid.v4()}`.replace(/-/gi, '_'); +const datasetId = generateUuid(); + +const root = protobuf.Root.fromJSON(customerRecordProtoJson); +if (!root) { + throw Error('Proto must not be undefined'); +} + +describe('reader.ReaderClient', () => { + let projectId: string; + let parent: string; + let tableRef: string; + let tableId: string; + let bqReadClient: bigquerystorage.BigQueryReadClient; + let clientOptions: ClientOptions; + const schema: TableSchema = { + fields: [ + { + name: 'name', + type: 'STRING', + mode: 'REQUIRED', + }, + { + name: 'row_num', + type: 'INTEGER', + mode: 'REQUIRED', + }, + ], + }; + + before(async () => { + await deleteDatasets(); + + await bigquery.createDataset(datasetId); + }); + + beforeEach(async () => { + tableId = generateUuid(); + + const [table] = await bigquery + .dataset(datasetId) + .createTable(tableId, {schema}); + + projectId = table.metadata.tableReference.projectId; + + parent = `projects/${projectId}`; + tableRef = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`; + + await bigquery + .dataset(datasetId) + .table(tableId) + .insert([ + {name: 'Ada Lovelace', row_num: 1}, + {name: 'Alan Turing', row_num: 2}, + {name: 'Bell', row_num: 3}, + ]); + }); + + after(async () => { + await bigquery.dataset(datasetId).delete({force: true}).catch(console.warn); + }); + + beforeEach(async () => { + clientOptions = { + projectId: projectId, + 'grpc.keepalive_time_ms': 30 * 1000, + 'grpc.keepalive_timeout_ms': 10 * 1000, + }; + bqReadClient = new bigquerystorage.BigQueryReadClient(clientOptions); + }); + + afterEach(async () => { + await bqReadClient.close(); + }); + + describe('Common methods', () => { + it('should create a client without arguments', () => { + const client = new ReadClient(); + assert(client.getClient()); + }); + + it('should create a client with arguments: parent, client, opts', async () => { + const client = new ReadClient(clientOptions); + assert(client.getClient()); + const clientId = await client.getClient().getProjectId(); + assert.strictEqual(clientId, clientOptions.projectId); + }); + }); + + describe('Read', () => { + it('should invoke createReadSession and createReadStream without errors', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const session = await client.createReadSession({ + parent, + table: tableRef, + dataFormat: ArrowFormat, + }); + + assert.equal(session.dataFormat, ArrowFormat); 
+ assert.notEqual(session.streams, null); + assert.equal(session.streams?.length, 1); + + const readStream = session.streams![0]; + const stream = await client.createReadStream({ + session, + streamName: readStream.name!, + }); + const rowStream = stream.getRowsStream(); + + const responses: ReadRowsResponse[] = []; + await new Promise((resolve, reject) => { + rowStream.on('data', (data: ReadRowsResponse) => { + responses.push(data); + }); + rowStream.on('error', reject); + rowStream.on('end', () => { + resolve(null); + }); + }); + + assert.equal(responses.length, 1); + + const res = responses[0]; + assert.equal(stream['_offset'], res.rowCount); + stream.close(); + } finally { + client.close(); + } + }); + }); + + describe('ArrowTableReader', () => { + it('should allow to read a table as an Arrow byte stream', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createArrowTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const rawStream = await reader.getStream(); + + const session = reader.getSessionInfo(); + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + const content: Buffer = await new Promise((resolve, reject) => { + let serializedSchema: string | Uint8Array = ''; + if (session?.arrowSchema?.serializedSchema) { + serializedSchema = session?.arrowSchema?.serializedSchema; + } + let buf = Buffer.from(serializedSchema); + rawStream.on('data', (data: Uint8Array) => { + buf = Buffer.concat([buf, data]); + }); + rawStream.on('error', reject); + rawStream.on('end', () => { + resolve(buf); + }); + }); + const table = await tableFromIPC(content); + + assert.equal(table.numRows, 3); + assert.equal(table.numCols, 2); + + reader.close(); + client.close(); + } catch (err) { + console.error('failed', err); + throw err; + } finally { + client.close(); + } + }); + + it('should allow to read a table as a stream of Arrow Record Batches', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createArrowTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const recordBatchStream = await reader.getRecordBatchStream(); + + const session = reader.getSessionInfo(); + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + const batches: RecordBatch[] = []; + for await (const batch of recordBatchStream) { + batches.push(batch); + } + const table = new Table(batches); + + assert.equal(table.numRows, 3); + assert.equal(table.numCols, 2); + + reader.close(); + client.close(); + } catch (err) { + console.error('failed', err); + throw err; + } finally { + client.close(); + } + }); + }); + + describe('TableReader', () => { + it('should allow to read a table as a stream', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const rowStream = await reader.getRowStream(); + const rows: TableRow[] = []; + await new Promise((resolve, reject) => { + rowStream.on('data', (data: TableRow) => { + rows.push(data); + }); + rowStream.on('error', reject); + rowStream.on('end', () => { + resolve(null); + }); + }); + + const session = reader.getSessionInfo(); + assert.notEqual(session, null); + assert.equal(session?.dataFormat, 
ArrowFormat); + + assert.equal(rows.length, 3); + + reader.close(); + client.close(); + } finally { + client.close(); + } + }); + + it('should allow to read a table as tabledata.list RowsResponse', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const [rows, session, response] = await reader.getRows(); + + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + assert.notEqual(response.totalRows, null); // estimated row count + assert.equal(response.rows?.length, 3); + + assert.equal(rows.length, 3); + + reader.close(); + client.close(); + } finally { + client.close(); + } + }); + }); + + describe('Error Scenarios', () => { + it('send request with mismatched selected fields', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + let foundError: gax.GoogleError | null = null; + try { + const rowStream = await reader.getRowStream({ + selectedFields: 'wrong_field', + }); + const rows: TableRow[] = []; + for await (const data of rowStream) { + rows.push(data); + } + } catch (err) { + assert.notEqual(err, null); + foundError = err as gax.GoogleError; + } + + assert.notEqual(foundError, null); + assert.equal(foundError?.code, gax.Status.INVALID_ARGUMENT); + assert.equal( + foundError?.message.includes( + 'request failed: The following selected fields do not exist in the table schema: wrong_field' + ), + true + ); + + reader.close(); + } finally { + client.close(); + } + }); + + it('should trigger reconnection when intermitent error happens', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + await reader.getRowStream(); + + // access private stream connection + const stream = reader['_arrowReader']['_session']['_readStreams'][0]; + let reconnectedCalled = false; + sandbox.stub(stream, 'reconnect').callsFake(() => { + reconnectedCalled = true; + }); + const conn = stream['_connection'] as gax.CancellableStream; // private method + + const gerr = new gax.GoogleError('aborted'); + gerr.code = gax.Status.ABORTED; + conn.emit('error', gerr); + conn.emit('close'); + + assert.equal(reconnectedCalled, true); + + reader.close(); + } catch (err) { + console.log('scenario err', err); + } finally { + client.close(); + } + }); + }); + + describe('close', () => { + it('should invoke close without errors', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const session = await client.createReadSession({ + parent: `projects/${projectId}`, + table: `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`, + dataFormat: AvroFormat, + }); + + assert.equal(session.dataFormat, AvroFormat); + assert.notEqual(session.streams, null); + assert.notEqual(session.streams?.length, 0); + + const readStream = session.streams![0]; + const connection = await client.createReadStream({ + session, + streamName: readStream.name!, + }); + const internalConn = connection['_connection']!; + + connection.close(); + client.close(); + assert.strictEqual(client.isOpen(), false); + 
assert.strictEqual(internalConn.destroyed, true); + } finally { + client.close(); + } + }); + }); + + // Only delete a resource if it is older than 24 hours. That will prevent + // collisions with parallel CI test runs. + function isResourceStale(creationTime: number) { + const oneDayMs = 86400000; + const now = new Date(); + const created = new Date(creationTime); + return now.getTime() - created.getTime() >= oneDayMs; + } + + async function deleteDatasets() { + let [datasets] = await bigquery.getDatasets(); + datasets = datasets.filter(dataset => + dataset.id?.includes(GCLOUD_TESTS_PREFIX) + ); + + for (const dataset of datasets) { + const [metadata] = await dataset.getMetadata(); + const creationTime = Number(metadata.creationTime); + if (isResourceStale(creationTime)) { + try { + await dataset.delete({force: true}); + } catch (e) { + console.log(`dataset(${dataset.id}).delete() failed`); + console.log(e); + } + } + } + } +}); diff --git a/test/reader/arrow_transform.ts b/test/reader/arrow_transform.ts index 7b10c3be..e8081b40 100644 --- a/test/reader/arrow_transform.ts +++ b/test/reader/arrow_transform.ts @@ -15,10 +15,7 @@ import * as assert from 'assert'; import {describe, it} from 'mocha'; import * as protos from '../../protos/protos'; -import { - RecordBatchStreamWriter, - tableFromArrays, -} from 'apache-arrow'; +import {RecordBatchStreamWriter, tableFromArrays} from 'apache-arrow'; import {Readable} from 'stream'; import { ArrowRawTransform, @@ -58,19 +55,22 @@ describe('Arrow Transform', () => { ], }; const table = tableFromArrays({ - name: ['Ada Lovelace', 'Alan Turing'], - row: [1, 2], + name: ['Ada Lovelace', 'Alan Turing', 'Bell'], + row: [1, 2, 3], arr: [ [10, 20], [20, 30], + [30, 40], ], rec: [ {key: 'foo', value: 'bar'}, {key: 'test', value: 'baz'}, + {key: 'a key', value: 'a value'}, ], recs: [ [{num: 10}, {num: 20}], [{num: 20}, {num: 30}], + [{num: 30}, {num: 40}], ], }); const writer = RecordBatchStreamWriter.writeAll(table); @@ -88,25 +88,17 @@ describe('Arrow Transform', () => { const pipeline = Readable.from([response]) .pipe(new ArrowRawTransform()) - .pipe( - new ArrowRecordReaderTransform({ - arrowSchema: { - serializedSchema, - }, - }) - ) + .pipe(new ArrowRecordReaderTransform({arrowSchema: {serializedSchema}})) .pipe(new ArrowRecordBatchTransform()) .pipe(new ArrowRecordBatchTableRowTransform()); - const promise = new Promise(resolve => { + const consumeRows = new Promise(resolve => { const rows: any[] = []; pipeline - .on('data', data => { - rows.push(data); - }) + .on('data', data => rows.push(data)) .on('end', () => resolve(rows)); }); - const tableRows = await promise; + const tableRows = await consumeRows; const rows = BigQuery.mergeSchemaWithRows_(schema, tableRows, { wrapIntegers: false, }); @@ -125,6 +117,13 @@ describe('Arrow Transform', () => { rec: {key: 'test', value: 'baz'}, recs: [{num: 20}, {num: 30}], }, + { + name: 'Bell', + row: 3, + arr: [30, 40], + rec: {key: 'a key', value: 'a value'}, + recs: [{num: 30}, {num: 40}], + }, ]); }); }); From ac0a01828a42d99f80abc5b6e7b0f5a04eb500e2 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 18 Jul 2024 16:49:30 -0400 Subject: [PATCH 16/26] fix: rollback arrow to v14 --- package.json | 2 +- system-test/reader_client_test.ts | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index b9a287c9..2b336d5c 100644 --- a/package.json +++ b/package.json @@ -27,8 +27,8 @@ "precompile": "gts clean" }, "dependencies": { - "apache-arrow": "^17.0.0", 
"@google-cloud/paginator": "^5.0.0", + "apache-arrow": "^14.0.2", "extend": "^3.0.2", "google-auth-library": "^9.6.3", "google-gax": "^4.3.1" diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index f5509e71..a0fa1c41 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -28,7 +28,7 @@ import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; type ReadRowsResponse = protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; -const {ReadClient, TableReader, ArrowFormat, AvroFormat} = reader; +const {ReadClient, ArrowFormat, AvroFormat} = reader; const sandbox = sinon.createSandbox(); afterEach(() => sandbox.restore()); @@ -217,9 +217,6 @@ describe('reader.ReaderClient', () => { reader.close(); client.close(); - } catch (err) { - console.error('failed', err); - throw err; } finally { client.close(); } @@ -256,9 +253,6 @@ describe('reader.ReaderClient', () => { reader.close(); client.close(); - } catch (err) { - console.error('failed', err); - throw err; } finally { client.close(); } From 26d5b95c40af5855dadb631825640a248d9de191 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 23 Jul 2024 16:45:58 -0400 Subject: [PATCH 17/26] fix: add node 14 pollyfil for array.at --- package.json | 1 + src/reader/index.ts | 3 +++ 2 files changed, 4 insertions(+) diff --git a/package.json b/package.json index 2b336d5c..6f99e2a7 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "dependencies": { "@google-cloud/paginator": "^5.0.0", "apache-arrow": "^14.0.2", + "core-js": "^3.37.1", "extend": "^3.0.2", "google-auth-library": "^9.6.3", "google-gax": "^4.3.1" diff --git a/src/reader/index.ts b/src/reader/index.ts index 49983942..280011fc 100644 --- a/src/reader/index.ts +++ b/src/reader/index.ts @@ -30,3 +30,6 @@ export {ArrowTableReader} from './arrow_reader'; export {ReadStream} from './read_stream'; export {DataFormat, ArrowFormat, AvroFormat} from './data_format'; export {setLogFunction} from '../util/logger'; + +// polyfill array.at for Node < 14. Remove after Node 14 is deprecated. +import 'core-js/full/array/at'; From 456f2d42dbcf6ace787caf4e3ef0c34fc4ffd751 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 24 Jul 2024 16:23:54 -0400 Subject: [PATCH 18/26] fix: properly close connection --- src/reader/read_stream.ts | 18 ++++++++++-------- system-test/reader_client_test.ts | 20 ++++++++++++-------- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index cc5e84da..168567de 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -91,7 +91,6 @@ export class ReadStream { }); this._connection.on('end', () => { this.trace('connection ended'); - this.close(); }); } @@ -111,7 +110,7 @@ export class ReadStream { this.reconnect(); return; } - this._readStream?.emit('error', err); + this._readStream?.emit('error', err); }; private isRetryableError(err?: gax.GoogleError | null): boolean { @@ -175,12 +174,15 @@ export class ReadStream { * Close the read stream connection. 
*/ close() { - if (!this._connection) { - return; + if (this._connection) { + this._connection.end(); + this._connection.removeAllListeners(); + this._connection.destroy(); + this._connection = null; + } + if (this._readStream){ + this._readStream.destroy(); + this._readStream = undefined; } - this._connection.end(); - this._connection.removeAllListeners(); - this._connection.destroy(); - this._connection = null; } } diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index a0fa1c41..cda92398 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -43,6 +43,11 @@ const generateUuid = () => `${GCLOUD_TESTS_PREFIX}_${uuid.v4()}`.replace(/-/gi, '_'); const datasetId = generateUuid(); +const sleep = (ms: number) => + new Promise(resolve => { + setTimeout(resolve, ms); + }); + const root = protobuf.Root.fromJSON(customerRecordProtoJson); if (!root) { throw Error('Proto must not be undefined'); @@ -216,7 +221,6 @@ describe('reader.ReaderClient', () => { assert.equal(table.numCols, 2); reader.close(); - client.close(); } finally { client.close(); } @@ -252,7 +256,6 @@ describe('reader.ReaderClient', () => { assert.equal(table.numCols, 2); reader.close(); - client.close(); } finally { client.close(); } @@ -293,7 +296,6 @@ describe('reader.ReaderClient', () => { assert.equal(rows.length, 3); reader.close(); - client.close(); } finally { client.close(); } @@ -324,7 +326,6 @@ describe('reader.ReaderClient', () => { assert.equal(rows.length, 3); reader.close(); - client.close(); } finally { client.close(); } @@ -406,8 +407,6 @@ describe('reader.ReaderClient', () => { assert.equal(reconnectedCalled, true); reader.close(); - } catch (err) { - console.log('scenario err', err); } finally { client.close(); } @@ -436,14 +435,19 @@ describe('reader.ReaderClient', () => { session, streamName: readStream.name!, }); + await sleep(100); + const internalConn = connection['_connection']!; connection.close(); + assert.strictEqual(internalConn.destroyed, true); + client.close(); assert.strictEqual(client.isOpen(), false); - assert.strictEqual(internalConn.destroyed, true); } finally { - client.close(); + if (client.isOpen()) { + client.close(); + } } }); }); From a02c634c6524e474dd7549a1db2147b616e1a7e7 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Wed, 24 Jul 2024 16:26:40 -0400 Subject: [PATCH 19/26] fix: lint issue --- src/reader/read_stream.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index 168567de..f4810c3f 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -110,7 +110,7 @@ export class ReadStream { this.reconnect(); return; } - this._readStream?.emit('error', err); + this._readStream?.emit('error', err); }; private isRetryableError(err?: gax.GoogleError | null): boolean { @@ -174,15 +174,15 @@ export class ReadStream { * Close the read stream connection. 
*/ close() { - if (this._connection) { + if (this._connection) { this._connection.end(); this._connection.removeAllListeners(); this._connection.destroy(); this._connection = null; } - if (this._readStream){ - this._readStream.destroy(); - this._readStream = undefined; + if (this._readStream) { + this._readStream.destroy(); + this._readStream = undefined; } } } From 4b41b3ef65e28168a7778d303d343625b28db6be Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 24 Jul 2024 20:32:47 +0000 Subject: [PATCH 20/26] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 1 + samples/README.md | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/README.md b/README.md index f9f0725b..8ce0bcb1 100644 --- a/README.md +++ b/README.md @@ -298,6 +298,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-bigquery-st | Append_rows_table_to_proto2 | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_table_to_proto2.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/append_rows_table_to_proto2.js,samples/README.md) | | Customer_record_pb | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/customer_record_pb.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/customer_record_pb.js,samples/README.md) | | BigQuery Storage Quickstart | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/quickstart.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/quickstart.js,samples/README.md) | +| Read_rows | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/read_rows.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/read_rows.js,samples/README.md) | | Sample_data_pb | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/sample_data_pb.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/sample_data_pb.js,samples/README.md) | diff --git a/samples/README.md b/samples/README.md index 7769d773..5affea33 100644 --- a/samples/README.md +++ b/samples/README.md @@ -125,6 +125,7 @@ See sample code on the [Quickstart section](#quickstart). * [Append_rows_table_to_proto2](#append_rows_table_to_proto2) * [Customer_record_pb](#customer_record_pb) * [BigQuery Storage Quickstart](#bigquery-storage-quickstart) + * [Read_rows](#read_rows) * [Sample_data_pb](#sample_data_pb) ## Before you begin @@ -280,6 +281,23 @@ __Usage:__ +### Read_rows + +View the [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/read_rows.js). 
+ +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/read_rows.js,samples/README.md) + +__Usage:__ + + +`node samples/read_rows.js` + + +----- + + + + ### Sample_data_pb View the [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/sample_data_pb.js). From d266408d033a5a4e7eb4397ab180495b49fd997d Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 30 Jul 2024 13:08:21 -0400 Subject: [PATCH 21/26] fix: address PR comments --- samples/read_rows.js | 78 ------------------------------- src/reader/arrow_reader.ts | 10 ++-- src/reader/read_client.ts | 11 ----- src/reader/read_session.ts | 19 +++----- src/reader/read_stream.ts | 8 ++-- src/reader/table_reader.ts | 10 ++-- system-test/reader_client_test.ts | 5 +- 7 files changed, 23 insertions(+), 118 deletions(-) delete mode 100644 samples/read_rows.js diff --git a/samples/read_rows.js b/samples/read_rows.js deleted file mode 100644 index dd6a8a3c..00000000 --- a/samples/read_rows.js +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -'use strict'; - -function main( - sqlQuery = 'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null' -) { - // [START bigquerystorage_read_table] - const {reader} = require('@google-cloud/bigquery-storage'); - const {ReadClient} = reader; - const {BigQuery} = require('@google-cloud/bigquery'); - - async function readRows() { - const readClient = new ReadClient(); - const bigquery = new BigQuery(); - - try { - sqlQuery = - 'SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM `bigquery-public-data.samples.github_timeline` where repository_url is not null LIMIT 300000'; - - const [job] = await bigquery.createQueryJob({ - query: sqlQuery, - location: 'US', - }); - console.log('job info: ', job.id); - const [metadata] = await job.getMetadata(); - console.log('job metadata: ', metadata.configuration); - const qconfig = metadata.configuration.query; - const dstTableRef = qconfig.destinationTable; - const table = bigquery - .dataset(dstTableRef.datasetId, { - projectId: dstTableRef.projectId, - }) - .table(dstTableRef.tableId); - const [md] = await table.getMetadata({ - view: 'BASIC', - }); - - console.log('table', table.dataset.projectId, table.dataset.id, table.id); - const treader = await readClient.createTableReader({table: dstTableRef}); - const [rawRows] = await treader.getRows(); - const rows = BigQuery.mergeSchemaWithRows_(md.schema, rawRows, {}); - rows.forEach(row => { - const url = row['url']; - const owner = row['owner']; - const forks = row['forks']; - console.log(`url: ${url}, owner: ${owner}, ${forks} forks`); - }); - console.log('Query Results:', rows.length); - } catch (err) { - console.log(err); - } finally { - 
console.log('ended'); - readClient.close(); - } - } - // [END bigquerystorage_read_table] - readRows().then(() => { - console.log('Done'); - }); -} -process.on('unhandledRejection', err => { - console.error(err.message); - process.exitCode = 1; -}); -main(...process.argv.slice(2)); diff --git a/src/reader/arrow_reader.ts b/src/reader/arrow_reader.ts index 8638b193..01439769 100644 --- a/src/reader/arrow_reader.ts +++ b/src/reader/arrow_reader.ts @@ -36,7 +36,7 @@ type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; * @memberof reader */ export class ArrowTableReader { - private _table: TableReference; + private _tableRef: TableReference; private _session: ReadSession; /** @@ -46,16 +46,16 @@ export class ArrowTableReader { * @param {ReadClient} readClient - Storage Read Client. * @param {TableReference} table - target table to read data from. */ - constructor(readClient: ReadClient, table: TableReference) { - this._table = table; - this._session = new ReadSession(readClient, table, ArrowFormat); + constructor(readClient: ReadClient, tableRef: TableReference) { + this._tableRef = tableRef; + this._session = new ReadSession(readClient, tableRef, ArrowFormat); } // eslint-disable-next-line @typescript-eslint/no-explicit-any private trace(msg: string, ...otherArgs: any[]) { logger( 'arrow_table_reader', - `[table: ${this._table.tableId}]`, + `[table: ${this._tableRef.tableId}]`, msg, ...otherArgs ); diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts index d13e07b6..5b89ae12 100644 --- a/src/reader/read_client.ts +++ b/src/reader/read_client.ts @@ -56,7 +56,6 @@ export type TableReference = { */ export class ReadClient { private _client: BigQueryReadClient; - private _open: boolean; constructor(opts?: ClientOptions) { const baseOptions = { @@ -68,7 +67,6 @@ export class ReadClient { ...baseOptions, ...opts, }); - this._open = false; } /** @@ -84,7 +82,6 @@ export class ReadClient { */ initialize = async (): Promise => { await this._client.initialize(); - this._open = true; }; getClient = (): BigQueryReadClient => { @@ -95,13 +92,6 @@ export class ReadClient { this._client = client; }; - /** - * Check if client is open and ready to send requests. - */ - isOpen(): boolean { - return this._open; - } - /** * Creates a new read session. 
A read session divides the contents of a
   * BigQuery table into one or more streams, which can then be used to read
@@ -208,6 +198,5 @@ export class ReadClient {
 
   close() {
     this._client.close();
-    this._open = false;
   }
 }
diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts
index 5f10f44a..c08ce9b5 100644
--- a/src/reader/read_session.ts
+++ b/src/reader/read_session.ts
@@ -48,19 +48,19 @@ export type GetStreamOptions = {
  */
 export class ReadSession {
   private _info: ReadSessionInfo | null;
-  private _table: TableReference;
+  private _tableRef: TableReference;
   private _format: DataFormat;
   private _readStreams: ReadStream[];
   private _readClient: ReadClient;
 
   constructor(
     readClient: ReadClient,
-    table: TableReference,
+    tableRef: TableReference,
     format: DataFormat
   ) {
     this._info = null;
     this._format = format;
-    this._table = table;
+    this._tableRef = tableRef;
     this._readClient = readClient;
     this._readStreams = [];
   }
@@ -81,8 +81,8 @@ export class ReadSession {
       return this._info;
     }
     const session = await this._readClient.createReadSession({
-      parent: `projects/${this._table.projectId}`,
-      table: `projects/${this._table.projectId}/datasets/${this._table.datasetId}/tables/${this._table.tableId}`,
+      parent: `projects/${this._tableRef.projectId}`,
+      table: `projects/${this._tableRef.projectId}/datasets/${this._tableRef.datasetId}/tables/${this._tableRef.tableId}`,
       dataFormat: this._format,
       selectedFields: options?.selectedFields?.split(','),
     });
@@ -142,13 +142,8 @@ export class ReadSession {
 
 async function* mergeStreams(readables: Readable[]) {
   for (const readable of readables) {
-    try {
-      for await (const chunk of readable) {
-        yield chunk;
-      }
-    } catch (err) {
-      console.log('mergeStream err', err);
-      throw err;
+    for await (const chunk of readable) {
+      yield chunk;
     }
   }
 }
diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts
index f4810c3f..c94f7bd6 100644
--- a/src/reader/read_stream.ts
+++ b/src/reader/read_stream.ts
@@ -40,8 +40,8 @@ export class ReadStream {
   private _offset: number;
   private _readClient: ReadClient;
   private _session: ReadSession;
-  private _readStream?: Readable;
-  private _connection?: gax.CancellableStream | null;
+  private _readStream: Readable | null;
+  private _connection: gax.CancellableStream | null;
   private _callOptions?: gax.CallOptions;
 
   constructor(
@@ -54,6 +54,8 @@ export class ReadStream {
     this._session = session;
     this._offset = 0;
     this._readClient = readClient;
+    this._connection = null;
+    this._readStream = null;
     this._callOptions = options;
     this.open();
   }
@@ -182,7 +184,7 @@ export class ReadStream {
     }
     if (this._readStream) {
       this._readStream.destroy();
-      this._readStream = undefined;
+      this._readStream = null;
     }
   }
 }
diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts
index 4c08dc51..8ccef34a 100644
--- a/src/reader/table_reader.ts
+++ b/src/reader/table_reader.ts
@@ -63,7 +63,7 @@ type RowsResponse = [any[], ReadSessionInfo | null, TableDataList];
  */
 export class TableReader {
   private _arrowReader: ArrowTableReader;
-  private _table: TableReference;
+  private _tableRef: TableReference;
 
   /**
    * Creates a new Reader instance.
    *
    * @param {ReadClient} readClient - Storage Read Client.
    * @param {TableReference} tableRef - the target table
    * to read data from.
*/ - constructor(readClient: ReadClient, table: TableReference) { - this._table = table; - this._arrowReader = new ArrowTableReader(readClient, table); + constructor(readClient: ReadClient, tableRef: TableReference) { + this._tableRef = tableRef; + this._arrowReader = new ArrowTableReader(readClient, tableRef); } // eslint-disable-next-line @typescript-eslint/no-explicit-any private trace(msg: string, ...otherArgs: any[]) { logger( 'table_reader', - `[table: ${this._table.tableId}]`, + `[table: ${this._tableRef.tableId}]`, msg, ...otherArgs ); diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index cda92398..db39f2c0 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -443,11 +443,8 @@ describe('reader.ReaderClient', () => { assert.strictEqual(internalConn.destroyed, true); client.close(); - assert.strictEqual(client.isOpen(), false); } finally { - if (client.isOpen()) { - client.close(); - } + client.close(); } }); }); From 0d1af647a341d087413fc1f76a02b69b760b3643 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 30 Jul 2024 17:20:54 +0000 Subject: [PATCH 22/26] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 1 - samples/README.md | 18 ------------------ 2 files changed, 19 deletions(-) diff --git a/README.md b/README.md index 8ce0bcb1..f9f0725b 100644 --- a/README.md +++ b/README.md @@ -298,7 +298,6 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-bigquery-st | Append_rows_table_to_proto2 | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/append_rows_table_to_proto2.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/append_rows_table_to_proto2.js,samples/README.md) | | Customer_record_pb | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/customer_record_pb.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/customer_record_pb.js,samples/README.md) | | BigQuery Storage Quickstart | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/quickstart.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/quickstart.js,samples/README.md) | -| Read_rows | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/read_rows.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/read_rows.js,samples/README.md) | | Sample_data_pb | [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/sample_data_pb.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/sample_data_pb.js,samples/README.md) | diff --git a/samples/README.md b/samples/README.md index 
5affea33..7769d773 100644 --- a/samples/README.md +++ b/samples/README.md @@ -125,7 +125,6 @@ See sample code on the [Quickstart section](#quickstart). * [Append_rows_table_to_proto2](#append_rows_table_to_proto2) * [Customer_record_pb](#customer_record_pb) * [BigQuery Storage Quickstart](#bigquery-storage-quickstart) - * [Read_rows](#read_rows) * [Sample_data_pb](#sample_data_pb) ## Before you begin @@ -281,23 +280,6 @@ __Usage:__ -### Read_rows - -View the [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/read_rows.js). - -[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-bigquery-storage&page=editor&open_in_editor=samples/read_rows.js,samples/README.md) - -__Usage:__ - - -`node samples/read_rows.js` - - ------ - - - - ### Sample_data_pb View the [source code](https://github.com/googleapis/nodejs-bigquery-storage/blob/main/samples/sample_data_pb.js). From 88449ea60f3b8efedea76c980904032fce0033aa Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Thu, 15 Aug 2024 12:50:44 -0400 Subject: [PATCH 23/26] fix: remove failed precondition from retry predicate --- src/reader/read_stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index c94f7bd6..e7a5c3a6 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -123,7 +123,6 @@ export class ReadStream { gax.Status.ABORTED, gax.Status.CANCELLED, gax.Status.DEADLINE_EXCEEDED, - gax.Status.FAILED_PRECONDITION, gax.Status.INTERNAL, gax.Status.UNAVAILABLE, ]; From 898ae4bd5b19dcf1b7e4fb5c4bf4501bb563cddc Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Tue, 3 Sep 2024 17:23:34 -0400 Subject: [PATCH 24/26] fix: address pr comments and add bigger table test --- src/reader/read_stream.ts | 3 ++- system-test/reader_client_test.ts | 36 +++++++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts index e7a5c3a6..7e528e4e 100644 --- a/src/reader/read_stream.ts +++ b/src/reader/read_stream.ts @@ -112,7 +112,8 @@ export class ReadStream { this.reconnect(); return; } - this._readStream?.emit('error', err); + this._readStream?.destroy(err); + this._readStream = null; }; private isRetryableError(err?: gax.GoogleError | null): boolean { diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts index db39f2c0..82fd91a7 100644 --- a/system-test/reader_client_test.ts +++ b/system-test/reader_client_test.ts @@ -330,6 +330,40 @@ describe('reader.ReaderClient', () => { client.close(); } }); + + it('should allow to read a table with long running query', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const genTableId = generateUuid(); + await bigquery.query( + `CREATE TABLE ${projectId}.${datasetId}.${genTableId} AS SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num` + ); + const reader = await client.createTableReader({ + table: { + projectId, + datasetId, + tableId: genTableId, + }, + }); + + const [rows, session, response] = await reader.getRows(); + + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + assert.notEqual(response.totalRows, null); // estimated row count + assert.equal(response.rows?.length, 1000000); + + assert.equal(rows.length, 1000000); + + reader.close(); + } finally { + client.close(); + } + }).timeout(30 * 1000); }); 
describe('Error Scenarios', () => { @@ -405,8 +439,6 @@ describe('reader.ReaderClient', () => { conn.emit('close'); assert.equal(reconnectedCalled, true); - - reader.close(); } finally { client.close(); } From 872f0f30a0d1c37abba53db4cd53f232d6b9f266 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 6 Sep 2024 15:03:46 +0000 Subject: [PATCH 25/26] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- protos/protos.d.ts | 9 --------- protos/protos.js | 21 +++------------------ protos/protos.json | 3 +++ 3 files changed, 6 insertions(+), 27 deletions(-) diff --git a/protos/protos.d.ts b/protos/protos.d.ts index 6a9bd20d..9592df04 100644 --- a/protos/protos.d.ts +++ b/protos/protos.d.ts @@ -1649,9 +1649,6 @@ export namespace google { /** ReadRowsResponse schema. */ public schema?: ("avroSchema"|"arrowSchema"); - /** ReadRowsResponse _uncompressedByteSize. */ - public _uncompressedByteSize?: "uncompressedByteSize"; - /** * Creates a new ReadRowsResponse instance using the specified properties. * @param [properties] Properties to set @@ -3788,12 +3785,6 @@ export namespace google { /** TableReadOptions outputFormatSerializationOptions. */ public outputFormatSerializationOptions?: ("arrowSerializationOptions"|"avroSerializationOptions"); - /** TableReadOptions _samplePercentage. */ - public _samplePercentage?: "samplePercentage"; - - /** TableReadOptions _responseCompressionCodec. */ - public _responseCompressionCodec?: "responseCompressionCodec"; - /** * Creates a new TableReadOptions instance using the specified properties. * @param [properties] Properties to set diff --git a/protos/protos.js b/protos/protos.js index f27ee65a..360d687a 100644 --- a/protos/protos.js +++ b/protos/protos.js @@ -3510,12 +3510,7 @@ set: $util.oneOfSetter($oneOfFields) }); - /** - * ReadRowsResponse _uncompressedByteSize. - * @member {"uncompressedByteSize"|undefined} _uncompressedByteSize - * @memberof google.cloud.bigquery.storage.v1.ReadRowsResponse - * @instance - */ + // Virtual OneOf for proto3 optional field Object.defineProperty(ReadRowsResponse.prototype, "_uncompressedByteSize", { get: $util.oneOfGetter($oneOfFields = ["uncompressedByteSize"]), set: $util.oneOfSetter($oneOfFields) @@ -9045,23 +9040,13 @@ set: $util.oneOfSetter($oneOfFields) }); - /** - * TableReadOptions _samplePercentage. - * @member {"samplePercentage"|undefined} _samplePercentage - * @memberof google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions - * @instance - */ + // Virtual OneOf for proto3 optional field Object.defineProperty(TableReadOptions.prototype, "_samplePercentage", { get: $util.oneOfGetter($oneOfFields = ["samplePercentage"]), set: $util.oneOfSetter($oneOfFields) }); - /** - * TableReadOptions _responseCompressionCodec. 
- * @member {"responseCompressionCodec"|undefined} _responseCompressionCodec - * @memberof google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions - * @instance - */ + // Virtual OneOf for proto3 optional field Object.defineProperty(TableReadOptions.prototype, "_responseCompressionCodec", { get: $util.oneOfGetter($oneOfFields = ["responseCompressionCodec"]), set: $util.oneOfSetter($oneOfFields) diff --git a/protos/protos.json b/protos/protos.json index 63e97dd1..e5e3e029 100644 --- a/protos/protos.json +++ b/protos/protos.json @@ -1,4 +1,7 @@ { + "options": { + "syntax": "proto3" + }, "nested": { "google": { "nested": { From 94886cc4ad10b0dad24aac73b6cb72dfe2df4df5 Mon Sep 17 00:00:00 2001 From: Alvaro Viebrantz Date: Fri, 20 Sep 2024 15:48:25 -0400 Subject: [PATCH 26/26] build: update types/node to fix build --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index cb5efd94..6b68129f 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,7 @@ "@google-cloud/bigquery": "^7.5.2", "@types/extend": "^3.0.4", "@types/mocha": "^9.0.0", - "@types/node": "^20.0.0", + "@types/node": "^20.16.5", "@types/sinon": "^17.0.0", "@types/uuid": "^9.0.1", "c8": "^9.0.0",