Skip to content

Commit

Permalink
fix: preserve old schema behavior for protobuf wrapped primitives (#8934
Browse files Browse the repository at this point in the history
)
  • Loading branch information
vcrfxia committed Mar 31, 2022
1 parent 1c6caa1 commit 36485e2
Show file tree
Hide file tree
Showing 384 changed files with 56,705 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,13 @@ private void analyzeNonStdOutSink(final Sink sink) {
srcTopic.getKeyFormat().getFormatInfo()
);

final String valueFormatName = formatName(
props.getValueFormat(),
srcTopic.getValueFormat().getFormatInfo()
);
final FormatInfo valueFmtInfo = buildFormatInfo(
formatName(props.getValueFormat(), srcTopic.getValueFormat().getFormatInfo()),
props.getValueFormatProperties(),
valueFormatName,
props.getValueFormatProperties(valueFormatName),
srcTopic.getValueFormat().getFormatInfo()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum Injectors implements BiFunction<KsqlExecutionContext, ServiceContext

NO_TOPIC_DELETE((ec, sc) -> InjectorChain.of(
new DefaultFormatInjector(),
new SourcePropertyInjector(),
new DefaultSchemaInjector(
new SchemaRegistryTopicSchemaSupplier(sc.getSchemaRegistryClient()), ec, sc),
new TopicCreateInjector(ec, sc),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.statement;

import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.util.ErrorMessageUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;

/**
* An injector which injects information into {@code CreateSourceProperties}
* or {@code CreateSourceAsProperties}.
*
* <p>The only information injected currently is that new queries using the
* protobuf format should use the new default converter behavior to unwrap
* wrapped primitives. This injector causes new queries to use the new behavior
* while old (existing) queries will continue to use the old behavior.
*
* <p>If a statement that is not {@code CreateAsSelect} or {@code CreateSource}
* is passed in, this results in a no-op that returns the incoming statement.</p>
*/
public class SourcePropertyInjector implements Injector {

@SuppressWarnings("unchecked")
public <T extends Statement> ConfiguredStatement<T> inject(
final ConfiguredStatement<T> statement
) {
if (!(statement.getStatement() instanceof CreateSource)
&& !(statement.getStatement() instanceof CreateAsSelect)) {
return statement;
}

try {
if (statement.getStatement() instanceof CreateAsSelect) {
return (ConfiguredStatement<T>) injectForCreateAsSelect(
(ConfiguredStatement<? extends CreateAsSelect>) statement);
} else {
return (ConfiguredStatement<T>) injectForCreateSource(
(ConfiguredStatement<? extends CreateSource>) statement);
}
} catch (final KsqlStatementException e) {
throw e;
} catch (final KsqlException e) {
throw new KsqlStatementException(
ErrorMessageUtil.buildErrorMessage(e),
statement.getStatementText(),
e.getCause());
}
}

private ConfiguredStatement<? extends CreateSource> injectForCreateSource(
final ConfiguredStatement<? extends CreateSource> original
) {
final CreateSource statement = original.getStatement();
final CreateSourceProperties properties = statement.getProperties();

final CreateSourceProperties injectedProps = properties.withUnwrapProtobufPrimitives(true);

return buildConfiguredStatement(original, injectedProps);
}

private ConfiguredStatement<? extends CreateAsSelect> injectForCreateAsSelect(
final ConfiguredStatement<? extends CreateAsSelect> original
) {
final CreateAsSelect createAsSelect = original.getStatement();
final CreateSourceAsProperties properties = createAsSelect.getProperties();

final CreateSourceAsProperties injectedProps = properties.withUnwrapProtobufPrimitives(true);

return buildConfiguredStatement(original, injectedProps);
}

private static ConfiguredStatement<CreateSource> buildConfiguredStatement(
final ConfiguredStatement<? extends CreateSource> original,
final CreateSourceProperties injectedProps
) {
final CreateSource statement = original.getStatement();

final CreateSource withProps = statement.copyWith(
original.getStatement().getElements(),
injectedProps
);

final PreparedStatement<CreateSource> prepared = buildPreparedStatement(withProps);
return ConfiguredStatement.of(prepared, original.getSessionConfig());
}

private static ConfiguredStatement<CreateAsSelect> buildConfiguredStatement(
final ConfiguredStatement<? extends CreateAsSelect> original,
final CreateSourceAsProperties injectedProps
) {
final CreateAsSelect statement = original.getStatement();

final CreateAsSelect withProps = statement.copyWith(injectedProps);

final PreparedStatement<CreateAsSelect> prepared = buildPreparedStatement(withProps);
return ConfiguredStatement.of(prepared, original.getSessionConfig());
}

private static <T extends Statement> PreparedStatement<T> buildPreparedStatement(
final T stmt
) {
return PreparedStatement.of(SqlFormatter.formatSql(stmt), stmt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ public void shouldMaintainOrderOfReturnedQueries() {

// Then:
assertThat(queries, hasSize(2));
assertThat(queries.get(0).getStatementString(), containsString("create stream foo"));
assertThat(queries.get(1).getStatementString(), containsString("create stream bar"));
assertThat(queries.get(0).getStatementString(), containsString("CREATE STREAM FOO"));
assertThat(queries.get(1).getStatementString(), containsString("CREATE STREAM BAR"));
}

@Test(expected = KsqlStatementException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.SourcePropertyInjector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.QueryMetadata;
Expand Down Expand Up @@ -174,10 +175,11 @@ private static ExecuteResult execute(
final ConfiguredStatement<?> configured = ConfiguredStatement.of(
prepared, SessionConfig.of(ksqlConfig, overriddenProperties));
final ConfiguredStatement<?> withFormats = new DefaultFormatInjector().inject(configured);
final ConfiguredStatement<?> withSourceProps = new SourcePropertyInjector().inject(withFormats);
final ConfiguredStatement<?> withSchema =
schemaInjector
.map(injector -> injector.inject(withFormats))
.orElse((ConfiguredStatement) withFormats);
.map(injector -> injector.inject(withSourceProps))
.orElse((ConfiguredStatement) withSourceProps);
try {
return executionContext.execute(serviceContext, withSchema);
} catch (final KsqlStatementException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.statement;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.TableElements;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SourcePropertyInjectorTest {

@Mock
private SessionConfig sessionConfig;

@Mock
private CreateSource createSource;
@Mock
private ConfiguredStatement<CreateSource> csStatement;
@Mock
private CreateSourceProperties originalCSProps;
@Mock
private CreateSourceProperties csPropsWithUnwrapping;
@Mock
private TableElements tableElements;
@Mock
private CreateSource csWithUnwrapping;

@Mock
private CreateAsSelect createAsSelect;
@Mock
private ConfiguredStatement<CreateAsSelect> csasStatement;
@Mock
private CreateSourceAsProperties originalCsasProps;
@Mock
private CreateSourceAsProperties csasPropsWithUnwrapping;
@Mock
private CreateAsSelect csasWithUnwrapping;

private SourcePropertyInjector injector;

@Before
public void setUp() {
when(csStatement.getStatement()).thenReturn(createSource);
when(csStatement.getSessionConfig()).thenReturn(sessionConfig);
when(createSource.getProperties()).thenReturn(originalCSProps);
when(createSource.getElements()).thenReturn(tableElements);
when(originalCSProps.withUnwrapProtobufPrimitives(true)).thenReturn(csPropsWithUnwrapping);
when(createSource.copyWith(tableElements, csPropsWithUnwrapping)).thenReturn(csWithUnwrapping);

when(csasStatement.getStatement()).thenReturn(createAsSelect);
when(csasStatement.getSessionConfig()).thenReturn(sessionConfig);
when(createAsSelect.getProperties()).thenReturn(originalCsasProps);
when(originalCsasProps.withUnwrapProtobufPrimitives(true)).thenReturn(csasPropsWithUnwrapping);
when(createAsSelect.copyWith(csasPropsWithUnwrapping)).thenReturn(csasWithUnwrapping);

injector = new SourcePropertyInjector();
}

@Test
public void shouldInjectForCreateSource() {
// When:
final ConfiguredStatement<CreateSource> configured = injector.inject(csStatement);

// Then:
assertThat(configured.getStatement(), is(csWithUnwrapping));
}

@Test
public void shouldInjectForCreateAsSelect() {
// When:
final ConfiguredStatement<CreateAsSelect> configured = injector.inject(csasStatement);

// Then:
assertThat(configured.getStatement(), is(csasWithUnwrapping));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ public final class AssertExecutor {
CommonCreateConfigs.KEY_FORMAT_PROPERTY,
CommonCreateConfigs.FORMAT_PROPERTY
)).add(new SourceProperty(
ds -> ds.getKsqlTopic().getKeyFormat().getFormatInfo().getFormat(),
(cs, cfg) -> cs.getProperties().getKeyFormat(cs.getName()).map(FormatInfo::getFormat)
.orElse(cfg.getString(KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG)),
ds -> ds.getKsqlTopic().getKeyFormat().getFormatInfo().getProperties(),
(cs, cfg) -> cs.getProperties().getKeyFormatProperties(
cs.getProperties().getKeyFormat(cs.getName()).map(FormatInfo::getFormat)
.orElse(cfg.getString(KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG)),
cs.getName().text()
),
"key format properties",
CommonCreateConfigs.KEY_DELIMITER_PROPERTY,
CommonCreateConfigs.KEY_SCHEMA_FULL_NAME
Expand All @@ -95,7 +98,10 @@ public final class AssertExecutor {
CommonCreateConfigs.FORMAT_PROPERTY
)).add(new SourceProperty(
ds -> ds.getKsqlTopic().getValueFormat().getFormatInfo().getProperties(),
(cs, cfg) -> cs.getProperties().getValueFormatProperties(),
(cs, cfg) -> cs.getProperties().getValueFormatProperties(
cs.getProperties().getValueFormat().map(FormatInfo::getFormat)
.orElse(cfg.getString(KsqlConfig.KSQL_DEFAULT_VALUE_FORMAT_CONFIG))
),
"value format properties",
CommonCreateConfigs.VALUE_AVRO_SCHEMA_FULL_NAME,
CommonCreateConfigs.VALUE_SCHEMA_FULL_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,31 @@

package io.confluent.ksql.test.serde.protobuf;

import static io.confluent.connect.protobuf.ProtobufDataConfig.SCHEMAS_CACHE_SIZE_CONFIG;
import static io.confluent.connect.protobuf.ProtobufDataConfig.WRAPPER_FOR_RAW_PRIMITIVES_CONFIG;

import com.google.common.collect.ImmutableMap;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.connect.protobuf.ProtobufDataConfig;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.ksql.test.serde.ConnectSerdeSupplier;
import org.apache.kafka.connect.data.Schema;

public class ValueSpecProtobufSerdeSupplier extends ConnectSerdeSupplier<ProtobufSchema> {

public ValueSpecProtobufSerdeSupplier() {
private final boolean unwrapPrimitives;

public ValueSpecProtobufSerdeSupplier(final boolean unwrapPrimitives) {
super(ProtobufConverter::new);
this.unwrapPrimitives = unwrapPrimitives;
}

@Override
protected Schema fromParsedSchema(final ProtobufSchema schema) {
return new ProtobufData(1).toConnectSchema(schema);
return new ProtobufData(new ProtobufDataConfig(ImmutableMap.of(
SCHEMAS_CACHE_SIZE_CONFIG, 1,
WRAPPER_FOR_RAW_PRIMITIVES_CONFIG, unwrapPrimitives
))).toConnectSchema(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.SourcePropertyInjector;
import io.confluent.ksql.util.KsqlConfig;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -227,7 +228,9 @@ private static Topic createTopicFromStatement(
final ConfiguredStatement<?> configured =
ConfiguredStatement.of(prepare, SessionConfig.of(ksqlConfig, Collections.emptyMap()));
final ConfiguredStatement<?> withFormats = new DefaultFormatInjector().inject(configured);
topics.add(extractTopic.apply(withFormats));
final ConfiguredStatement<?> withSourceProps =
new SourcePropertyInjector().inject(withFormats);
topics.add(extractTopic.apply(withSourceProps));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.InjectorChain;
import io.confluent.ksql.statement.SourcePropertyInjector;
import io.confluent.ksql.test.tools.stubs.StubKafkaService;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
Expand Down Expand Up @@ -485,10 +486,12 @@ private PlannedStatement planStatement(final ParsedStatement stmt) {

final ConfiguredStatement<?> withFormats =
new DefaultFormatInjector().inject(configured);
final ConfiguredStatement<?> withSourceProps =
new SourcePropertyInjector().inject(withFormats);
final ConfiguredStatement<?> withSchema =
schemaInjector
.map(injector -> injector.inject(withFormats))
.orElse((ConfiguredStatement) withFormats);
.map(injector -> injector.inject(withSourceProps))
.orElse((ConfiguredStatement) withSourceProps);

final KsqlPlan plan = executionContext
.plan(executionContext.getServiceContext(), withSchema);
Expand Down
Loading

0 comments on commit 36485e2

Please sign in to comment.