From d1f9ecf3f76ca352a8e3e38a1ee5a588505028e8 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Fri, 10 Feb 2023 17:10:23 +0100 Subject: [PATCH] feat: allow to configure value schema using Avro --- checkstyle/suppressions.xml | 4 + .../connect/filepulse/data/StructSchema.java | 11 +- .../connect/filepulse/internal/Pair.java | 62 +++++++ .../filepulse/schema/SchemaMerger.java | 15 +- .../source/internal/ConnectSchemaMapper.java | 2 +- connect-file-pulse-dataformat/pom.xml | 10 ++ .../filepulse/avro/AvroSchemaConverter.java | 78 +++++++++ .../avro/UnsupportedAvroTypeException.java | 29 ++++ .../AbstracConnectSchemaConverter.java | 104 ++++++++++++ .../avro/internal/ArraySchemaConverter.java | 47 ++++++ .../avro/internal/BytesSchemaConverter.java | 81 +++++++++ .../avro/internal/ConnectSchemaConverter.java | 101 ++++++++++++ .../internal/ConnectSchemaConverters.java | 80 +++++++++ .../avro/internal/CyclicSchemaWrapper.java | 120 ++++++++++++++ .../avro/internal/FixedSchemaConverter.java | 43 +++++ .../avro/internal/IntSchemaConverter.java | 58 +++++++ .../avro/internal/LongSchemaConverter.java | 50 ++++++ .../avro/internal/MapSchemaConverter.java | 52 ++++++ .../avro/internal/RecordSchemaConverter.java | 73 ++++++++ .../avro/internal/UnionSchemaConverter.java | 57 +++++++ .../filepulse/config/ConfigSchema.java | 152 +++++++++++++++++ .../avro/AvroSchemaConverterTest.java | 42 +++++ .../src/test/resources/datasets/circular.avsc | 43 +++++ .../filepulse/config/CommonSourceConfig.java | 156 ++++++------------ .../filepulse/config/ConnectSchemaType.java | 35 ++++ 25 files changed, 1388 insertions(+), 117 deletions(-) create mode 100644 connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/Pair.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverter.java create mode 100644 
connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/UnsupportedAvroTypeException.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/AbstracConnectSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ArraySchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/BytesSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverters.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/CyclicSchemaWrapper.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/FixedSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/IntSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/LongSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/MapSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/RecordSchemaConverter.java create mode 100644 connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/UnionSchemaConverter.java create mode 100644 
connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConfigSchema.java create mode 100644 connect-file-pulse-dataformat/src/test/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverterTest.java create mode 100644 connect-file-pulse-dataformat/src/test/resources/datasets/circular.avsc create mode 100644 connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConnectSchemaType.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7657f5bf0..87189082e 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -51,6 +51,7 @@ + @@ -65,4 +66,7 @@ + + + \ No newline at end of file diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java index 4f501a65b..c7c5ede8c 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Objects; import java.util.function.BiFunction; -import java.util.stream.Collectors; public class StructSchema implements Schema, Iterable { @@ -107,7 +106,9 @@ public TypedField field(final String fieldName) { } public List fields() { - return new ArrayList<>(fields.values()); + ArrayList ordered = new ArrayList<>(fields.values()); + ordered.sort(Comparator.comparing(TypedField::name)); + return ordered; } void set(final String fieldName, final Schema fieldSchema) { @@ -151,11 +152,7 @@ TypedField remove(final String fieldName) { */ @Override public Iterator iterator() { - return this.fields.values() - .stream() - .sorted(Comparator.comparing(TypedField::name)) - .collect(Collectors.toUnmodifiableList()) - .iterator(); + return 
fields().iterator(); } /** diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/Pair.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/Pair.java new file mode 100644 index 000000000..0d91a3b62 --- /dev/null +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/Pair.java @@ -0,0 +1,62 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.internal; +import java.util.Objects; + +public class Pair { + private final K key; + private final V value; + + public Pair(K key, V value) { + this.key = key; + this.value = value; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + /** {@inheritDoc} **/ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Pair pair = (Pair) o; + return Objects.equals(key, pair.key) && Objects.equals(value, pair.value); + } + + /** {@inheritDoc} **/ + @Override + public int hashCode() { + return Objects.hash(key, value); + } + + /** {@inheritDoc} **/ + @Override + public String toString() { + return "Pair{" + + "key=" + key + + ", value=" + value + + '}'; + } +} \ No newline at end of file diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/schema/SchemaMerger.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/schema/SchemaMerger.java index e7160797b..96e0efd49 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/schema/SchemaMerger.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/schema/SchemaMerger.java @@ -135,8 +135,10 @@ private static Schema mergeStruct(final Schema left, final Schema right, final SchemaContext context) { - if (!Objects.equals(left.name(), right.name())) - throw new DataException("Cannot merge two schemas wih different name " + left.name() + "<>" + right.name()); + if (left.name() != null && right.name() != null) { + if (!Objects.equals(left.name(), right.name())) + throw new DataException("Cannot merge two schemas wih different name " + left.name() + "<>" + right.name()); + } final SchemaBuilder merged = mergeMetadata(left, right, new SchemaBuilder(Type.STRUCT)); @@ -169,10 +171,11 @@ private static Schema 
mergeStruct(final Schema left, // Remaining fields only exist on LEFT schema. fieldSchemas.putAll(remaining); - // Fields should be added ordered by name to make schema merge operation as idempotent as possible. + // Fields should be added ordered by name to make + // schema merge operation as idempotent as possible. fieldSchemas.entrySet().stream() .sorted(Map.Entry.comparingByKey()) - .forEach(it -> merged.field(it.getKey(), context.buildSchemaWithCyclicSchemaWrapper(it.getValue()))); + .forEachOrdered(it -> merged.field(it.getKey(), context.buildSchemaWithCyclicSchemaWrapper(it.getValue()))); return context.buildSchemaWithCyclicSchemaWrapper(merged.build()); } @@ -181,8 +184,8 @@ private static SchemaBuilder mergeMetadata(final Schema left, final Schema right, final SchemaBuilder merged) { - merged.name(left.name()); - merged.doc(left.doc()); + merged.name(left.name() != null ? left.name() : right.name()); + merged.doc(left.doc() != null ? left.doc() : right.doc()); if (left.isOptional() || right.isOptional()) { merged.optional(); diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java index e582bcc96..a53428811 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java @@ -217,8 +217,8 @@ public SchemaAndValue map(final Schema connectSchema, final TypedStruct value) { private static Struct toConnectStruct(final Schema connectSchema, final TypedStruct struct) { final Struct connectStruct = new Struct(connectSchema); - for (Field connectField : connectSchema.fields()) { + for (Field connectField : connectSchema.fields()) { final String recordName = connectSchema.name(); 
final String fieldName = connectField.name(); diff --git a/connect-file-pulse-dataformat/pom.xml b/connect-file-pulse-dataformat/pom.xml index 83d614db0..28a77d66d 100644 --- a/connect-file-pulse-dataformat/pom.xml +++ b/connect-file-pulse-dataformat/pom.xml @@ -26,13 +26,23 @@ kafka-connect-filepulse-api ${project.version} + + org.apache.kafka + kafka-clients + provided + org.apache.kafka connect-api + provided com.jsoniter jsoniter + + org.apache.avro + avro + \ No newline at end of file diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverter.java new file mode 100644 index 000000000..ed0334d7b --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro; + +import io.streamthoughts.kafka.connect.filepulse.avro.internal.ConnectSchemaConverter; +import io.streamthoughts.kafka.connect.filepulse.avro.internal.ConnectSchemaConverters; +import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils; +import org.apache.kafka.connect.data.Schema; + +import java.util.HashMap; +import java.util.Map; + +/** + * Utilities for converting between Connect runtime data format and Avro. + */ +public final class AvroSchemaConverter { + + private final Map toConnectSchemaCache; + + /** + * Creates a new {@link AvroSchemaConverter} instance. + */ + public AvroSchemaConverter() { + toConnectSchemaCache = new HashMap<>(); + } + + /** + * Convert the given {@link org.apache.avro.Schema} into connect one. + * + * @param schema the string avro schema to be converted. + * @return {@link Schema}. + */ + public Schema toConnectSchema(final String schema) { + if (StringUtils.isBlank(schema)) { + return null; + } + org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser(); + return toConnectSchema(parser.parse(schema)); + } + + /** + * Convert the given {@link org.apache.avro.Schema} into connect one. + * + * @param schema the avro schema to be converted. + * @return {@link Schema}. 
+ */ + public Schema toConnectSchema(final org.apache.avro.Schema schema) { + Schema cachedSchema = toConnectSchemaCache.get(schema); + if (cachedSchema != null) { + return cachedSchema; + } + + ConnectSchemaConverter converter = ConnectSchemaConverters.forType(schema.getType()); + org.apache.kafka.connect.data.Schema resultSchema = converter.toConnectSchema( + schema, + new ConnectSchemaConverter.Options().forceOptional(false), + new ConnectSchemaConverter.CyclicContext() + ); + toConnectSchemaCache.put(schema, resultSchema); + return resultSchema; + } +} \ No newline at end of file diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/UnsupportedAvroTypeException.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/UnsupportedAvroTypeException.java new file mode 100644 index 000000000..3a2484657 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/UnsupportedAvroTypeException.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro; + +import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException; + +public class UnsupportedAvroTypeException extends ConnectFilePulseException { + + public UnsupportedAvroTypeException(final String message) { + super(message); + } + +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/AbstracConnectSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/AbstracConnectSchemaConverter.java new file mode 100644 index 000000000..8bd7b509a --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/AbstracConnectSchemaConverter.java @@ -0,0 +1,104 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.DataException; + +import java.util.Map; + +public abstract class AbstracConnectSchemaConverter implements ConnectSchemaConverter { + + public static final String NAMESPACE = "io.streamthoughts.connect.avro"; + + public static final String DEFAULT_SCHEMA_NAME = "ConnectDefault"; + + public static final String DEFAULT_SCHEMA_FULL_NAME = NAMESPACE + "." + DEFAULT_SCHEMA_NAME; + + static final String AVRO_PROP = "avro"; + + protected void addSchemaMetadata(Schema schema, + Options options, + SchemaBuilder builder) { + // Doc + String docVal = options.docDefaultValue() != null ? options.docDefaultValue() : schema.getDoc(); + if (docVal != null) { + builder.doc(docVal); + } + + // Parameters + for (Map.Entry entry : schema.getObjectProps().entrySet()) { + if (entry.getKey().startsWith(AVRO_PROP)) { + builder.parameter(entry.getKey(), entry.getValue().toString()); + } + } + + // Default + if (options.fieldDefaultValue() != null) { + builder.defaultValue(options.fieldDefaultValue()); + } + + String name = null; + if (schema.getType() == Schema.Type.RECORD + || schema.getType() == Schema.Type.ENUM + || schema.getType() == Schema.Type.FIXED) { + name = schema.getName(); + } + + if (name != null && !name.startsWith(DEFAULT_SCHEMA_FULL_NAME)) { + if (builder.name() != null) { + if (!name.equals(builder.name())) { + throw new DataException("Mismatched names: name already added to SchemaBuilder (" + + builder.name() + + ") differs from name in source schema (" + + name + ")"); + } + } else { + builder.name(name); + } + } + + if (options.forceOptional()) { + builder.optional(); + } + } + + protected org.apache.kafka.connect.data.Schema toConnectSchemaWithCycles(Schema schema, + Options options, + CyclicContext context) { + org.apache.kafka.connect.data.Schema resolvedSchema; + if 
(context.cycleReferences().containsKey(schema)) { + context.detectedCycles().add(schema); + resolvedSchema = cyclicSchemaWrapper(context.cycleReferences(), schema, options.forceOptional()); + } else { + ConnectSchemaConverter converter = ConnectSchemaConverters.forType(schema.getType()); + resolvedSchema = converter.toConnectSchema(schema, options, context); + } + return resolvedSchema; + } + + private CyclicSchemaWrapper cyclicSchemaWrapper( + Map toConnectCycles, + org.apache.avro.Schema memberSchema, + boolean optional) { + return new CyclicSchemaWrapper(toConnectCycles.get(memberSchema).schema(), optional); + } + +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ArraySchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ArraySchemaConverter.java new file mode 100644 index 000000000..1742f5111 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ArraySchemaConverter.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +public final class ArraySchemaConverter extends AbstracConnectSchemaConverter { + + ArraySchemaConverter(){} + + /** {@inheritDoc} **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + final SchemaBuilder builder; + + Schema elementType = schema.getElementType(); + + org.apache.kafka.connect.data.Schema avroElementSchema = toConnectSchemaWithCycles( + elementType, + options, + context + ); + builder = SchemaBuilder.array(avroElementSchema); + addSchemaMetadata(schema, options, builder); + + return builder; + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/BytesSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/BytesSchemaConverter.java new file mode 100644 index 000000000..5e6cd7a8a --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/BytesSchemaConverter.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.DataException; + +public final class BytesSchemaConverter extends AbstracConnectSchemaConverter { + + static final String AVRO_LOGICAL_DECIMAL = "decimal"; + static final String AVRO_LOGICAL_DECIMAL_SCALE_PROP = "scale"; + static final String AVRO_LOGICAL_DECIMAL_PRECISION_PROP = "precision"; + static final String AVRO_LOGICAL_TYPE_PROP = "logicalType"; + static final Integer CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT = 64; + + static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP = "connect.decimal.precision"; + + BytesSchemaConverter() {} + + /** {@inheritDoc} **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP); + + final SchemaBuilder builder; + if (AVRO_LOGICAL_DECIMAL.equalsIgnoreCase(logicalType)) { + builder = schemaBuilderForLogicalDecimal(schema); + } else { + builder = SchemaBuilder.bytes(); + } + + addSchemaMetadata(schema, options, builder); + + return builder; + } + + private static SchemaBuilder schemaBuilderForLogicalDecimal(org.apache.avro.Schema schema) { + final SchemaBuilder builder; + Object scaleNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_SCALE_PROP); + // In Avro: scale, a JSON integer representing the scale. If not specified the scale is 0. + int scale = scaleNode instanceof Number ? 
((Number) scaleNode).intValue() : 0; + builder = Decimal.builder(scale); + + Object precisionNode = schema.getObjectProp(AVRO_LOGICAL_DECIMAL_PRECISION_PROP); + if (null != precisionNode) { + // In Avro: precision, a JSON integer representing the (maximum) precision + // of decimals stored in this type (required). + if (!(precisionNode instanceof Number)) { + throw new DataException(AVRO_LOGICAL_DECIMAL_PRECISION_PROP + + " property must be a JSON Integer." + + " https://avro.apache.org/docs/1.11.1/specification/#decimal"); + } + // Capture the precision as a parameter only if it is not the default + int precision = ((Number) precisionNode).intValue(); + if (precision != CONNECT_AVRO_DECIMAL_PRECISION_DEFAULT) { + builder.parameter(CONNECT_AVRO_DECIMAL_PRECISION_PROP, String.valueOf(precision)); + } + } + return builder; + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverter.java new file mode 100644 index 000000000..7018b77e1 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverter.java @@ -0,0 +1,101 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; + +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Set; + +public interface ConnectSchemaConverter { + + org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context); + + /** + * Class that holds the options for performing conversion. + */ + class Options { + private final boolean forceOptional; + private final Object fieldDefaultValue; + private final String docDefaultValue; + + public Options() { + this(false, null, null); + } + + public Options(boolean forceOptional, Object fieldDefaultValue, String docDefaultValue) { + this.forceOptional = forceOptional; + this.fieldDefaultValue = fieldDefaultValue; + this.docDefaultValue = docDefaultValue; + } + + public boolean forceOptional() { + return forceOptional; + } + + public Options forceOptional(boolean forceOptional) { + return new Options(forceOptional, fieldDefaultValue, docDefaultValue); + } + + public Options fieldDefaultValue(Object fieldDefaultValue) { + return new Options(forceOptional, fieldDefaultValue, docDefaultValue); + } + + public Object fieldDefaultValue() { + return fieldDefaultValue; + } + + public Options docDefaultValue(String docDefaultValue) { + return new Options(forceOptional, fieldDefaultValue, docDefaultValue); + } + + public String docDefaultValue() { + return this.docDefaultValue; + } + } + + /** + * Class that holds the context for performing conversion. 
+ */ + class CyclicContext { + private final Map cycleReferences; + private final Set detectedCycles; + + /** + * cycleReferences - map that holds connect Schema references to resolve cycles + * detectedCycles - avro schemas that have been detected to have cycles + */ + public CyclicContext() { + this.cycleReferences = new IdentityHashMap<>(); + this.detectedCycles = new HashSet<>(); + } + + public Map cycleReferences() { + return cycleReferences; + } + + public Set detectedCycles() { + return detectedCycles; + } + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverters.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverters.java new file mode 100644 index 000000000..e8ef977cd --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/ConnectSchemaConverters.java @@ -0,0 +1,80 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import io.streamthoughts.kafka.connect.filepulse.avro.UnsupportedAvroTypeException; +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.DataException; + +import java.util.EnumMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +public final class ConnectSchemaConverters { + + private static final Map CONVERTERS = + new EnumMap<>(Schema.Type.class); + + static { + CONVERTERS.put(Schema.Type.BYTES, new BytesSchemaConverter()); + CONVERTERS.put(Schema.Type.FIXED, new FixedSchemaConverter()); + CONVERTERS.put(Schema.Type.ARRAY, new ArraySchemaConverter()); + CONVERTERS.put(Schema.Type.MAP, new MapSchemaConverter()); + CONVERTERS.put(Schema.Type.RECORD, new RecordSchemaConverter()); + CONVERTERS.put(Schema.Type.UNION, new UnionSchemaConverter()); + CONVERTERS.put(Schema.Type.LONG, new LongSchemaConverter()); + CONVERTERS.put(Schema.Type.INT, new IntSchemaConverter()); + CONVERTERS.put(Schema.Type.DOUBLE, connectSchemaConverter(SchemaBuilder::float64)); + CONVERTERS.put(Schema.Type.FLOAT,connectSchemaConverter(SchemaBuilder::float32)); + CONVERTERS.put(Schema.Type.BOOLEAN,connectSchemaConverter(SchemaBuilder::bool)); + CONVERTERS.put(Schema.Type.ENUM,connectSchemaConverter(SchemaBuilder::string)); + CONVERTERS.put(Schema.Type.STRING,connectSchemaConverter(SchemaBuilder::string)); + CONVERTERS.put(Schema.Type.NULL, + (schema, options, context) -> { + throw new DataException("Standalone null schemas are not supported."); + }); + } + + private static AbstracConnectSchemaConverter connectSchemaConverter(Supplier supplier) { + return new AbstracConnectSchemaConverter() { + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + SchemaBuilder builder = supplier.get(); + addSchemaMetadata(schema, options, 
builder); + return builder; + } + }; + } + + public static ConnectSchemaConverter forType(final Schema.Type type) { + ConnectSchemaConverter converter = CONVERTERS.get(type); + return Optional.ofNullable(converter) + .orElseThrow( + () -> + new UnsupportedAvroTypeException( + "Cannot convert to connect schema. to Avro data, type is not" + + " supported '" + + type + + "'")); + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/CyclicSchemaWrapper.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/CyclicSchemaWrapper.java new file mode 100644 index 000000000..ac671c129 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/CyclicSchemaWrapper.java @@ -0,0 +1,120 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public final class CyclicSchemaWrapper implements Schema { + + private final Schema schema; + private final boolean optional; + + public CyclicSchemaWrapper(Schema schema) { + this(schema, schema.isOptional()); + } + + public CyclicSchemaWrapper(Schema schema, boolean optional) { + this.schema = schema; + this.optional = optional; + } + + @Override + public Type type() { + return schema.type(); + } + + @Override + public boolean isOptional() { + return optional; + } + + @Override + public Object defaultValue() { + return schema.defaultValue(); + } + + @Override + public String name() { + return schema.name(); + } + + @Override + public Integer version() { + return schema.version(); + } + + @Override + public String doc() { + return schema.doc(); + } + + @Override + public Map parameters() { + return schema.parameters(); + } + + @Override + public Schema keySchema() { + return schema.keySchema(); + } + + @Override + public Schema valueSchema() { + return schema.valueSchema(); + } + + @Override + public List fields() { + return schema.fields(); + } + + @Override + public Field field(String s) { + return schema.field(s); + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CyclicSchemaWrapper other = (CyclicSchemaWrapper) o; + return Objects.equals(optional, other.optional) && Objects.equals(schema, other.schema); + } + + @Override + public int hashCode() { + return Objects.hashCode(optional) + Objects.hashCode(schema); + } +} diff --git 
a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/FixedSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/FixedSchemaConverter.java new file mode 100644 index 000000000..c5b56f547 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/FixedSchemaConverter.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +public final class FixedSchemaConverter extends AbstracConnectSchemaConverter { + + static final String CONNECT_AVRO_FIXED_SIZE_PROP = "connect.fixed.size"; + + FixedSchemaConverter() {} + + /** {@inheritDoc} **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + + final SchemaBuilder builder = SchemaBuilder.bytes(); + builder.parameter(CONNECT_AVRO_FIXED_SIZE_PROP, String.valueOf(schema.getFixedSize())); + + addSchemaMetadata(schema, options, builder); + + return builder; + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/IntSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/IntSchemaConverter.java new file mode 100644 index 000000000..90403b57d --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/IntSchemaConverter.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Time; + +public final class IntSchemaConverter extends AbstracConnectSchemaConverter { + static final String AVRO_LOGICAL_TYPE_PROP = "logicalType"; + + static final String AVRO_LOGICAL_TIME_MILLIS = "time-millis"; + static final String AVRO_LOGICAL_DATE = "date"; + + IntSchemaConverter() {} + + /** {@inheritDoc} **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + final SchemaBuilder builder; + + String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP); + if (logicalType == null) { + builder = SchemaBuilder.int32(); + } else { + if (AVRO_LOGICAL_DATE.equalsIgnoreCase(logicalType)) { + builder = Date.builder(); + } else if (AVRO_LOGICAL_TIME_MILLIS.equalsIgnoreCase(logicalType)) { + builder = Time.builder(); + } else { + builder = SchemaBuilder.int32(); + } + } + + addSchemaMetadata(schema, options, builder); + + return builder; + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/LongSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/LongSchemaConverter.java new file mode 100644 index 000000000..de80ec0b8 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/LongSchemaConverter.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Timestamp; + +public final class LongSchemaConverter extends AbstracConnectSchemaConverter { + + static final String AVRO_LOGICAL_TYPE_PROP = "logicalType"; + static final String AVRO_LOGICAL_TIMESTAMP_MILLIS = "timestamp-millis"; + + LongSchemaConverter() {} + + /** {@inheritDoc} **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + final SchemaBuilder builder; + + String logicalType = schema.getProp(AVRO_LOGICAL_TYPE_PROP); + if (AVRO_LOGICAL_TIMESTAMP_MILLIS.equalsIgnoreCase(logicalType)) { + builder = Timestamp.builder(); + } else { + builder = SchemaBuilder.int64(); + } + + addSchemaMetadata(schema, options, builder); + + return builder; + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/MapSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/MapSchemaConverter.java new file mode 100644 index 000000000..58d100928 --- /dev/null +++ 
b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/MapSchemaConverter.java @@ -0,0 +1,52 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; + +public final class MapSchemaConverter extends AbstracConnectSchemaConverter { + + MapSchemaConverter() {} + + /** + * {@inheritDoc} + **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + + Schema valueSchema = schema.getValueType(); + + org.apache.kafka.connect.data.Schema avroValueSchema = toConnectSchemaWithCycles( + valueSchema, + options, + context + ); + + SchemaBuilder builder = SchemaBuilder.map(STRING_SCHEMA, avroValueSchema); + + addSchemaMetadata(schema, options, builder); + + return builder; + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/RecordSchemaConverter.java 
b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/RecordSchemaConverter.java new file mode 100644 index 000000000..85a5e47e2 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/RecordSchemaConverter.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RecordSchemaConverter extends AbstracConnectSchemaConverter { + + private static final Logger LOG = LoggerFactory.getLogger(RecordSchemaConverter.class); + + RecordSchemaConverter() {} + + /** + * {@inheritDoc} + **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + final SchemaBuilder builder; + builder = SchemaBuilder.struct(); + context.cycleReferences().put(schema, new CyclicSchemaWrapper(builder)); + + for (org.apache.avro.Schema.Field field : schema.getFields()) { + Object defaultVal = null; + try { + defaultVal = field.defaultVal(); + } catch (Exception e) { + LOG.warn("Ignoring invalid default for field " + field, e); + } + + Options fieldOptions = options + .forceOptional(false) + .fieldDefaultValue(defaultVal); + + org.apache.kafka.connect.data.Schema fieldSchema = toConnectSchemaWithCycles( + field.schema(), + fieldOptions, + context + ); + builder.field(field.name(), fieldSchema); + } + + addSchemaMetadata(schema, options, builder); + + if (!context.detectedCycles().contains(schema) + && context.cycleReferences().containsKey(schema)) { + context.cycleReferences().remove(schema); + } + + return builder; + + } +} diff --git a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/UnionSchemaConverter.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/UnionSchemaConverter.java new file mode 100644 index 000000000..805542bba --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/avro/internal/UnionSchemaConverter.java @@ -0,0 +1,57 @@ +/* + * Copyright 2023 StreamThoughts. 
+ * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro.internal; + +import org.apache.avro.Schema; +import org.apache.kafka.connect.errors.DataException; + +public final class UnionSchemaConverter extends AbstracConnectSchemaConverter { + + private static final org.apache.avro.Schema NULL_AVRO_SCHEMA = + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL); + + UnionSchemaConverter() {} + + /** + * {@inheritDoc} + **/ + @Override + public org.apache.kafka.connect.data.Schema toConnectSchema(Schema schema, + Options options, + CyclicContext context) { + // handle NULLABLE field with default to null + if (schema.getTypes().size() == 2) { + if (schema.getTypes().contains(NULL_AVRO_SCHEMA)) { + for (org.apache.avro.Schema memberSchema : schema.getTypes()) { + if (!memberSchema.equals(NULL_AVRO_SCHEMA)) { + Options memberOptions = options + .forceOptional(true) + .fieldDefaultValue(null); + return toConnectSchemaWithCycles( + memberSchema, + memberOptions, + context + ); + } + } + } + } + throw new DataException("Couldn't translate multiple union type."); + } +} diff --git 
a/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConfigSchema.java b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConfigSchema.java new file mode 100644 index 000000000..ca966b6a2 --- /dev/null +++ b/connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConfigSchema.java @@ -0,0 +1,152 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamthoughts.kafka.connect.filepulse.config; + +import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Supplier; + +public final class ConfigSchema implements Supplier { + + public static ConfigSchema fromConnectSchema(Schema schema) { + Schema.Type type = schema.type(); + + ConfigSchema configSchema = new ConfigSchema(); + configSchema.type = type; + configSchema.name = schema.name(); + configSchema.doc = schema.doc(); + configSchema.defaultValue = schema.defaultValue(); + configSchema.parameters = schema.parameters(); + configSchema.isOptional = schema.isOptional(); + switch (type) { + case MAP: + configSchema.keySchema = fromConnectSchema(schema.keySchema()); + configSchema.valueSchema = fromConnectSchema(schema.valueSchema()); + break; + case ARRAY: + configSchema.valueSchema = fromConnectSchema(schema.valueSchema()); + break; + case STRUCT: + configSchema.fieldSchemas = new HashMap<>(); + schema.fields().stream() + .sorted(Comparator.comparing(Field::index)) + .forEachOrdered(f -> configSchema.fieldSchemas.put(f.name(), fromConnectSchema(f.schema()))); + break; + default: + break; + } + return configSchema; + } + + public Schema.Type type; + public boolean isOptional; + + public String name; + + public Integer version; + public Object defaultValue; + public String doc; + public Map parameters; + public ConfigSchema keySchema; + public ConfigSchema valueSchema; + public Map fieldSchemas; + + /** + * {@inheritDoc} + **/ + @Override + public Schema get() { + final SchemaBuilder builder; + switch (this.type) { + case MAP: + Objects.requireNonNull(keySchema, "keySchema cannot be null."); + Objects.requireNonNull(valueSchema, "valueSchema cannot be null."); 
+ builder = SchemaBuilder.map(keySchema.get(), valueSchema.get()); + break; + case ARRAY: + Objects.requireNonNull(valueSchema, "valueSchema cannot be null."); + builder = SchemaBuilder.array(valueSchema.get()); + break; + default: + builder = SchemaBuilder.type(type); + break; + } + + if (Schema.Type.STRUCT == type) { + fieldSchemas.entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .forEachOrdered(it -> builder.field(it.getKey(), it.getValue().get())); + } + + if (StringUtils.isNotBlank(name)) + builder.name(name); + + if (StringUtils.isNotBlank(doc)) + builder.doc(doc); + + if (null != defaultValue) { + Object value; + switch (type) { + case INT8: + value = ((Number) defaultValue).byteValue(); + break; + case INT16: + value = ((Number) defaultValue).shortValue(); + break; + case INT32: + value = ((Number) defaultValue).intValue(); + break; + case INT64: + value = ((Number) defaultValue).longValue(); + break; + case FLOAT32: + value = ((Number) defaultValue).floatValue(); + break; + case FLOAT64: + value = ((Number) defaultValue).doubleValue(); + break; + default: + value = defaultValue; + break; + } + builder.defaultValue(value); + } + + if (null != parameters) { + builder.parameters(parameters); + } + + if (isOptional) { + builder.optional(); + } + + if (null != version) { + builder.version(version); + } + return builder.build(); + } +} diff --git a/connect-file-pulse-dataformat/src/test/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverterTest.java b/connect-file-pulse-dataformat/src/test/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverterTest.java new file mode 100644 index 000000000..d82350cfa --- /dev/null +++ b/connect-file-pulse-dataformat/src/test/java/io/streamthoughts/kafka/connect/filepulse/avro/AvroSchemaConverterTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.avro; + +import org.apache.kafka.connect.data.Schema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; + +class AvroSchemaConverterTest { + + private final org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser(); + + @Test + void should_support_avro_circular_reference() throws IOException { + InputStream inputStream = AvroSchemaConverterTest.class.getClassLoader() + .getResourceAsStream("datasets/circular.avsc"); + + org.apache.avro.Schema avroSchema = parser.parse(inputStream); + AvroSchemaConverter converter = new AvroSchemaConverter(); + Schema connectSchema = converter.toConnectSchema(avroSchema); + Assertions.assertNotNull(connectSchema); + } +} \ No newline at end of file diff --git a/connect-file-pulse-dataformat/src/test/resources/datasets/circular.avsc b/connect-file-pulse-dataformat/src/test/resources/datasets/circular.avsc new file mode 100644 index 000000000..a975784ab --- /dev/null +++ b/connect-file-pulse-dataformat/src/test/resources/datasets/circular.avsc @@ -0,0 +1,43 @@ +{ + "type":"record", + "name":"Parent", + "fields":[ + { + "name":"name", + "type":[ + "null", + "string" + ], + "default":null + }, + 
{ + "name":"child", + "type":[ + "null", + { + "type":"record", + "name":"Child", + "fields":[ + { + "name":"name", + "type":[ + "null", + "string" + ], + "default":null + }, + { + "name":"parent", + "type":[ + "null", + "Parent" + ], + "default":null + } + ] + } + ], + "default":null + } + ] +} \ No newline at end of file diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java index d584ae32e..788436121 100644 --- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java +++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java @@ -19,6 +19,7 @@ package io.streamthoughts.kafka.connect.filepulse.config; import com.jsoniter.JsonIterator; +import io.streamthoughts.kafka.connect.filepulse.avro.AvroSchemaConverter; import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter; import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing; import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileOrder; @@ -33,13 +34,10 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.function.Supplier; /** * @@ -51,23 +49,23 @@ public class CommonSourceConfig extends AbstractConfig { public static final String OUTPUT_TOPIC_CONFIG = "topic"; private static final String OUTPUT_TOPIC_DOC = "The Kafka topic to write the value to."; - public static final String FS_LISTING_CLASS_CONFIG = "fs.listing.class"; - private static final String FS_LISTING_CLASS_DOC = "Class which is used to list eligible files from 
the scanned file system."; + public static final String FS_LISTING_CLASS_CONFIG = "fs.listing.class"; + private static final String FS_LISTING_CLASS_DOC = "Class which is used to list eligible files from the scanned file system."; - public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters"; - private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files."; + public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters"; + private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files."; public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class"; - private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file."; + private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file."; public static final String TASKS_FILE_PROCESSING_ORDER_BY_CONFIG = "tasks.file.processing.order.by"; private static final String TASKS_FILE_PROCESSING_ORDER_BY_DOC = "The strategy to be used for sorting files for processing. 
Valid values are: LAST_MODIFIED, URI, CONTENT_LENGTH, CONTENT_LENGTH_DESC."; - public static final String TASKS_HALT_ON_ERROR_CONFIG = "tasks.halt.on.error"; - private static final String TASKS_HALT_ON_ERROR_DOC = "Should a task halt when it encounters an error or continue to the next file."; + public static final String TASKS_HALT_ON_ERROR_CONFIG = "tasks.halt.on.error"; + private static final String TASKS_HALT_ON_ERROR_DOC = "Should a task halt when it encounters an error or continue to the next file."; public static final String TASKS_EMPTY_POLL_WAIT_MS_CONFIG = "tasks.empty.poll.wait.ms"; - public static final String TASKS_EMPTY_POLL_WAIT_MS_DOC = "The amount of time in millisecond a tasks should wait if a poll returns an empty list of records."; + public static final String TASKS_EMPTY_POLL_WAIT_MS_DOC = "The amount of time in millisecond a tasks should wait if a poll returns an empty list of records."; public static final String OFFSET_STRATEGY_CLASS_CONFIG = "offset.policy.class"; private static final String OFFSET_STRATEGY_CLASS_DOC = "Class which is used to determine the source partition and offset that uniquely identify a input record"; @@ -86,6 +84,9 @@ public class CommonSourceConfig extends AbstractConfig { public static final String RECORD_VALUE_SCHEMA_CONFIG = "value.connect.schema"; private static final String RECORD_VALUE_SCHEMA_DOC = "The schema for the record-value"; + public static final String RECORD_VALUE_SCHEMA_TYPE_CONFIG = "value.connect.schema.type"; + private static final String RECORD_VALUE_SCHEMA_TYPE_DOC = "The type of the schema passed through 'value.connect.schema' (supported values are: 'CONNECT', 'AVRO')"; + public static final String RECORD_VALUE_SCHEMA_CONDITION_TOPIC_PATTERN_CONFIG = "value.connect.schema.condition.topic.pattern"; private static final String RECORD_VALUE_SCHEMA_CONDITION_TOPIC_PATTERN_DOC = @@ -197,6 +198,17 @@ public static ConfigDef getConfigDef() { ConfigDef.Width.NONE, 
RECORD_VALUE_SCHEMA_CONDITION_TOPIC_PATTERN_CONFIG ) + .define( + RECORD_VALUE_SCHEMA_TYPE_CONFIG, + ConfigDef.Type.STRING, + ConnectSchemaType.CONNECT.name(), + ConfigDef.Importance.MEDIUM, + RECORD_VALUE_SCHEMA_TYPE_DOC, + GROUP, + groupCounter++, + ConfigDef.Width.NONE, + RECORD_VALUE_SCHEMA_TYPE_CONFIG + ) .define( RECORD_VALUE_SCHEMA_CONFIG, ConfigDef.Type.STRING, @@ -305,7 +317,20 @@ public String getValueSchemaConditionTopicPattern() { } public Schema getValueConnectSchema() { - return readSchema(RECORD_VALUE_SCHEMA_CONFIG); + String valueConnectSchemaTypeString = getString(RECORD_VALUE_SCHEMA_TYPE_CONFIG); + ConnectSchemaType schemaType = ConnectSchemaType.getForNameIgnoreCase(valueConnectSchemaTypeString); + switch (schemaType) { + case CONNECT: + return readConfigSchema(); + case AVRO: + return readAvroSchema(); + case INVALID: + default: + throw new ConfigException( + "Unsupported or invalid value for '" + + RECORD_VALUE_SCHEMA_TYPE_CONFIG + "' , was " + + valueConnectSchemaTypeString); + } } public boolean isValueConnectSchemaMergeEnabled() { @@ -316,103 +341,28 @@ public boolean isSchemaKeepLeadingUnderscoreOnFieldName() { return getBoolean(CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG); } - private Schema readSchema(final String key) { - final String schema = this.getString(key); + private Schema readAvroSchema() { + try { + final String schema = getString(CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG); + if (StringUtils.isBlank(schema)) return null; - if (StringUtils.isBlank(schema)) { - return null; + AvroSchemaConverter converter = new AvroSchemaConverter(); + return converter.toConnectSchema(schema); + } catch (Exception e) { + throw new ConfigException( + "Failed to read avro-schema for '" + CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG + "'", e); } + } + + private Schema readConfigSchema() { + final String schema = getString(CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG); + if (StringUtils.isBlank(schema)) return null; try { 
return JsonIterator.deserialize(schema, ConfigSchema.class).get(); } catch (Exception e) { - throw new ConfigException("Failed to read schema for '" + key + "'", e); - } - } - - public static class ConfigSchema implements Supplier { - - public Schema.Type type; - public boolean isOptional; - public String name; - public Integer version; - public Object defaultValue; - public String doc; - public Map parameters; - public ConfigSchema keySchema; - public ConfigSchema valueSchema; - public Map fieldSchemas; - - @Override - public Schema get() { - final SchemaBuilder builder; - switch (this.type) { - case MAP: - Objects.requireNonNull(keySchema, "keySchema cannot be null."); - Objects.requireNonNull(valueSchema, "valueSchema cannot be null."); - builder = SchemaBuilder.map(keySchema.get(), valueSchema.get()); - break; - case ARRAY: - Objects.requireNonNull(valueSchema, "valueSchema cannot be null."); - builder = SchemaBuilder.array(valueSchema.get()); - break; - default: - builder = SchemaBuilder.type(type); - break; - } - - if (Schema.Type.STRUCT == type) { - for (Map.Entry kvp : fieldSchemas.entrySet()) { - builder.field(kvp.getKey(), kvp.getValue().get()); - } - } - - if (StringUtils.isNotBlank(name)) - builder.name(name); - - if (StringUtils.isNotBlank(doc)) - builder.doc(doc); - - if (null != defaultValue) { - Object value; - switch (type) { - case INT8: - value = ((Number) defaultValue).byteValue(); - break; - case INT16: - value = ((Number) defaultValue).shortValue(); - break; - case INT32: - value = ((Number) defaultValue).intValue(); - break; - case INT64: - value = ((Number) defaultValue).longValue(); - break; - case FLOAT32: - value = ((Number) defaultValue).floatValue(); - break; - case FLOAT64: - value = ((Number) defaultValue).doubleValue(); - break; - default: - value = defaultValue; - break; - } - builder.defaultValue(value); - } - - if (null != parameters) { - builder.parameters(parameters); - } - - if (isOptional) { - builder.optional(); - } - - if 
(null != version) { - builder.version(version); - } - return builder.build(); + throw new ConfigException( + "Failed to read connect-schema for '" + CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG + "'", e); } } } diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConnectSchemaType.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConnectSchemaType.java new file mode 100644 index 000000000..099e178d7 --- /dev/null +++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConnectSchemaType.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.config; + +import java.util.Arrays; +import java.util.Locale; + +public enum ConnectSchemaType { + INVALID, + CONNECT, + AVRO; + + public static ConnectSchemaType getForNameIgnoreCase(final String str) { + return Arrays.stream(ConnectSchemaType.values()) + .filter(e -> str != null && e.name().equals(str.toUpperCase(Locale.ROOT))) + .findFirst() + .orElse(ConnectSchemaType.INVALID); /* null or unknown name */ + } +}