Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support schema references for Avro, Protocol Buffer, and JSON schema #197

Merged
merged 38 commits into from
Oct 4, 2022

Conversation

bifrost
Copy link
Contributor

@bifrost bifrost commented Mar 16, 2022

In this pull request, the library has been extended, so the schemas can refer to other registered schemas. The implementation is recursive and it is able to handle multi-level schema references.

The referred schemas are fetched from the schema-registry and are kept in order, so the referred schemas can be processed before the referring schemas. The referred schemas are then processed by the respective schema and schema helpers.

The Avro implementation is using the typeHook and is inspired from #92

Also three new documentation pages for references have been added: schema-json.md, schema-protobuf.md and schema-avro.md

Regards
Dan Rasmussen
SumUp

bifrost added 27 commits March 10, 2022 18:15
In this pull request, the library has been extended, so a Protobuf Schema or a JSON Schema can refer to other registered schemas. The implementation is recursive and it is able to handle multi-level Schema nesting.
Add documentation for nested references
add avro doc for nested references
@hilleer
Copy link

hilleer commented Mar 21, 2022

Hi @Nevon

Are you able to assist here or know who could be able to help us in considering this pull request? It seem like an already requested functionality in other pull requests

@Nevon
Copy link
Member

Nevon commented Mar 21, 2022

I have a review in progress, but not finished yet. Realistically, the soonest I will have enough time available is April 4-8.

I haven't yet looked at how the JSON Schema and Protobuf implementations work, but one thing that stood out to me in reading through the Avro implementation is the usage of typehooks. It's not clear to me how this feature will work when users are already using typehooks to implement logical types, especially in nested schemas. From a cursory look, it doesn't look like the implementation is taking this into account since it doesn't wrap the user-provided typehook.

@bifrost
Copy link
Contributor Author

bifrost commented Mar 22, 2022

Hi @Nevon
I change the code, so schema references always are parsed for Avro even if a usertypeHook is defined. The typeHook will be called for each avro.Type.forSchema call in the getAvroSchema method.
I have also added a test that uses a user-defined typeHook for parsing enums https://github.com/debitoor/confluent-schema-registry/blob/master/src/SchemaRegistry.avro.spec.ts#L440

@bifrost
Copy link
Contributor Author

bifrost commented Apr 19, 2022

Hi @Nevon

when do you have time to have a look at the pull request? The update should handle user-provided typeHook as you requested.

@Nevon
Copy link
Member

Nevon commented Jun 7, 2022

I'm currently away on vacation until June 25th, so sometime during that week.

Copy link
Member

@Nevon Nevon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally forgot to come back to this PR once I was back from vacation. Sorry about that.

I even had an in-progress review that was pending from back then, which I'm submitting now.

I have added a calendar event to block time for myself to go through this properly this Friday, and will come back with any additional feedback then.

Sorry again for the huge wait.

