feat: support BQ Storage Write CDC (#473)
alvarowolfx authored Aug 21, 2024
1 parent 02aef98 commit 8380ca8
Showing 5 changed files with 263 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/adapt/index.ts
@@ -18,3 +18,5 @@ export {
} from './proto';

export {convertBigQuerySchemaToStorageTableSchema} from './schema';

export {withChangeType, withChangeSequenceNumber} from './options';
45 changes: 45 additions & 0 deletions src/adapt/options.ts
@@ -0,0 +1,45 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

export type AdaptOptions = {
addChangeSequenceNumber: boolean;
addChangeType: boolean;
};

export type AdaptOption = (opts: AdaptOptions) => AdaptOptions;

/**
* Add pseudocolumn `_CHANGE_TYPE` for BigQuery Change Data Capture.
* Defines the type of change to be processed for each row.
* The pseudocolumn `_CHANGE_TYPE` only accepts the values UPSERT and DELETE.
* See more: https://cloud.google.com/bigquery/docs/change-data-capture#specify_changes_to_existing_records
*/
export function withChangeType(): AdaptOption {
return (opts: AdaptOptions) => ({
...opts,
addChangeType: true,
});
}

/**
* Add pseudocolumn `_CHANGE_SEQUENCE_NUMBER` for BigQuery Change Data Capture.
* Used to change the ordering of records that share the same primary key.
* See more: https://cloud.google.com/bigquery/docs/change-data-capture#manage_custom_ordering
*/
export function withChangeSequenceNumber(): AdaptOption {
return (opts: AdaptOptions) => ({
...opts,
addChangeSequenceNumber: true,
});
}
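Taken together, these helpers implement a functional-options pattern: each call returns an `AdaptOption` closure that flips one flag on an `AdaptOptions` value, and the descriptor converters fold the closures over a default all-false config. A minimal usage sketch, assuming the package-root `adapt` export from `@google-cloud/bigquery-storage`:

import {adapt} from '@google-cloud/bigquery-storage';

const storageSchema = adapt.convertBigQuerySchemaToStorageTableSchema({
  fields: [
    {name: 'id', type: 'INTEGER', mode: 'REQUIRED'},
    {name: 'username', type: 'STRING', mode: 'REQUIRED'},
  ],
});

// Options compose left to right; both pseudocolumns can be requested at once.
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
  storageSchema,
  'root',
  adapt.withChangeType(),
  adapt.withChangeSequenceNumber()
);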
57 changes: 52 additions & 5 deletions src/adapt/proto.ts
@@ -15,6 +15,7 @@
import * as protos from '../../protos/protos';
import {bqTypeToFieldTypeMap, convertModeToLabel} from './proto_mappings';
import {normalizeFieldType} from './schema_mappings';
import {AdaptOptions, AdaptOption} from './options';

type TableSchema = protos.google.cloud.bigquery.storage.v1.ITableSchema;
type TableFieldSchema =
@@ -59,12 +60,14 @@ const packedTypes: FieldDescriptorProtoType[] = [
*/
export function convertStorageSchemaToProto2Descriptor(
schema: TableSchema,
scope: string,
...opts: AdaptOption[]
): DescriptorProto {
const fds = convertStorageSchemaToFileDescriptorInternal(
schema,
scope,
false,
...opts
);
return normalizeDescriptorSet(fds);
}
@@ -76,17 +79,32 @@ export function convertStorageSchemaToProto2Descriptor(
*/
export function convertStorageSchemaToProto3Descriptor(
schema: TableSchema,
scope: string,
...opts: AdaptOption[]
): DescriptorProto {
const fds = convertStorageSchemaToFileDescriptorInternal(
schema,
scope,
true,
...opts
);
return normalizeDescriptorSet(fds);
}

function convertStorageSchemaToFileDescriptorInternal(
schema: TableSchema,
scope: string,
useProto3: boolean,
...opts: AdaptOption[]
): FileDescriptorSet {
let adaptOpts: AdaptOptions = {
addChangeSequenceNumber: false,
addChangeType: false,
};
opts.forEach(f => {
adaptOpts = f(adaptOpts);
});

let fNumber = 0;
const fields: FieldDescriptorProto[] = [];
const deps = new Map<string, FileDescriptorProto>();
@@ -150,6 +168,35 @@ function convertStorageSchemaToFileDescriptorInternal(
}
}

if (adaptOpts) {
if (adaptOpts.addChangeSequenceNumber) {
const fdp = convertTableFieldSchemaToFieldDescriptorProto(
{
name: '_CHANGE_SEQUENCE_NUMBER',
type: 'STRING',
mode: 'REQUIRED',
},
991,
scope,
useProto3
);
fields.push(fdp);
}
if (adaptOpts.addChangeType) {
const fdp = convertTableFieldSchemaToFieldDescriptorProto(
{
name: '_CHANGE_TYPE',
type: 'STRING',
mode: 'REQUIRED',
},
992,
scope,
useProto3
);
fields.push(fdp);
}
}

const dp = new DescriptorProto({
name: scope,
field: fields,
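With either option set, the converter appends the pseudocolumn as a REQUIRED STRING field at a fixed field number (991 for `_CHANGE_SEQUENCE_NUMBER`, 992 for `_CHANGE_TYPE`), well above the sequentially numbered user fields. A quick sketch of inspecting the result (reusing `storageSchema` from the sketch above):

const descriptor = adapt.convertStorageSchemaToProto2Descriptor(
  storageSchema,
  'root',
  adapt.withChangeType()
);

// The pseudocolumn rides along as an ordinary field of the generated message.
const cdcField = descriptor.field?.find(f => f.name === '_CHANGE_TYPE');
console.log(cdcField?.number); // 992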
125 changes: 125 additions & 0 deletions system-test/managed_writer_client_test.ts
@@ -795,6 +795,131 @@ describe('managedwriter.WriterClient', () => {
client.close();
}
}).timeout(30 * 1000);

it('Change data capture (CDC)', async () => {
bqWriteClient.initialize();
const client = new WriterClient();
client.setClient(bqWriteClient);

const schema: TableSchema = {
fields: [
{
name: 'id',
type: 'INTEGER',
mode: 'REQUIRED',
},
{
name: 'username',
type: 'STRING',
mode: 'REQUIRED',
},
],
};
const [table] = await bigquery
.dataset(datasetId)
.createTable(tableId + '_cdc', {
schema,
clustering: {
fields: ['id'],
},
tableConstraints: {
primaryKey: {
columns: ['id'],
},
},
});
const parent = `projects/${projectId}/datasets/${datasetId}/tables/${table.id}`;

const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor: DescriptorProto =
adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'root',
adapt.withChangeType()
);

const row1 = {
id: 1,
username: 'Alice',
_CHANGE_TYPE: 'INSERT',
};

const row2 = {
id: 2,
username: 'Bob',
_CHANGE_TYPE: 'INSERT',
};

try {
const insertConn = await client.createStreamConnection({
streamId: managedwriter.DefaultStream,
destinationTable: parent,
});

const writer = new JSONWriter({
connection: insertConn,
protoDescriptor,
});

let pw = writer.appendRows([row1, row2]);
await pw.getResult();

let [rows] = await bigquery.query(
`SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by id`
);
assert.strictEqual(rows.length, 2);

const updaterConn = await client.createStreamConnection({
streamId: managedwriter.DefaultStream,
destinationTable: parent,
});

const updater = new JSONWriter({
connection: updaterConn,
protoDescriptor,
});

// Change Alice and send Charles
row1.username = 'Alice in Wonderlands';
row1._CHANGE_TYPE = 'UPSERT';

const row3 = {
id: 3,
username: 'Charles',
_CHANGE_TYPE: 'UPSERT',
};

pw = updater.appendRows([row1, row3]);
await pw.getResult();

[rows] = await bigquery.query(
`SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by id`
);
assert.strictEqual(rows.length, 3);

// Remove Bob
row2._CHANGE_TYPE = 'DELETE';

pw = updater.appendRows([row2]);
await pw.getResult();

[rows] = await bigquery.query(
`SELECT * FROM \`${projectId}.${datasetId}.${table.id}\` order by id`
);
assert.strictEqual(rows.length, 2);

assert.deepStrictEqual(rows, [
{id: 1, username: 'Alice in Wonderlands'},
{id: 3, username: 'Charles'},
]);

writer.close();
updater.close();
} finally {
client.close();
}
});
});

it('should fill default values when MissingValuesInterpretation is set', async () => {
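The system test above doubles as the reference flow: create a table with a primary key, build a descriptor with `withChangeType()`, and tag every JSON row with `_CHANGE_TYPE`. Condensed into a standalone sketch (project, dataset, and table names are placeholders; the destination table must already declare a primary key):

import {adapt, managedwriter} from '@google-cloud/bigquery-storage';
const {WriterClient, JSONWriter} = managedwriter;

async function upsertWithCdc(): Promise<void> {
  const storageSchema = adapt.convertBigQuerySchemaToStorageTableSchema({
    fields: [
      {name: 'id', type: 'INTEGER', mode: 'REQUIRED'},
      {name: 'username', type: 'STRING', mode: 'REQUIRED'},
    ],
  });
  const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
    storageSchema,
    'root',
    adapt.withChangeType()
  );

  const client = new WriterClient({projectId: 'my-project'});
  try {
    const connection = await client.createStreamConnection({
      streamId: managedwriter.DefaultStream,
      destinationTable:
        'projects/my-project/datasets/my_dataset/tables/my_table',
    });
    const writer = new JSONWriter({connection, protoDescriptor});

    // UPSERT inserts the row, or replaces the row with the same primary key.
    const pw = writer.appendRows([
      {id: 1, username: 'Alice', _CHANGE_TYPE: 'UPSERT'},
    ]);
    await pw.getResult();
    writer.close();
  } finally {
    client.close();
  }
}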
39 changes: 39 additions & 0 deletions test/adapt/proto.ts
@@ -75,6 +75,45 @@ describe('Adapt Protos', () => {
assert.deepEqual(raw, decoded);
});

it('basic with CDC fields', () => {
const schema = {
fields: [
{
name: 'id',
type: 'INTEGER',
mode: 'NULLABLE',
},
{
name: 'username',
type: 'STRING',
mode: 'REQUIRED',
},
],
};
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
'Test',
adapt.withChangeType(),
adapt.withChangeSequenceNumber()
);
assert.notEqual(protoDescriptor, null);
if (!protoDescriptor) {
throw Error('null proto descriptor set');
}
const TestProto = Type.fromDescriptor(protoDescriptor);
const raw = {
id: 1,
username: 'Alice',
_CHANGE_TYPE: 'INSERT',
_CHANGE_SEQUENCE_NUMBER: 'FF',
};
const serialized = TestProto.encode(raw).finish();
const decoded = TestProto.decode(serialized).toJSON();
assert.deepEqual(raw, decoded);
});

it('nested struct', () => {
const schema = {
fields: [
