feat: add wrapper for reading table data using Storage API #431

Merged

Commits (33)
485b33d  feat: add wrapper for reading table data using Storage API (alvarowolfx, Mar 25, 2024)
39d3370  feat: parse arrow record batches and convert to TableRow (alvarowolfx, Mar 28, 2024)
1ef1c9b  fix: set arrow to v14 (alvarowolfx, Apr 2, 2024)
4bf1511  feat: add bigquery as dep (alvarowolfx, Apr 3, 2024)
45a4afa  feat: remove dep on @google-cloud/bigquery (alvarowolfx, Apr 18, 2024)
aa57c03  fix: Stream.toArray not available on node < 17 (alvarowolfx, Apr 18, 2024)
72afe00  fix: add paginator dep (alvarowolfx, Apr 18, 2024)
7a5847a  fix: lint issues (alvarowolfx, Apr 18, 2024)
6b98711  feat: move to stream transform instead of implementing Readable (alvarowolfx, May 29, 2024)
77fff01  Merge branch 'main' into feat-storage-read-veneer (alvarowolfx, May 29, 2024)
98546f3  feat: modular arrow streams and transforms (alvarowolfx, Jun 14, 2024)
bd67c85  docs: update doc strings (alvarowolfx, Jul 17, 2024)
aaeb3bd  fix: lint issues (alvarowolfx, Jul 17, 2024)
9fa976a  Merge branch 'main' into feat-storage-read-veneer (alvarowolfx, Jul 17, 2024)
7382c34  fix: read rows sample (alvarowolfx, Jul 17, 2024)
63805b3  test: arrow transforms (alvarowolfx, Jul 17, 2024)
182d323  test: add reader package tests (alvarowolfx, Jul 18, 2024)
ac0a018  fix: rollback arrow to v14 (alvarowolfx, Jul 18, 2024)
26d5b95  fix: add node 14 pollyfil for array.at (alvarowolfx, Jul 23, 2024)
456f2d4  fix: properly close connection (alvarowolfx, Jul 24, 2024)
a02c634  fix: lint issue (alvarowolfx, Jul 24, 2024)
4b41b3e  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Jul 24, 2024)
d266408  fix: address PR comments (alvarowolfx, Jul 30, 2024)
0d1af64  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Jul 30, 2024)
88449ea  fix: remove failed precondition from retry predicate (alvarowolfx, Aug 15, 2024)
f811ead  Merge branch 'main' into feat-storage-read-veneer (alvarowolfx, Aug 15, 2024)
898ae4b  fix: address pr comments and add bigger table test (alvarowolfx, Sep 3, 2024)
6a86580  Merge branch 'main' into feat-storage-read-veneer (alvarowolfx, Sep 6, 2024)
872f0f3  🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 6, 2024)
052713b  Merge branch 'main' into feat-storage-read-veneer (alvarowolfx, Sep 20, 2024)
94886cc  build: update types/node to fix build (alvarowolfx, Sep 20, 2024)
6b9f68c  Merge branch 'main' into feat-storage-read-veneer (gcf-merge-on-green[bot], Sep 20, 2024)
67baeca  Merge branch 'main' into feat-storage-read-veneer (alvarowolfx, Sep 23, 2024)
11 changes: 7 additions & 4 deletions package.json
@@ -27,15 +27,18 @@
     "precompile": "gts clean"
   },
   "dependencies": {
+    "@google-cloud/paginator": "^5.0.0",
+    "apache-arrow": "^14.0.2",
+    "core-js": "^3.37.1",
     "extend": "^3.0.2",
-    "google-gax": "^4.3.1",
-    "google-auth-library": "^9.6.3"
+    "google-auth-library": "^9.6.3",
+    "google-gax": "^4.3.1"
   },
   "peerDependencies": {
     "protobufjs": "^7.2.4"
   },
   "devDependencies": {
-    "@google-cloud/bigquery": "^7.0.0",
+    "@google-cloud/bigquery": "^7.5.2",
     "@types/extend": "^3.0.4",
     "@types/mocha": "^9.0.0",
     "@types/node": "^20.0.0",
@@ -53,7 +56,7 @@
     "pack-n-play": "^2.0.0",
     "sinon": "^18.0.0",
     "ts-loader": "^9.0.0",
-    "typescript": "^5.1.6",
+    "typescript": "^5.5.3",
     "uuid": "^9.0.0",
     "webpack": "^5.0.0",
     "webpack-cli": "^5.0.0"
7 changes: 7 additions & 0 deletions src/index.ts
@@ -19,6 +19,7 @@
 import * as v1 from './v1';
 import * as v1beta1 from './v1beta1';
 import * as managedwriter from './managedwriter';
+import * as reader from './reader';
 const BigQueryReadClient = v1.BigQueryReadClient;
 type BigQueryReadClient = v1.BigQueryReadClient;
 const BigQueryWriteClient = v1.BigQueryWriteClient;
@@ -27,6 +28,8 @@ const BigQueryStorageClient = v1beta1.BigQueryStorageClient;
 type BigQueryStorageClient = v1beta1.BigQueryStorageClient;
 const WriterClient = managedwriter.WriterClient;
 type WriterClient = managedwriter.WriterClient;
+const ReadClient = reader.ReadClient;
+type ReadClient = reader.ReadClient;
 export {
   v1,
   BigQueryReadClient,
@@ -35,6 +38,8 @@ export {
   BigQueryWriteClient,
   managedwriter,
   WriterClient,
+  reader,
+  ReadClient,
 };
 // For compatibility with JavaScript libraries we need to provide this default export:
 // tslint:disable-next-line no-default-export
@@ -44,6 +49,8 @@ export default {
   BigQueryWriteClient,
   managedwriter,
   WriterClient,
+  reader,
+  ReadClient,
 };
 import * as protos from '../protos/protos';
 export {protos};
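A quick smoke test of the new surface, assuming the package is published as @google-cloud/bigquery-storage and a zero-argument ReadClient constructor (neither appears in this diff):

import {ReadClient, reader} from '@google-cloud/bigquery-storage';

// Per the exports above, the named class and the namespace member are the same constructor.
const fromNamed: ReadClient = new ReadClient();
const fromNamespace: reader.ReadClient = new reader.ReadClient();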
101 changes: 101 additions & 0 deletions src/reader/arrow_reader.ts
@@ -0,0 +1,101 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {ResourceStream} from '@google-cloud/paginator';
import {RecordBatch} from 'apache-arrow';

import * as protos from '../../protos/protos';
import {TableReference, ReadClient} from './read_client';
import {logger} from '../util/logger';
import {
ArrowRawTransform,
ArrowRecordBatchTransform,
ArrowRecordReaderTransform,
} from './arrow_transform';
import {ReadSession, GetStreamOptions} from './read_session';
import {ArrowFormat} from './data_format';

type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;

/**
* A BigQuery Storage API Reader that can be used to read data
* from BigQuery Tables using the Storage API in Arrow format.
*
* @class
* @memberof reader
*/
export class ArrowTableReader {
private _tableRef: TableReference;
private _session: ReadSession;

/**
* Creates a new ArrowTableReader instance. Usually created via
* ReadClient.createArrowTableReader().
*
* @param {ReadClient} readClient - Storage Read Client.
* @param {TableReference} tableRef - target table to read data from.
*/
constructor(readClient: ReadClient, tableRef: TableReference) {
this._tableRef = tableRef;
this._session = new ReadSession(readClient, tableRef, ArrowFormat);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Contributor: same comment as others about the any
private trace(msg: string, ...otherArgs: any[]) {
logger(
'arrow_table_reader',
`[table: ${this._tableRef.tableId}]`,
msg,
...otherArgs
);
}

getSessionInfo(): ReadSessionInfo | undefined | null {
return this._session.getSessionInfo();
}

/**
* Get a byte stream of serialized Arrow Record Batches.
*
* @param {GetStreamOptions} options
*/
async getStream(
options?: GetStreamOptions
): Promise<ResourceStream<Uint8Array>> {
this.trace('getStream', options);
const stream = await this._session.getStream(options);
return stream.pipe(new ArrowRawTransform()) as ResourceStream<Uint8Array>;
}

/**
* Get a stream of Arrow RecordBatch objects.
*
* @param {GetStreamOptions} options
*/
async getRecordBatchStream(
options?: GetStreamOptions
): Promise<ResourceStream<RecordBatch>> {
this.trace('getRecordBatchStream', options);
const stream = await this._session.getStream(options);
const info = this._session.getSessionInfo();
return stream
.pipe(new ArrowRawTransform())
.pipe(new ArrowRecordReaderTransform(info!))
.pipe(new ArrowRecordBatchTransform()) as ResourceStream<RecordBatch>;
Comment on lines +92 to +95

Contributor: With this many streams piped, it is probably better to use pipeline with a common error handler for them - otherwise you run the risk of errors getting swallowed in between the various pipe calls.

Contributor Author: I'll check how to use the pipeline method, that's new to me.

Contributor Author: Tried to use pipeline, but it is meant to be used when we have a destination. In this case, we are just applying a bunch of transforms and we don't know the destination beforehand.

The error that I got:

  TypeError [ERR_INVALID_ARG_TYPE]: The "streams[stream.length - 1]" property must be of type function. Received an instance of ArrowRecordBatchTransform

Contributor: I am envisioning something like

const outputStream = pipeline(
  [stream, new ArrowRawTransform(), new ArrowRecordReaderTransform(info!), new ArrowRecordBatchTransform()],
  err => { if (err) { throw err; } }
) as ResourceStream<RecordBatch>;

or

const tempStream1 = new ArrowRawTransform();
const tempStream2 = new ArrowRecordReaderTransform(info!);
const tempStream3 = new ArrowRecordBatchTransform();
const outputStream = pipeline(
  [stream, tempStream1, tempStream2, tempStream3],
  err => { if (err) { throw err; } }
) as ResourceStream<RecordBatch>;

then you'd return that outputStream - are you thinking of something different? I won't die on this hill, but I really got burned by pipes recently and want to save you from that experience
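The merged code above keeps the pipe chain. For comparison, a minimal sketch of the variadic pipeline form the thread is circling around, written as if it replaced the body of getRecordBatchStream; the error forwarding via destroy is an illustration of one option, not something this PR does:

import {pipeline} from 'stream';

// Inside getRecordBatchStream, after `stream` and `info` are obtained:
const last = new ArrowRecordBatchTransform();
pipeline(
  stream,
  new ArrowRawTransform(),
  new ArrowRecordReaderTransform(info!),
  last, // a Transform is a valid final (Writable) stream for pipeline
  err => {
    if (err) {
      last.destroy(err); // surface the error to whoever is reading `last`
    }
  }
);
return last as ResourceStream<RecordBatch>;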

}

close() {
this._session.close();
}
}
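Pulling the pieces together, a minimal usage sketch. createArrowTableReader is named in the class doc above, but its exact signature, the zero-argument ReadClient constructor, the package name, and the projectId/datasetId fields on TableReference are assumptions here (only tableId appears in this file):

import {ReadClient} from '@google-cloud/bigquery-storage'; // package name assumed

async function readTable() {
  const readClient = new ReadClient();
  const reader = await readClient.createArrowTableReader({
    projectId: 'my-project',
    datasetId: 'my_dataset',
    tableId: 'my_table',
  });
  const batches = await reader.getRecordBatchStream();
  for await (const batch of batches) {
    console.log(`received batch with ${batch.numRows} rows`);
  }
  reader.close();
}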
176 changes: 176 additions & 0 deletions src/reader/arrow_transform.ts
@@ -0,0 +1,176 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {Transform, TransformCallback} from 'stream';
import {
RecordBatchReader,
RecordBatch,
RecordBatchStreamReader,
Vector,
} from 'apache-arrow';
import * as protos from '../../protos/protos';

type ReadRowsResponse =
protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;

interface TableCell {
  v?: any;
}
interface TableRow {
  f?: Array<TableCell>;
}

/**
 * ArrowRawTransform implements a node stream Transform that reads
 * ReadRowsResponse messages from the BigQuery Storage Read API and
 * converts them into raw Arrow Record Batch bytes.
 */
export class ArrowRawTransform extends Transform {
constructor() {
super({
readableObjectMode: false,
writableObjectMode: true,
});
}

_transform(
response: ReadRowsResponse,
_: BufferEncoding,
callback: TransformCallback
): void {
if (
!(
response.arrowRecordBatch &&
response.arrowRecordBatch.serializedRecordBatch
)
) {
callback(null);
return;
}
callback(null, response.arrowRecordBatch?.serializedRecordBatch);
}
}

/**
 * ArrowRecordReaderTransform implements a node stream Transform that reads
 * a byte stream of raw Arrow Record Batches and converts it into a stream
 * of Arrow RecordBatchStreamReader objects.
 */
export class ArrowRecordReaderTransform extends Transform {
private session: ReadSession;

constructor(session: ReadSession) {
super({
objectMode: true,
});
this.session = session;
}

_transform(
serializedRecordBatch: Uint8Array,
_: BufferEncoding,
callback: TransformCallback
): void {
const buf = Buffer.concat([
this.session.arrowSchema?.serializedSchema as Uint8Array,
serializedRecordBatch,
]);
const reader = RecordBatchReader.from(buf);
callback(null, reader);
}
}

/**
 * ArrowRecordBatchTransform implements a node stream Transform that reads
 * a RecordBatchStreamReader and converts it into a stream of Arrow
 * RecordBatch objects.
 */
export class ArrowRecordBatchTransform extends Transform {
constructor() {
super({
objectMode: true,
});
}

_transform(
reader: RecordBatchStreamReader,
_: BufferEncoding,
callback: TransformCallback
): void {
const batches = reader.readAll();
for (const row of batches) {
this.push(row);
}
callback(null);
}
}

/**
 * ArrowRecordBatchTableRowTransform implements a node stream Transform that
 * reads Arrow RecordBatch objects and converts them into a stream of
 * BigQuery TableRow objects.
 */
export class ArrowRecordBatchTableRowTransform extends Transform {
constructor() {
super({
objectMode: true,
});
}

_transform(
batch: RecordBatch,
_: BufferEncoding,
callback: TransformCallback
): void {
const rows = new Array(batch.numRows);
for (let i = 0; i < batch.numRows; i++) {
rows[i] = {
f: new Array(batch.numCols),
};
}
for (let j = 0; j < batch.numCols; j++) {
const column = batch.selectAt([j]);
const columnName = column.schema.fields[0].name;
for (let i = 0; i < batch.numRows; i++) {
const fieldData = column.get(i);
const fieldValue = fieldData?.toJSON()[columnName];
rows[i].f[j] = {
v: convertArrowValue(fieldValue),
};
}
}
for (let i = 0; i < batch.numRows; i++) {
this.push(rows[i]);
}
callback(null);
}
}

function convertArrowValue(fieldValue: any): any {
// Guard against null: typeof null === 'object', but Object.keys(null) throws.
if (fieldValue && typeof fieldValue === 'object') {
if (fieldValue instanceof Vector) {
const arr = fieldValue.toJSON();
return arr.map((v: any) => {
  return {v: convertArrowValue(v)};
});
}
const tableRow: TableRow = {f: []};
Object.keys(fieldValue).forEach(key => {
tableRow.f?.push({
v: convertArrowValue(fieldValue[key]),
});
});
return tableRow;
}
return fieldValue;
}
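To make the emitted shape concrete, a small sketch with a hypothetical two-column schema (a scalar name column, then a repeated tags column); the nesting mirrors the BigQuery tabledata wire format built above:

// Hypothetical: an Arrow row {name: 'alice', tags: ['a', 'b']} is pushed as:
const example: TableRow = {
  f: [
    {v: 'alice'}, // scalars pass through convertArrowValue unchanged
    {v: [{v: 'a'}, {v: 'b'}]}, // an Arrow Vector becomes an array of {v} cells
  ],
};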
33 changes: 33 additions & 0 deletions src/reader/data_format.ts
@@ -0,0 +1,33 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as protos from '../../protos/protos';

export type DataFormat =
protos.google.cloud.bigquery.storage.v1.IReadSession['dataFormat'];
const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat;

/**
* Return data in Apache Arrow format.
*
* @memberof reader
*/
export const ArrowFormat: DataFormat = 'ARROW';

/**
* Return data in Apache Avro format.
*
* @memberof reader
*/
export const AvroFormat: DataFormat = 'AVRO';
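As a usage note, these constants select the wire format when a read session is created. A sketch mirroring the ReadSession construction in arrow_reader.ts, with hypothetical table coordinates (the projectId/datasetId field names and the zero-argument ReadClient constructor are assumptions):

import {ReadClient} from './read_client';
import {ReadSession} from './read_session';
import {AvroFormat} from './data_format';

// Same wiring as ArrowTableReader, but requesting Avro-encoded rows.
const readClient = new ReadClient();
const session = new ReadSession(
  readClient,
  {projectId: 'my-project', datasetId: 'my_dataset', tableId: 'my_table'},
  AvroFormat
);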