package.json Outdated
@@ -1,6 +1,6 @@
{
"name": "@kafkajs/confluent-schema-registry",
"version": "3.2.1",
"version": "3.3.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this. We'll do the version bump and release separately.

Suggested change
"version": "3.3.0",
"version": "3.2.1",

}
```

To registry schemas with references they have to be registered in reverse order, so the referred schemas already exists. In this case B has to be registered before A. Furthermore A must define an array references to the referred schemas. A reference consist of a `name`, that should match the namespace + name, a schema `subject` and a schema `version`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
To registry schemas with references they have to be registered in reverse order, so the referred schemas already exists. In this case B has to be registered before A. Furthermore A must define an array references to the referred schemas. A reference consist of a `name`, that should match the namespace + name, a schema `subject` and a schema `version`.
To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:
* `name` - the fully qualified name of the referenced schema. Example: `test.B`
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

Cleaned this up a little bit. One thing I'm not clear about is this bit:

A reference consist of a name that should match the namespace + name , a schema subject and a schema version.

Isn't name actually the type used within the referencing schema? If so, shouldn't we call this type to make the connection clearer? I haven't read through the code yet, but my understanding so far is that a reference is a mapping between a type and a subject-version. If that's correct, I would expect it to be { [type: string]: { subject: string; version: number } }, since right now it's undefined what happens if you provide multiple references with the same name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, I understand, you're trying to match the Java API, where it is called name and is (for Avro) the fully qualified name of the schema. Makes sense. Updated my suggestion to match.

"type": "object",
"properties": {
"id": { "type": "number" },
"b": { "$ref": "https://sumup.com/schemas/B" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"b": { "$ref": "https://sumup.com/schemas/B" }
"b": { "$ref": "https://example.com/schemas/B" }

Let's use a reserved domain from good ol' RFC 2606

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bunch of these, so just a global search and replace for sumup.com -> example.com.


```JSON
{
"$id": "https://sumup.com/schemas/B",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"$id": "https://sumup.com/schemas/B",
"$id": "https://example.com/schemas/B",

}
```

To registry schemas with references they have to be registered in reverse order, so the referred schemas already exists. In this case B has to be registered before A. Furthermore A must define an array references to the referred schemas. A reference consist of a `name`, that should match the $ref, a schema `subject` and a schema `version`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
To registry schemas with references they have to be registered in reverse order, so the referred schemas already exists. In this case B has to be registered before A. Furthermore A must define an array references to the referred schemas. A reference consist of a `name`, that should match the $ref, a schema `subject` and a schema `version`.
To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:
* `name` - A URL matching the `$ref` from the schema
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use


To registry schemas with references they have to be registered in reverse order, so the referred schemas already exists. In this case B has to be registered before A. Furthermore A must define an array references to the referred schemas. A reference consist of a `name`, that should match the namespace + name, a schema `subject` and a schema `version`.

Notice the library will handle an arbitrary number of nested levels.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Notice the library will handle an arbitrary number of nested levels.
The library will handle an arbitrary number of nested levels.

src/@types.ts Outdated
@@ -60,16 +67,28 @@ export interface ConfluentSubject {
export interface AvroConfluentSchema {
type: SchemaType.AVRO
schema: string | RawAvroSchema
references?: ReferenceType[]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this a SchemaReference instead. Everything in this file is a Type, after all, so there's no point in including the word Type in there.

@@ -21,7 +24,24 @@ export default class AvroHelper implements SchemaHelper {
? schema
: this.getRawAvroSchema(schema)
// @ts-ignore TODO: Fix typings for Schema...
const avroSchema: AvroSchema = avro.Type.forSchema(rawSchema, opts)

// eslint-disable-next-line @typescript-eslint/no-this-alias
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to do this if you use an arrow function.

const avroSchema = avro.Type.forSchema(rawSchema, {
      ...opts,
      typeHook:
        opts?.typeHook ??
        ((_schema: avro.Schema, opts: AvroOptions) => {
          // Since AvroOptions is a superset of ForSchemaOptions and `referredSchemas` is optional, we don't need to cast
          opts?.referredSchemas?.forEach(subSchema => {
            const rawSubSchema = this.getRawAvroSchema(subSchema)
            avroOpts.typeHook = undefined
            avro.Type.forSchema(rawSubSchema, avroOpts)
          })

          // avsc allows returning undefined, and will use the default type in that case, but the typings don't allow it,
         // hence this little hack
          return (undefined as unknown) as avro.Type;
        }),
    })

const avroOpts = opts as AvroOptions
avroOpts?.referredSchemas?.forEach(subSchema => {
const rawSubSchema = me.getRawAvroSchema(subSchema)
avroOpts.typeHook = undefined
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the typehook here? Doesn't that mean that any type extensions will fail in nested schemas?

...opts,
// @ts-ignore
typeHook:
opts?.typeHook ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this would mean that you can either use a typehook or you can use schema references? A better solution might be to wrap the user's typeHook and execute it after we do our reference resolution. Something like:

const avroSchema = avro.Type.forSchema(rawSchema, {
  ...opts,
  typeHook: (_schema, opts: AvroOptions) => {
    const { referredSchemas, ...rest } = opts
    referredSchemas?.forEach(subSchema => {
      const rawSubSchema = this.getRawAvroSchema(subSchema)
      rest.typeHook = undefined
      avro.Type.forSchema(rawSubSchema, opts)
    })

    if (opts.typeHook) {
      return opts.typeHook(_schema, rest as ForSchemaOptions)
    }
    return (undefined as unknown) as avro.Type
  },
})```

Copy link
Member

