Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: register correct unwrapped schema #6188

Merged
merged 3 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.datagen.RowGenerator;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.serde.EnabledSerdeFeatures;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericRowSerDe;
Expand Down Expand Up @@ -187,7 +188,7 @@ private static Serde<GenericRow> getGenericRowSerde(
) {
return GenericRowSerDe.from(
format,
PersistenceSchema.from((ConnectSchema) schema, false),
PersistenceSchema.from((ConnectSchema) schema, EnabledSerdeFeatures.of()),
new KsqlConfig(Collections.emptyMap()),
schemaRegistryClientFactory,
"benchmark",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@

package io.confluent.ksql.schema.ksql;

import static io.confluent.ksql.serde.SerdeFeature.UNWRAP_SINGLES;
import static io.confluent.ksql.serde.SerdeFeature.WRAP_SINGLES;
import static java.util.Objects.requireNonNull;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter;
import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option;
import io.confluent.ksql.serde.EnabledSerdeFeatures;
import io.confluent.ksql.testing.EffectivelyImmutable;
import java.util.Objects;
import org.apache.kafka.connect.data.ConnectSchema;
Expand All @@ -41,59 +46,47 @@ public final class PersistenceSchema {
private static final SqlSchemaFormatter FORMATTER =
new SqlSchemaFormatter(word -> false, Option.APPEND_NOT_NULL);

private final boolean unwrapped;
private final ConnectSchema ksqlSchema;
private final ConnectSchema serializedSchema;
private final ConnectSchema schema;
private final EnabledSerdeFeatures features;

/**
* Build a persistence schema from the logical key or value schema.
*
* @param ksqlSchema the schema ksql uses internally, i.e. STRUCT schema.
* @param unwrapSingle flag indicating if the serialized form is unwrapped.
* @param schema the schema ksql uses internally, i.e. STRUCT schema.
* @param features the serder features used for persistence.
* @return the persistence schema.
*/
public static PersistenceSchema from(final ConnectSchema ksqlSchema, final boolean unwrapSingle) {
return new PersistenceSchema(ksqlSchema, unwrapSingle);
public static PersistenceSchema from(
final ConnectSchema schema,
final EnabledSerdeFeatures features
) {
return new PersistenceSchema(schema, features);
}

private PersistenceSchema(final ConnectSchema ksqlSchema, final boolean unwrapSingle) {
this.unwrapped = unwrapSingle;
this.ksqlSchema = Objects.requireNonNull(ksqlSchema, "ksqlSchema");
private PersistenceSchema(
final ConnectSchema ksqlSchema,
final EnabledSerdeFeatures features
) {
this.features = requireNonNull(features, "features");
this.schema = requireNonNull(ksqlSchema, "ksqlSchema");

if (ksqlSchema.type() != Type.STRUCT) {
throw new IllegalArgumentException("Expected STRUCT schema type");
}

final boolean singleField = ksqlSchema.fields().size() == 1;
if (unwrapSingle && !singleField) {
throw new IllegalArgumentException("Unwrapping only valid for single field");
if (features.enabled(WRAP_SINGLES) || features.enabled(UNWRAP_SINGLES)) {
if (ksqlSchema.fields().size() != 1) {
throw new IllegalArgumentException("Unwrapping only valid for single field");
}
}

this.serializedSchema = unwrapSingle
? (ConnectSchema) ksqlSchema.fields().get(0).schema()
: ksqlSchema;
}

public boolean isUnwrapped() {
return unwrapped;
}

/**
* The schema used internally by KSQL.
*
* <p>This schema will _always_ be a struct.
*
* @return logical schema.
*/
public ConnectSchema ksqlSchema() {
return ksqlSchema;
public EnabledSerdeFeatures features() {
return features;
}

/**
* @return schema of serialized form
*/
public ConnectSchema serializedSchema() {
return serializedSchema;
public ConnectSchema connectSchema() {
return schema;
}

@Override
Expand All @@ -105,20 +98,20 @@ public boolean equals(final Object o) {
return false;
}
final PersistenceSchema that = (PersistenceSchema) o;
return unwrapped == that.unwrapped
&& Objects.equals(serializedSchema, that.serializedSchema);
return Objects.equals(features, that.features)
&& Objects.equals(schema, that.schema);
}

@Override
public int hashCode() {
return Objects.hash(unwrapped, serializedSchema);
return Objects.hash(features, schema);
}

@Override
public String toString() {
return "Persistence{"
+ "schema=" + FORMATTER.format(serializedSchema)
+ ", unwrapped=" + unwrapped
+ "schema=" + FORMATTER.format(schema)
+ ", features=" + features
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
import static java.util.Objects.requireNonNull;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.SerdeOptions;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import org.apache.kafka.connect.data.ConnectSchema;

/**
* Physical KSQL schema.
Expand Down Expand Up @@ -80,8 +76,10 @@ private PhysicalSchema(
) {
this.logicalSchema = requireNonNull(logicalSchema, "logicalSchema");
this.serdeOptions = requireNonNull(serdeOptions, "serdeOptions");
this.keySchema = buildKeyPhysical(logicalSchema.keyConnectSchema());
this.valueSchema = buildValuePhysical(logicalSchema.valueConnectSchema(), serdeOptions);
this.keySchema = PersistenceSchema
.from(logicalSchema.keyConnectSchema(), serdeOptions.keyFeatures());
this.valueSchema = PersistenceSchema
.from(logicalSchema.valueConnectSchema(), serdeOptions.valueFeatures());
}

@Override
Expand Down Expand Up @@ -110,28 +108,4 @@ public String toString() {
+ ", serdeOptions=" + serdeOptions
+ '}';
}

private static PersistenceSchema buildKeyPhysical(
final ConnectSchema keyConnectSchema
) {
return PersistenceSchema.from(keyConnectSchema, false);
}

private static PersistenceSchema buildValuePhysical(
final ConnectSchema valueConnectSchema,
final SerdeOptions serdeOptions
) {
final boolean singleField = valueConnectSchema.fields().size() == 1;

final boolean unwrapSingle = serdeOptions.valueWrapping()
.map(option -> option == SerdeOption.UNWRAP_SINGLE_VALUES)
.orElse(false);

if (unwrapSingle && !singleField) {
throw new KsqlException("'" + CommonCreateConfigs.WRAP_SINGLE_VALUE + "' "
+ "is only valid for single-field value schemas");
}

return PersistenceSchema.from(valueConnectSchema, unwrapSingle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ public interface SqlToConnectTypeConverter {
Schema toConnectSchema(SqlType sqlType, String name, String doc);
}

public interface ConnectToJavaTypeConverter {
/**
* Convert the supplied Connect {@code schema} to its corresponding Java type.
*
* @param schema the Connect schema.
* @return the java type.
*/
Class<?> toJavaType(Schema schema);
}

public interface JavaToSqlTypeConverter {

/**
Expand Down Expand Up @@ -174,6 +184,12 @@ public static SqlToConnectTypeConverter sqlToConnectConverter() {
return SQL_TO_CONNECT_CONVERTER;
}

public static ConnectToJavaTypeConverter connectToJavaTypeConverter() {
return schema -> SchemaConverters.sqlToJavaConverter().toJavaType(
SchemaConverters.connectToSqlConverter().toSqlType(schema)
);
}

public static JavaToSqlTypeConverter javaToSqlConverter() {
return JAVA_TO_SQL_CONVERTER;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import static io.confluent.ksql.serde.SerdeFeature.UNWRAP_SINGLES;
import static io.confluent.ksql.serde.SerdeFeature.WRAP_SINGLES;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;
import java.util.Set;


/**
* Validated set of enabled features
*
* <p>Known to not have conflicting features enabled
*/
@Immutable
public final class EnabledSerdeFeatures {

private final ImmutableSet<SerdeFeature> features;
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved

@JsonCreator
public static EnabledSerdeFeatures from(final Set<SerdeFeature> features) {
return new EnabledSerdeFeatures(features);
}

public static EnabledSerdeFeatures of(final SerdeFeature... features) {
return new EnabledSerdeFeatures(ImmutableSet.copyOf(features));
}

private EnabledSerdeFeatures(final Set<SerdeFeature> features) {
validate(features);
this.features = ImmutableSet.copyOf(features);
}

public boolean enabled(final SerdeFeature feature) {
return features.contains(feature);
}

public Set<SerdeFeature> all() {
return features;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final EnabledSerdeFeatures that = (EnabledSerdeFeatures) o;
return Objects.equals(features, that.features);
}

@Override
public int hashCode() {
return Objects.hash(features);
}

@Override
public String toString() {
return features.toString();
}

private static void validate(final Set<SerdeFeature> features) {
if (features.contains(WRAP_SINGLES) && features.contains(UNWRAP_SINGLES)) {
throw new IllegalArgumentException("Can't set both "
+ WRAP_SINGLES + " and " + UNWRAP_SINGLES);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Validated set of {@link SerdeOption}s.
Expand Down Expand Up @@ -49,6 +50,19 @@ private SerdeOptions(final ImmutableSet<SerdeOption> options) {
this.options = validate(Objects.requireNonNull(options, "options"));
}

@SuppressWarnings("MethodMayBeStatic")
public EnabledSerdeFeatures keyFeatures() {
// Currently there are no key features:
return EnabledSerdeFeatures.of();
}

public EnabledSerdeFeatures valueFeatures() {
// Currently there are no key features, so all are value features:
return EnabledSerdeFeatures.from(options.stream()
.map(SerdeOption::requiredFeature)
.collect(Collectors.toSet()));
}

public Set<SerdeOption> all() {
return options;
}
Expand Down Expand Up @@ -78,7 +92,7 @@ public int hashCode() {

@Override
public String toString() {
return "SerdeOptions" + options;
return options.toString();
}

private static ImmutableSet<SerdeOption> validate(final ImmutableSet<SerdeOption> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public final class DecimalUtil {

private static final String PRECISION_FIELD = "connect.decimal.precision";
public static final String PRECISION_FIELD = "connect.decimal.precision";

private DecimalUtil() {
}
Expand Down
Loading