From d6525a3d12abd2b84434395bbf18c33599825c98 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Fri, 16 Dec 2022 04:01:08 +0000 Subject: [PATCH 01/17] chore: update repo semaphore config From 28e568b21055852712360422a35e20d989943d7c Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Fri, 16 Dec 2022 04:01:08 +0000 Subject: [PATCH 02/17] chore: update repo semaphore config From c7d4bfd30e6f52a163286d3bd65f40586a0c14e1 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Fri, 16 Dec 2022 04:01:08 +0000 Subject: [PATCH 03/17] chore: update repo semaphore config From 6fa5c7040b803e6416e2a34e90569e9c10882dde Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Fri, 16 Dec 2022 04:01:08 +0000 Subject: [PATCH 04/17] chore: update repo semaphore config From a7c1f0cccbae4c11d26e638c5b66c10b1d2957ba Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sat, 17 Dec 2022 03:59:13 +0000 Subject: [PATCH 05/17] chore: update repo semaphore config From 364154ef006a65a14cff2e93b3e6a75c3759b55f Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sat, 17 Dec 2022 03:59:13 +0000 Subject: [PATCH 06/17] chore: update repo semaphore config From bc6e2f5d5ac8b14b78263e9c10439b1172e6e655 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sat, 17 Dec 2022 03:59:13 +0000 Subject: [PATCH 07/17] chore: update repo semaphore config From 5b4ee60ae8e2d11e205b2381c5eb708515ec6862 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sat, 17 Dec 2022 03:59:13 +0000 Subject: [PATCH 08/17] chore: update repo semaphore config From d2d288c451e5526e94299ca348c0b050ed8f3243 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sun, 18 Dec 2022 04:00:00 +0000 Subject: [PATCH 09/17] chore: update repo semaphore config From 6a9e0328f07e56c417d1d7e4b807361c9003e352 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sun, 18 Dec 2022 04:00:00 +0000 Subject: [PATCH 10/17] chore: update repo semaphore config From 5beac6fdae4d5cc90c3041e0f3f87a51a40ff9cc Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sun, 18 Dec 2022 04:00:00 +0000 Subject: [PATCH 11/17] chore: update repo semaphore config From 303a0e6fd325f115d5ce98843fc9f2c4017a358e Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Sun, 18 Dec 2022 04:00:00 +0000 Subject: [PATCH 12/17] chore: update repo semaphore config From 0cf1ed2603fe59b2fa569977ecb9a3dd5e679f2c Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Mon, 19 Dec 2022 04:00:10 +0000 Subject: [PATCH 13/17] chore: update repo semaphore config From 5c16d1b574564c08d05ba2599da0db58dbd0974c Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Mon, 19 Dec 2022 04:00:10 +0000 Subject: [PATCH 14/17] chore: update repo semaphore config From cb4ae4e2f73399f82fa26e6386720b08c3842076 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Mon, 19 Dec 2022 04:00:10 +0000 Subject: [PATCH 15/17] chore: update repo semaphore config From a4be10e7e7cc250619af4eb065d2340305d0b676 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Mon, 19 Dec 2022 04:00:10 +0000 Subject: [PATCH 16/17] chore: update repo semaphore config From efe77d2afcdef9880cd6c5c986d8c1c3f42c71ce Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 8 Nov 2023 10:40:28 -0800 Subject: [PATCH 17/17] Fix JSON validation during serialization --- schemaregistry/serde/avro/avro_generic.go | 2 +- schemaregistry/serde/avro/avro_specific.go | 2 +- .../serde/jsonschema/json_schema.go | 2 +- .../serde/jsonschema/json_schema_test.go | 60 +++++++++++++++++++ schemaregistry/serde/protobuf/protobuf.go | 2 +- schemaregistry/serde/serde.go | 16 ++--- 6 files changed, 72 insertions(+), 12 deletions(-) diff --git a/schemaregistry/serde/avro/avro_generic.go b/schemaregistry/serde/avro/avro_generic.go index add2e30f2..e43d2b98a 100644 --- a/schemaregistry/serde/avro/avro_generic.go +++ b/schemaregistry/serde/avro/avro_generic.go @@ -67,7 +67,7 @@ func (s *GenericSerializer) Serialize(topic string, msg interface{}) ([]byte, er info := schemaregistry.SchemaInfo{ Schema: avroType.String(), } - id, err := s.GetID(topic, msg, info) + id, err := s.GetID(topic, msg, &info) if err != nil { return nil, err } diff --git a/schemaregistry/serde/avro/avro_specific.go b/schemaregistry/serde/avro/avro_specific.go index 5304d43c3..7d35be1ca 100644 --- a/schemaregistry/serde/avro/avro_specific.go +++ b/schemaregistry/serde/avro/avro_specific.go @@ -76,7 +76,7 @@ func (s *SpecificSerializer) Serialize(topic string, msg interface{}) ([]byte, e info := schemaregistry.SchemaInfo{ Schema: avroMsg.Schema(), } - id, err := s.GetID(topic, avroMsg, info) + id, err := s.GetID(topic, avroMsg, &info) if err != nil { return nil, err } diff --git a/schemaregistry/serde/jsonschema/json_schema.go b/schemaregistry/serde/jsonschema/json_schema.go index 4d60e6a77..08c6b2ede 100644 --- a/schemaregistry/serde/jsonschema/json_schema.go +++ b/schemaregistry/serde/jsonschema/json_schema.go @@ -68,7 +68,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { Schema: string(raw), SchemaType: "JSON", } - id, err := s.GetID(topic, msg, info) + id, err := s.GetID(topic, msg, &info) if err != nil { return nil, err } diff --git a/schemaregistry/serde/jsonschema/json_schema_test.go b/schemaregistry/serde/jsonschema/json_schema_test.go index 3f32a8b5c..ae78dd16c 100644 --- a/schemaregistry/serde/jsonschema/json_schema_test.go +++ b/schemaregistry/serde/jsonschema/json_schema_test.go @@ -17,6 +17,9 @@ package jsonschema import ( + "encoding/json" + "github.com/invopop/jsonschema" + "strings" "testing" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" @@ -85,6 +88,63 @@ func TestJSONSchemaSerdeWithNested(t *testing.T) { serde.MaybeFail("deserialization", err, serde.Expect(newobj, obj)) } +func TestFailingJSONSchemaValidationWithSimple(t *testing.T) { + serde.MaybeFail = serde.InitFailFunc(t) + var err error + conf := schemaregistry.NewConfig("mock://") + + client, err := schemaregistry.NewClient(conf) + serde.MaybeFail("Schema Registry configuration", err) + + serConfig := NewSerializerConfig() + serConfig.EnableValidation = true + // We don't want to risk registering one instead of using the already registered one + serConfig.AutoRegisterSchemas = false + serConfig.UseLatestVersion = true + ser, err := NewSerializer(client, serde.ValueSerde, serConfig) + serde.MaybeFail("Serializer configuration", err) + + obj := JSONDemoSchema{} + jschema := jsonschema.Reflect(obj) + raw, err := json.Marshal(jschema) + serde.MaybeFail("Schema marshalling", err) + info := schemaregistry.SchemaInfo{ + Schema: string(raw), + SchemaType: "JSON", + } + + id, err := client.Register("topic1-value", info, false) + serde.MaybeFail("Schema registration", err) + if id <= 0 { + t.Errorf("Expected valid schema id, found %d", id) + } + + _, err = ser.Serialize("topic1", &obj) + if err != nil { + t.Errorf("Expected no validation error, found %s", err) + } + + diffObj := DifferentJSONDemoSchema{} + _, err = ser.Serialize("topic1", &diffObj) + if err == nil || !strings.Contains(err.Error(), "jsonschema") { + t.Errorf("Expected validation error, found %s", err) + } +} + +type DifferentJSONDemoSchema struct { + IntField int32 `json:"IntField"` + + ExtraStringField string `json:"ExtraStringField"` + + DoubleField float64 `json:"DoubleField"` + + StringField string `json:"StringField"` + + BoolFieldThatsActuallyString string `json:"BoolField"` + + BytesField test.Bytes `json:"BytesField"` +} + type JSONDemoSchema struct { IntField int32 `json:"IntField"` diff --git a/schemaregistry/serde/protobuf/protobuf.go b/schemaregistry/serde/protobuf/protobuf.go index 23a89038e..d690cd5d9 100644 --- a/schemaregistry/serde/protobuf/protobuf.go +++ b/schemaregistry/serde/protobuf/protobuf.go @@ -177,7 +177,7 @@ func (s *Serializer) Serialize(topic string, msg interface{}) ([]byte, error) { SchemaType: metadata.SchemaType, References: metadata.References, } - id, err := s.GetID(topic, protoMsg, info) + id, err := s.GetID(topic, protoMsg, &info) if err != nil { return nil, err } diff --git a/schemaregistry/serde/serde.go b/schemaregistry/serde/serde.go index a57c4db5f..7d8436ed6 100644 --- a/schemaregistry/serde/serde.go +++ b/schemaregistry/serde/serde.go @@ -128,28 +128,28 @@ func TopicNameStrategy(topic string, serdeType Type, schema schemaregistry.Schem } // GetID returns a schema ID for the given schema -func (s *BaseSerializer) GetID(topic string, msg interface{}, info schemaregistry.SchemaInfo) (int, error) { +func (s *BaseSerializer) GetID(topic string, msg interface{}, info *schemaregistry.SchemaInfo) (int, error) { autoRegister := s.Conf.AutoRegisterSchemas useSchemaID := s.Conf.UseSchemaID useLatest := s.Conf.UseLatestVersion normalizeSchema := s.Conf.NormalizeSchemas var id = -1 - subject, err := s.SubjectNameStrategy(topic, s.SerdeType, info) + subject, err := s.SubjectNameStrategy(topic, s.SerdeType, *info) if err != nil { return -1, err } if autoRegister { - id, err = s.Client.Register(subject, info, normalizeSchema) + id, err = s.Client.Register(subject, *info, normalizeSchema) if err != nil { return -1, err } } else if useSchemaID >= 0 { - info, err = s.Client.GetBySubjectAndID(subject, useSchemaID) + *info, err = s.Client.GetBySubjectAndID(subject, useSchemaID) if err != nil { return -1, err } - id, err = s.Client.GetID(subject, info, false) + id, err = s.Client.GetID(subject, *info, false) if err != nil { return -1, err } @@ -161,17 +161,17 @@ func (s *BaseSerializer) GetID(topic string, msg interface{}, info schemaregistr if err != nil { return -1, err } - info = schemaregistry.SchemaInfo{ + *info = schemaregistry.SchemaInfo{ Schema: metadata.Schema, SchemaType: metadata.SchemaType, References: metadata.References, } - id, err = s.Client.GetID(subject, info, false) + id, err = s.Client.GetID(subject, *info, false) if err != nil { return -1, err } } else { - id, err = s.Client.GetID(subject, info, normalizeSchema) + id, err = s.Client.GetID(subject, *info, normalizeSchema) if err != nil { return -1, err }