@Nevon Nevon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My biggest question mark is around compatibility with the options for Avro schemas. I'm not sure that the options are actually being respected for both the old API and new API, and the added tests are only exercising the legacy API, as far as I can tell.

}
```

To registry schemas with references they have to be registered in reverse order, so the referred schemas already exists. In this case B has to be registered before A. Furthermore A must define an array references to the referred schemas. A reference consist of a `name`, that should match the namespace + name, a schema `subject` and a schema `version`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just food for thought and not something I think should hold up the adoption of this PR, but given the complexity of determining the registration order of schemas, perhaps we should offer a way to register multiple schemas in a single function call, i.e. await registry.register(schema: Schema[]) so that we can determine which order the schemas need to be registered in instead of having the user needing to figure that out.

src/@types.ts Outdated
export type JsonOptions = ConstructorParameters<typeof Ajv>[0] & {
ajvInstance?: {
compile: (schema: any) => ValidateFunction
}
referredSchemas?: JsonConfluentSchema[]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the named referredSchemas since both we and the registry use the term reference for this feature, and the types are called SchemaReference, ReferenceType etc. It seems more natural to me to replace all instances of referred with referenced, i.e. referencedSchemas, addReferencedSchemas etc. That way it's obvious that this is related to the "schema refererences" feature.

) {
const helper = helperTypeFromSchemaType(schema.type)
const referredSchemas = await this.getReferredSchemas(schema, helper)
return helper.updateOptionsFromSchemaReferences(options as ProtocolOptions, referredSchemas)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand the casting of options to ProtocolOptions here. I don't think we know that options isn't LegacyOptions at this point, since the type hasn't been narrowed down to ProtocolOptions.

I think you have to do something similar to

try {
let schema: Schema
switch (confluentSchema.type) {
case SchemaType.AVRO: {
const opts: AvroOptions | undefined =
(options as LegacyOptions)?.forSchemaOptions ||
(options as ProtocolOptions)?.[SchemaType.AVRO]
schema = (helperTypeFromSchemaType(confluentSchema.type) as AvroHelper).getAvroSchema(
confluentSchema,
opts,
)
break
}
case SchemaType.JSON: {
const opts: JsonOptions | undefined = (options as ProtocolOptions)?.[SchemaType.JSON]
schema = new JsonSchema(confluentSchema, opts)
break
}
case SchemaType.PROTOBUF: {
const opts: ProtoOptions | undefined = (options as ProtocolOptions)?.[SchemaType.PROTOBUF]
schema = new ProtoSchema(confluentSchema, opts)
break
}
to make sure that if you initialized the client with LegacyOptions those options are used when the schema is an Avro schema. Right now I think the updateOptions method is assuming that the options are always ProtocolOptions, which would break backwards compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests in SchemaRegistry.avro.spec.ts look like they are using the LegacyOptions, though, so I'm confused how this is working. I'm kinda suspecting that we end up making a frankenstein options object here by cominbing LegacyOptions with ProtocolOptions, into something like:

{
  subject: 'subject',
  [SchemaType.JSON]: {
    references: [ ... ]
  }
}

Copy link
Contributor Author

@bifrost bifrost Sep 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good catch, I cannot find a test that uses LegacyOptions

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😔 You're right. I could have sworn there were, but apparently not. You can see the difference here. It's a fairly minor difference, in that the options have just moved in one level to accommodate other schema types.

src/SchemaRegistry.ts Show resolved Hide resolved
const schemas = await this.getReferredSchemasFromReference(reference, helper, referencesSet)
referredSchemas = referredSchemas.concat(schemas)
const schemas = await this.getreferencedSchemasFromReference(reference, helper, referencesSet)
referencedSchemas = referencedSchemas.concat(schemas)
Copy link

@hilleer hilleer Oct 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid mutation.

Suggested change
referencedSchemas = referencedSchemas.concat(schemas)
referencedSchemas.push(...schemas)

schema: ConfluentSchema,
helper: SchemaHelper,
referencesSet: Set<string>,
): Promise<ConfluentSchema[]> {
const references = schema.references || []

let referredSchemas: ConfluentSchema[] = []
let referencedSchemas: ConfluentSchema[] = []
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let referencedSchemas: ConfluentSchema[] = []
const referencedSchemas: ConfluentSchema[] = []

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not possible because referencedSchemas is updated

Copy link

@hilleer hilleer Oct 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my related comment below - instead of re-declaring referencedSchemas and mutating it, you can simply use .push to push to the array instead.

referencedSchemas.push(...schemas);

@bifrost
Copy link
Contributor Author

bifrost commented Oct 3, 2022

Hi @Nevon
Thanks for your review.
I have fixed your review comments and added a test for LegacyOptions, please have a look.

@@ -168,19 +246,15 @@ export default class SchemaRegistry {
}

const response = await this.getSchemaOriginRequest(registryId)
const foundSchema: { schema: string; schemaType: string } = response.data()
const rawSchema = foundSchema.schema
const foundSchema = response.data() as SchemaResponse
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you actually want to cast the type? Otherwise it is better to define the type like this:

Suggested change
const foundSchema = response.data() as SchemaResponse
const foundSchema: SchemaResponse = response.data();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an actual difference? Assuming that response.data returns any or maybe unknown, do you get any additional type safety from defining the type of foundSchema vs casting the return type of response.data? I've always thought of those two scenarios as equivalent, but maybe there are scenarios in which they aren't.

Copy link

@hilleer hilleer Oct 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess for this specific case there is no actual difference, as response.data behaves like you describe. However, if you had a different example like this:

type FooObject = {
  value: string;
  key: 'string';
};

// complains because key is not included
const foo: FooObject = {
  value: 'foo'
};

// does not complain, because we specifically cast the type and allow missing properties
const fooSecond = {
  value: 'foo'
} as FooObject

It would behave equally if the function actually returned a specific/known type - casting the type we might not notice missing properties, while the defined type : SchemaResponse would be complaining. Casting would also complain, if the types do not match at all.

While response.data returns unknown I guess you are right that there's no additional type safety between the two options.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, totally, in other cases there's of course a difference. I'm just thinking about the case where we're assigning something that's any or unknown already. One thing that does come to mind in favor of not casting is if in the future the return type of response.data changes from any to Response<T> or whatever. If we were casting it to SchemaResponse then the compiler might not let us know that the type is now incompatible with whatever foundSchema is used for, whereas if we just declare foundSchema: SchemaResponse we would definitely get a compiler error if Response<T> is not compatible with SchemaResponse, which is good.

if (referencedSchemas) {
referencedSchemas.forEach(rawSchema => {
const $schema = JSON.parse(rawSchema.schema)
// @ts-ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the compiler not liking about this? I would guess JSON.parse returns any, so I wouldn't expect the compiler to complain about you accessing $id on it. Seeing a ts-ignore makes my spider sense tingle, since there's few if any cases where it's necessary and might hide mistakes.

})
})

describe('with enum typeHook defined', () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!

Copy link
Member

@Nevon Nevon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I feel more comfortable with the change, now that it's tested with both the legacy and new options API. Just curious about that ts-ignore.

@bifrost
Copy link
Contributor Author

bifrost commented Oct 4, 2022

The ts-ignore is needed because the ajv.d.ts does not define addSchema. Ajv extends Core and Core implements addSchema, that is why it works. Plese let me know if I can do a better or how to get types from Core?

Screenshot 2022-10-04 at 10 19 35

@Nevon
Copy link
Member

Nevon commented Oct 4, 2022

It's not because of Ajv, but because ajv is either an instance of AjvCore or something that implements a similar interface, and that "similar interface" is lacking addSchema. See #163 for context on why this was done.

Changing the JsonOptions type to the following should solve it:

export type JsonOptions = ConstructorParameters<typeof Ajv>[0] & {
  ajvInstance?: {
    addSchema: Ajv['addSchema']
    compile: (schema: any) => ValidateFunction
  }
  referencedSchemas?: JsonConfluentSchema[]
}

@Nevon Nevon merged commit 3f4dea4 into kafkajs:master Oct 4, 2022
@Nevon
Copy link
Member

Nevon commented Oct 4, 2022

Thank you for the contribution and sorry it took so long to upstream. This is now released in v3.3.0.

@bifrost
Copy link
Contributor Author

bifrost commented Oct 4, 2022

Thanks a lot, Nevon - it has been a pleasure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants