Skip to content

Commit

Permalink
refactor: update QTT framework to use JsonSchemaConverter
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed May 19, 2022
1 parent 83ed9ca commit 31f4bab
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.test.serde.json;

import io.confluent.connect.json.JsonSchemaConverter;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.ksql.serde.json.JsonSchemaTranslator;
import io.confluent.ksql.test.serde.ConnectSerdeSupplier;
import org.apache.kafka.connect.data.Schema;

public class ValueSpecJsonSchemaSerdeSupplier extends ConnectSerdeSupplier<JsonSchema> {
private final JsonSchemaTranslator schemaTranslator;

public ValueSpecJsonSchemaSerdeSupplier() {
super(JsonSchemaConverter::new);
this.schemaTranslator = new JsonSchemaTranslator();
}

@Override
protected Schema fromParsedSchema(final JsonSchema schema) {
return schemaTranslator.toConnectSchema(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.ksql.serde.protobuf.ProtobufProperties;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.serde.avro.ValueSpecAvroSerdeSupplier;
import io.confluent.ksql.test.serde.json.ValueSpecJsonSchemaSerdeSupplier;
import io.confluent.ksql.test.serde.json.ValueSpecJsonSerdeSupplier;
import io.confluent.ksql.test.serde.kafka.KafkaSerdeSupplier;
import io.confluent.ksql.test.serde.none.NoneSerdeSupplier;
Expand Down Expand Up @@ -82,7 +83,7 @@ public static SerdeSupplier<?> getSerdeSupplier(
case ProtobufNoSRFormat.NAME:
return new ValueSpecProtobufNoSRSerdeSupplier(schema, formatInfo.getProperties());
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false, properties);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true, properties);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSchemaSerdeSupplier();
case DelimitedFormat.NAME: return new StringSerdeSupplier();
case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema);
case NoneFormat.NAME: return new NoneSerdeSupplier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Translates between Connect and JSON Schema Registry schema types.
*/
class JsonSchemaTranslator implements ConnectSchemaTranslator {
public class JsonSchemaTranslator implements ConnectSchemaTranslator {

private final JsonSchemaData jsonData = new JsonSchemaData(new JsonSchemaDataConfig(
ImmutableMap.of(
Expand Down

0 comments on commit 31f4bab

Please sign in to comment.