chore: remove Connect schema from Format interface #4637

Merged
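The net effect of this change: `Format` implementations now hand schema inference a list of KSQL-native columns, so nothing downstream needs Connect's `Schema`, the SQL-string formatter, or a re-parse. A minimal sketch of the before/after shape, using toy stand-ins for the real types; the `toColumns` call mirrors what the diffs below show, everything else here is illustrative:

```java
import java.util.List;

// Toy stand-ins, for illustration only; the real types live in
// io.confluent.ksql.schema.ksql and io.confluent.ksql.serde.
interface SqlType { }

interface SimpleColumn {
  String name();
  SqlType type();
}

interface ParsedSchema { }

interface Format {
  // After this change: formats expose KSQL-native columns directly.
  List<SimpleColumn> toColumns(ParsedSchema schema);

  // Before: callers got a Connect Schema and translated it themselves, e.g.
  //   Schema connectSchema = translator.toKsqlSchema(format.toConnectSchema(parsedSchema));
}
```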
```diff
@@ -290,7 +290,7 @@ private Optional<DdlCommand> maybeCreateSinkDdl(
     }
 
     final SchemaRegistryClient srClient = serviceContext.getSchemaRegistryClient();
-    AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient, ksqlConfig);
+    AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient);
     return Optional.of(ddl);
   }
 
```
DefaultSchemaInjector.java

```diff
@@ -15,33 +15,28 @@
 
 package io.confluent.ksql.schema.ksql.inference;
 
-import io.confluent.ksql.metastore.TypeRegistry;
+import io.confluent.ksql.execution.expression.tree.Type;
 import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
-import io.confluent.ksql.parser.SchemaParser;
 import io.confluent.ksql.parser.SqlFormatter;
 import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
 import io.confluent.ksql.parser.tree.CreateSource;
 import io.confluent.ksql.parser.tree.Statement;
 import io.confluent.ksql.parser.tree.TableElement;
 import io.confluent.ksql.parser.tree.TableElement.Namespace;
 import io.confluent.ksql.parser.tree.TableElements;
-import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
-import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option;
-import io.confluent.ksql.schema.ksql.SchemaConverters;
+import io.confluent.ksql.schema.ksql.SimpleColumn;
 import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId;
 import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaResult;
 import io.confluent.ksql.statement.ConfiguredStatement;
 import io.confluent.ksql.statement.Injector;
 import io.confluent.ksql.util.ErrorMessageUtil;
-import io.confluent.ksql.util.IdentifierUtil;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.KsqlStatementException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Stream;
-import org.apache.kafka.connect.data.Schema;
 
 /**
  * An injector which injects the value columns into the supplied {@code statement}.
@@ -59,9 +54,6 @@
  */
 public class DefaultSchemaInjector implements Injector {
 
-  private static final SqlSchemaFormatter FORMATTER = new SqlSchemaFormatter(
-      w -> !IdentifierUtil.isValid(w), Option.AS_COLUMN_LIST);
-
   private final TopicSchemaSupplier schemaSupplier;
 
   public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) {
@@ -140,7 +132,7 @@ private static CreateSource addSchemaFields(
       final ConfiguredStatement<CreateSource> preparedStatement,
       final SchemaAndId schema
   ) {
-    final TableElements elements = buildElements(schema.schema, preparedStatement);
+    final TableElements elements = buildElements(schema.columns, preparedStatement);
 
     final CreateSource statement = preparedStatement.getStatement();
     final CreateSourceProperties properties = statement.getProperties();
@@ -152,17 +144,16 @@
   }
 
   private static TableElements buildElements(
-      final Schema valueSchema,
+      final List<? extends SimpleColumn> valueColumns,
       final ConfiguredStatement<CreateSource> preparedStatement
   ) {
-    throwOnInvalidSchema(valueSchema);
-
     final List<TableElement> elements = new ArrayList<>();
 
     getKeyColumns(preparedStatement)
         .forEach(elements::add);
 
-    getColumnsFromSchema(valueSchema)
+    valueColumns.stream()
+        .map(col -> new TableElement(Namespace.VALUE, col.name(), new Type(col.type())))
         .forEach(elements::add);
 
     return TableElements.of(elements);
@@ -175,23 +166,6 @@ private static Stream<TableElement> getKeyColumns(
         .filter(e -> e.getNamespace() == Namespace.KEY);
   }
 
-  private static Stream<TableElement> getColumnsFromSchema(final Schema schema) {
-    return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream();
-  }
-
-  private static void throwOnInvalidSchema(final Schema schema) {
-    try {
-      SchemaConverters.connectToSqlConverter().toSqlType(schema);
-    } catch (final Exception e) {
-      throw new KsqlException(
-          "Schema contains types not supported by KSQL: " + e.getMessage()
-              + System.lineSeparator()
-              + "Schema: " + schema,
-          e
-      );
-    }
-  }
-
   private static PreparedStatement<CreateSource> buildPreparedStatement(
       final CreateSource stmt
   ) {
```
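With `getColumnsFromSchema` and `throwOnInvalidSchema` gone, `buildElements` is a plain map from columns to `TableElement`s. A self-contained sketch of that mapping, with hypothetical stand-ins for the ksqlDB types of the same names (the real `type()` returns a `SqlType`, simplified to a string here):

```java
import java.util.List;
import java.util.stream.Collectors;

// Self-contained sketch of the new buildElements mapping.
public final class ColumnMappingSketch {

  interface SimpleColumn {
    String name();
    String type(); // the real method returns a SqlType; a String keeps this runnable
  }

  record TableElement(String namespace, String name, String type) { }

  // Every value column becomes a VALUE-namespaced element; no Connect Schema,
  // no SQL-string formatting, no re-parsing.
  static List<TableElement> valueElements(final List<? extends SimpleColumn> columns) {
    return columns.stream()
        .map(col -> new TableElement("VALUE", col.name(), col.type()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    SimpleColumn id = new SimpleColumn() {
      public String name() { return "ID"; }
      public String type() { return "BIGINT"; }
    };
    System.out.println(valueElements(List.of(id)));
  }
}
```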
SchemaRegistryTopicSchemaSupplier.java

```diff
@@ -20,42 +20,38 @@
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.ksql.links.DocumentationLinks;
+import io.confluent.ksql.schema.ksql.SimpleColumn;
 import io.confluent.ksql.serde.Format;
 import io.confluent.ksql.serde.FormatFactory;
-import io.confluent.ksql.serde.connect.ConnectSchemaTranslator;
 import io.confluent.ksql.util.KsqlConstants;
 import io.confluent.ksql.util.KsqlException;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
 import org.apache.http.HttpStatus;
-import org.apache.kafka.connect.data.Schema;
 
 /**
  * A {@link TopicSchemaSupplier} that retrieves schemas from the Schema Registry.
  */
 public class SchemaRegistryTopicSchemaSupplier implements TopicSchemaSupplier {
 
   private final SchemaRegistryClient srClient;
-  private final Function<Schema, Schema> toKsqlTranslator;
   private final Function<String, Format> formatFactory;
 
   public SchemaRegistryTopicSchemaSupplier(final SchemaRegistryClient srClient) {
     this(
         srClient,
-        new ConnectSchemaTranslator()::toKsqlSchema,
         FormatFactory::fromName
     );
   }
 
   @VisibleForTesting
   SchemaRegistryTopicSchemaSupplier(
       final SchemaRegistryClient srClient,
-      final Function<Schema, Schema> toKsqlTranslator,
       final Function<String, Format> formatFactory
   ) {
     this.srClient = Objects.requireNonNull(srClient, "srClient");
-    this.toKsqlTranslator = Objects.requireNonNull(toKsqlTranslator, "toKsqlTranslator");
     this.formatFactory = Objects.requireNonNull(formatFactory, "formatFactory");
   }
 
@@ -96,8 +92,8 @@ public SchemaResult fromParsedSchema(
   ) {
     try {
       final Format format = formatFactory.apply(parsedSchema.schemaType());
-      final Schema connectSchema = toKsqlTranslator.apply(format.toConnectSchema(parsedSchema));
-      return SchemaResult.success(SchemaAndId.schemaAndId(connectSchema, id));
+      final List<SimpleColumn> columns = format.toColumns(parsedSchema);
+      return SchemaResult.success(SchemaAndId.schemaAndId(columns, id));
     } catch (final Exception e) {
       return notCompatible(topic, parsedSchema.canonicalString(), e);
     }
```
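Dropping `toKsqlTranslator` leaves the supplier with a single injected dependency besides the client: the format lookup, which the `@VisibleForTesting` constructor keeps swappable. A reduced sketch of that pattern, where `FormatSketch` is a hypothetical stand-in for `io.confluent.ksql.serde.Format` and only `toColumns` mirrors the diff:

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Sketch of the constructor-injected factory pattern used by the supplier.
public final class SupplierSketch {

  interface FormatSketch {
    List<String> toColumns(String parsedSchema); // simplified signature
  }

  private final Function<String, FormatSketch> formatFactory;

  SupplierSketch(final Function<String, FormatSketch> formatFactory) {
    this.formatFactory = Objects.requireNonNull(formatFactory, "formatFactory");
  }

  List<String> columnsFor(final String schemaType, final String parsedSchema) {
    // Mirrors fromParsedSchema: resolve the format by name, ask it for columns.
    return formatFactory.apply(schemaType).toColumns(parsedSchema);
  }

  public static void main(String[] args) {
    // Tests can inject a stub factory instead of FormatFactory::fromName.
    SupplierSketch supplier = new SupplierSketch(name -> schema -> List.of("ID", "NAME"));
    System.out.println(supplier.columnsFor("AVRO", "{...}"));
  }
}
```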
TopicSchemaSupplier.java

```diff
@@ -15,9 +15,11 @@
 
 package io.confluent.ksql.schema.ksql.inference;
 
+import com.google.common.collect.ImmutableList;
+import io.confluent.ksql.schema.ksql.SimpleColumn;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import org.apache.kafka.connect.data.Schema;
 
 /**
  * Supplier of schemas for topics
@@ -37,15 +39,21 @@ public interface TopicSchemaSupplier {
   final class SchemaAndId {
 
     final int id;
-    final Schema schema;
+    final List<? extends SimpleColumn> columns;
 
-    private SchemaAndId(final Schema schema, final int id) {
+    private SchemaAndId(
+        final List<? extends SimpleColumn> columns,
+        final int id
+    ) {
       this.id = id;
-      this.schema = Objects.requireNonNull(schema, "schema");
+      this.columns = ImmutableList.copyOf(Objects.requireNonNull(columns, "columns"));
     }
 
-    static SchemaAndId schemaAndId(final Schema schema, final int id) {
-      return new SchemaAndId(schema, id);
+    static SchemaAndId schemaAndId(
+        final List<? extends SimpleColumn> columns,
+        final int id
+    ) {
+      return new SchemaAndId(columns, id);
     }
   }
 
```
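`SchemaAndId` now takes the columns through `ImmutableList.copyOf`, so the value object cannot be changed out from under it by whoever supplied the list. A small runnable demonstration of why the defensive copy matters (using strings in place of columns):

```java
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;

public final class DefensiveCopyDemo {
  public static void main(String[] args) {
    List<String> columns = new ArrayList<>(List.of("ID", "NAME"));
    List<String> held = ImmutableList.copyOf(columns); // what the constructor stores

    columns.add("ROWTIME");    // caller keeps mutating its own list...
    System.out.println(held);  // ...but the copy still prints [ID, NAME]
  }
}
```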
AvroUtil.java

```diff
@@ -35,8 +35,7 @@ private AvroUtil() {
   public static void throwOnInvalidSchemaEvolution(
       final String statementText,
       final CreateSourceCommand ddl,
-      final SchemaRegistryClient schemaRegistryClient,
-      final KsqlConfig ksqlConfig
+      final SchemaRegistryClient schemaRegistryClient
   ) {
     final io.confluent.ksql.execution.plan.Formats formats = ddl.getFormats();
     final FormatInfo format = formats.getValueFormat();
@@ -51,8 +50,7 @@ public static void throwOnInvalidSchemaEvolution(
     final org.apache.avro.Schema avroSchema = AvroSchemas.getAvroSchema(
         physicalSchema.valueSchema(),
         format.getProperties()
-            .getOrDefault(AvroFormat.FULL_SCHEMA_NAME, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME),
-        ksqlConfig
+            .getOrDefault(AvroFormat.FULL_SCHEMA_NAME, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME)
     );
 
     final String topicName = ddl.getTopicName();
```
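With `KsqlConfig` gone from the signature, the Avro full schema name is resolved from the format's own properties alone, falling back to the shared default. A sketch of that lookup; the key and default strings below are illustrative stand-ins for `AvroFormat.FULL_SCHEMA_NAME` and `KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME`, not the verified constant values:

```java
import java.util.Map;

public final class SchemaNameLookup {
  public static void main(String[] args) {
    Map<String, String> formatProps = Map.of(); // no explicit name configured

    String fullSchemaName = formatProps.getOrDefault(
        "fullSchemaName",                                       // assumed key
        "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"); // assumed default

    System.out.println(fullSchemaName); // falls back to the default
  }
}
```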
```diff
@@ -630,12 +630,11 @@ private Deserializer<GenericRow> getValueDeserializer(
 
   public void ensureSchema(
       final String topicName,
-      final PhysicalSchema schema,
-      final KsqlConfig ksqlConfig) {
+      final PhysicalSchema schema) {
     final SchemaRegistryClient srClient = serviceContext.get().getSchemaRegistryClient();
     try {
       final org.apache.avro.Schema avroSchema = AvroSchemas
-          .getAvroSchema(schema.valueSchema(), "test_" + topicName, ksqlConfig);
+          .getAvroSchema(schema.valueSchema(), "test_" + topicName);
 
       srClient.register(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, new AvroSchema(avroSchema));
     } catch (final Exception e) {
```
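This test helper now derives the Avro schema purely from the `PhysicalSchema`, with no config threading. What it does boils down to registering a generated Avro schema under the topic's value subject. A runnable sketch using the mock client; the record name mirrors the `"test_" + topicName` prefix and `-value` mirrors `KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX`:

```java
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.SchemaBuilder;

public final class EnsureSchemaSketch {

  public static void main(String[] args) throws Exception {
    // Mock client keeps this runnable without a live Schema Registry.
    final SchemaRegistryClient srClient = new MockSchemaRegistryClient();

    final org.apache.avro.Schema avroSchema = SchemaBuilder
        .record("test_orders") // mirrors "test_" + topicName
        .fields()
        .requiredString("ID")
        .endRecord();

    // Same register call shape as the diff above: subject + parsed schema.
    srClient.register("orders-value", new AvroSchema(avroSchema));

    System.out.println(srClient.getAllSubjects()); // [orders-value]
  }
}
```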