diff --git a/.changeset/wicked-ligers-hope.md b/.changeset/wicked-ligers-hope.md new file mode 100644 index 00000000..b59c42ed --- /dev/null +++ b/.changeset/wicked-ligers-hope.md @@ -0,0 +1,5 @@ +--- +"@sebspark/pubsub": minor +--- + +Fixes bug that did not check if a schema exists in the correct way. diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json index 1d34fa08..c5fe2dde 100644 --- a/packages/pubsub/package.json +++ b/packages/pubsub/package.json @@ -24,9 +24,7 @@ "@types/html-escaper": "3.0.2", "avsc": "^5.7.7", "express": "4.21.0", - "html-escaper": "3.0.3", - "ts-pattern": "^5.5.0", - "zod": "^3.23.8" + "html-escaper": "3.0.3" }, "keywords": [ "google apis", diff --git a/packages/pubsub/src/lib/publisher.spec.ts b/packages/pubsub/src/lib/publisher.spec.ts index 5935440f..ab67113b 100644 --- a/packages/pubsub/src/lib/publisher.spec.ts +++ b/packages/pubsub/src/lib/publisher.spec.ts @@ -6,20 +6,35 @@ import { } from '@google-cloud/pubsub' import { Type } from 'avsc' import { type Mock, type MockedObject, describe, expect, it, vi } from 'vitest' -import { z } from 'zod' import { createPublisher } from './publisher' -import { zodToAvro } from './zod-to-avro' -const exampleSchema = z.object({ - messageType: z.string(), - created: z.string(), - data: z.string().optional(), -}) +type ExampleMessage = { + messageType: string + message: string +} -type ExampleMessage = z.infer -const exampleAvroSchema = zodToAvro('Example', exampleSchema, { - namespace: 'com.acme.example', -}) +const exampleAvroSchema = ` +{ + "type": "record", + "name": "ExampleMessage", + "namespace": "com.example", + "fields": [ + { + "name": "messageType", + "type": "string" + }, + { + "name": "message", + "type": "string" + } + ] +} +` + +const message = { + messageType: 'type of message', + message: 'message data', +} satisfies ExampleMessage type ExamplePubsubChannels = { example: ExampleMessage @@ -74,14 +89,6 @@ vi.mock('@google-cloud/pubsub', () => { let subscriberFn: (args: { ack: Mock; nack: Mock; data: string }) => void const description = 'This is an example message' -const zodValue = z.object( - { - messageType: z.string(), - message: z.number(), - }, - { description } -) -const avroSchema = zodToAvro('ExampleMessage', zodValue) describe('when creating a new publisher client with no schema and publish a message', () => { it('should call the underlaying api with the correct values and message format', async () => { @@ -96,7 +103,6 @@ describe('when creating a new publisher client with no schema and publish a mess projectId: 'test', }) - const message = { messageType: 'TYPE', created: new Date().toISOString() } await client.topic('example').publish(message) expect(pubSubMock.topic).toBeCalledWith('example') @@ -119,11 +125,10 @@ describe('when creating a new publisher client with schema that does not exist a projectId: 'test', }) - const message = { messageType: 'TYPE', created: new Date().toISOString() } await client .topic('example', { schemaId: 'schemaId', - avroDefinition: JSON.stringify(exampleAvroSchema), + avroDefinition: exampleAvroSchema, }) .publish(message) const schemaType = Type.forSchema(exampleAvroSchema) @@ -152,7 +157,6 @@ describe('when creating a new publisher client with schema that does exist and p projectId: 'test', }) - const message = { messageType: 'TYPE', created: new Date().toISOString() } await client .topic('example', { schemaId: 'schemaId-does-not-exist', diff --git a/packages/pubsub/src/lib/publisher.ts b/packages/pubsub/src/lib/publisher.ts index 45f09289..7d666f6a 100644 --- a/packages/pubsub/src/lib/publisher.ts +++ b/packages/pubsub/src/lib/publisher.ts @@ -21,7 +21,7 @@ const syncTopicSchema = async (client: PubSub, cloudSchema: CloudSchema) => { ) } - const schema = await client.schema(cloudSchema.schemaId) + const schema = client.schema(cloudSchema.schemaId) const exits = await schemaExists(client, cloudSchema.schemaId) if (exits) { @@ -123,7 +123,7 @@ export const createPublisher = >( const schemaExists = async (client: PubSub, schemaId: string) => { for await (const s of client.listSchemas()) { - if (s.name === schemaId) { + if (s.name?.endsWith(`/${schemaId}`)) { return true } } diff --git a/packages/pubsub/src/lib/zod-to-avro.ts b/packages/pubsub/src/lib/zod-to-avro.ts deleted file mode 100644 index 90cbbb7c..00000000 --- a/packages/pubsub/src/lib/zod-to-avro.ts +++ /dev/null @@ -1,131 +0,0 @@ -import type { schema } from 'avsc' -import { P, match } from 'ts-pattern' -import { - ZodArray, - ZodBigInt, - ZodBoolean, - ZodDate, - ZodEnum, - ZodNullable, - ZodNumber, - ZodObject, - ZodOptional, - type ZodRawShape, - ZodString, - type ZodTypeAny, - ZodUnion, -} from 'zod' - -export const zodToAvro = ( - name: string, - zodType: ZodTypeAny, - options?: { namespace: string }, - cache: Map = new Map() -): schema.AvroSchema => { - const fqn = `${options?.namespace}.${name}` - if (cache.has(zodType)) { - return cache.get(zodType) as schema.AvroSchema - } - const retval = match<{ value: ZodTypeAny }, schema.AvroSchema>({ - value: zodType, - }) - .with({ value: P.instanceOf(ZodOptional) }, (zodObject) => { - return Array.from( - new Set( - [ - 'null', - zodToAvro(name, zodObject.value.unwrap(), options, cache), - ].flat() - ) - ) as schema.AvroSchema - }) - .with({ value: P.instanceOf(ZodNullable) }, (zodObject) => { - return Array.from( - new Set( - [ - 'null', - zodToAvro(name, zodObject.value.unwrap(), options, cache), - ].flat() - ) - ) as schema.AvroSchema - }) - .with({ value: P.instanceOf(ZodObject) }, (zodObject) => { - cache.set(zodObject.value, fqn) - return parseZodObjectToAvscRecord(name, zodObject.value, cache, options) - }) - .with({ value: P.instanceOf(ZodString) }, () => { - return 'string' - }) - .with({ value: P.instanceOf(ZodUnion) }, (zodUnion) => { - return Array.from( - new Set( - zodUnion.value.options.flatMap((zodType) => - zodToAvro(name, zodType, options, cache) - ) - ) - ) - }) - .with({ value: P.instanceOf(ZodEnum) }, (zodEnum) => { - cache.set(zodEnum.value, fqn) - return { - name, - type: 'enum', - symbols: zodEnum.value.options, - doc: zodEnum.value.description, - namespace: options?.namespace, - } - }) - .with({ value: P.instanceOf(ZodNumber) }, () => { - return 'double' - }) - .with({ value: P.instanceOf(ZodDate) }, () => { - return 'long' - }) - .with({ value: P.instanceOf(ZodArray) }, (zodArray) => { - return { - type: 'array', - items: zodToAvro( - `${name}-value`, - zodArray.value._def.type, - options, - cache - ), - } - }) - .with({ value: P.instanceOf(ZodBigInt) }, () => { - return 'long' - }) - .with({ value: P.instanceOf(ZodBoolean) }, () => { - return 'boolean' - }) - .otherwise((v) => { - throw new Error(`Unsupported type ${v}`) - }) - return retval -} - -const parseZodObjectToAvscRecord = ( - name: string, - zodObject: ZodObject, - cache: Map, - options?: { namespace: string } -): schema.RecordType => { - const shape = zodObject.shape - const fields = Object.entries(shape).map((k) => { - const type = zodToAvro(k[0], k[1], options, cache) - const name = k[0] - const doc = k[1].description - const fieldDef: schema.RecordType['fields'][number] = { name, type, doc } - if (type === 'null' || (Array.isArray(type) && type.includes('null'))) { - fieldDef.default = null - } - return fieldDef - }) - return { - name, - type: 'record', - fields, - namespace: options?.namespace, - doc: zodObject.description, - } -}