Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce Format interface #4437

Merged
merged 1 commit into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import io.confluent.ksql.datagen.RowGenerator;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.avro.AvroFormat;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Pair;
import java.io.InputStream;
Expand Down Expand Up @@ -158,7 +159,7 @@ private static Serde<GenericRow> getJsonSerdeHelper(
final org.apache.kafka.connect.data.Schema schema
) {
return getGenericRowSerde(
FormatInfo.of(Format.JSON.name()),
FormatInfo.of(FormatFactory.JSON.name()),
schema,
() -> null
);
Expand All @@ -171,7 +172,8 @@ private static Serde<GenericRow> getAvroSerde(

return getGenericRowSerde(
FormatInfo.of(
Format.AVRO.name(), ImmutableMap.of(FormatInfo.FULL_SCHEMA_NAME, "benchmarkSchema")),
FormatFactory.AVRO.name(),
ImmutableMap.of(AvroFormat.FULL_SCHEMA_NAME, "benchmarkSchema")),
schema,
() -> schemaRegistryClient
);
Expand Down
6 changes: 3 additions & 3 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.test.util.KsqlIdentifierTestUtil;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -179,7 +179,7 @@ public static void classSetUp() {
orderDataProvider = new OrderDataProvider();
TEST_HARNESS.getKafkaCluster().createTopic(orderDataProvider.topicName());

TEST_HARNESS.produceRows(orderDataProvider.topicName(), orderDataProvider, Format.JSON);
TEST_HARNESS.produceRows(orderDataProvider.topicName(), orderDataProvider, FormatFactory.JSON);

try (Cli cli = Cli.build(1L, 1000L, OutputFormat.JSON, restClient)) {
createKStream(orderDataProvider, cli);
Expand Down Expand Up @@ -270,7 +270,7 @@ private void testCreateStreamAsSelect(
final Map<Long, GenericRow> results = TEST_HARNESS.verifyAvailableUniqueRows(
streamName,
expectedResults.size(),
Format.JSON,
FormatFactory.JSON,
resultSchema
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.cli;

import static io.confluent.ksql.serde.Format.JSON;
import static io.confluent.ksql.serde.FormatFactory.JSON;
import static java.util.Collections.emptyMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
@Immutable
public final class FormatInfo {

public static final String FULL_SCHEMA_NAME = "fullSchemaName";
public static final String DELIMITER = "delimiter";

private final String format;
private final ImmutableMap<String, String> properties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.util.KsqlException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -36,8 +35,8 @@ public class FormatInfoTest {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
FormatInfo.of("DELIMITED", ImmutableMap.of(FormatInfo.DELIMITER, "x")),
FormatInfo.of("DELIMITED", ImmutableMap.of(FormatInfo.DELIMITER, "x"))
FormatInfo.of("DELIMITED", ImmutableMap.of("prop", "x")),
FormatInfo.of("DELIMITED", ImmutableMap.of("prop", "x"))
)
.addEqualityGroup(
FormatInfo.of("DELIMITED"),
Expand All @@ -47,15 +46,15 @@ public void shouldImplementEquals() {
FormatInfo.of("AVRO")
)
.addEqualityGroup(
FormatInfo.of("DELIMITED", ImmutableMap.of(FormatInfo.DELIMITER, "|"))
FormatInfo.of("DELIMITED", ImmutableMap.of("prop", "|"))
)
.testEquals();
}

@Test
public void shouldImplementToStringAvro() {
// Given:
final FormatInfo info = FormatInfo.of("AVRO", ImmutableMap.of(FormatInfo.FULL_SCHEMA_NAME, "something"));
final FormatInfo info = FormatInfo.of("AVRO", ImmutableMap.of("property", "something"));

// When:
final String result = info.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeOption;
Expand Down Expand Up @@ -232,7 +233,8 @@ private KeyFormat buildKeyFormat() {
.map(WindowExpression::getKsqlWindowExpression);

return ksqlWindow
.map(w -> KeyFormat.windowed(FormatInfo.of(Format.KAFKA.name()), w.getWindowInfo()))
.map(w -> KeyFormat.windowed(
FormatInfo.of(FormatFactory.KAFKA.name()), w.getWindowInfo()))
.orElseGet(() -> analysis
.getFromDataSources()
.get(0)
Expand Down Expand Up @@ -264,7 +266,7 @@ private List<ColumnName> getColumnNames() {

private Format getValueFormat(final Sink sink) {
return sink.getProperties().getValueFormat()
.orElseGet(() -> Format.of(getSourceInfo()));
.orElseGet(() -> FormatFactory.of(getSourceInfo()));
}

private FormatInfo getSourceInfo() {
Expand Down Expand Up @@ -612,7 +614,7 @@ private List<Column> systemColumnsToTheFront(final List<Column> columns) {
public void validate() {
final String kafkaSources = analysis.getFromDataSources().stream()
.filter(s -> s.getDataSource().getKsqlTopic().getValueFormat().getFormat()
== Format.KAFKA)
== FormatFactory.KAFKA)
.map(AliasedDataSource::getAlias)
.map(SourceName::name)
.collect(Collectors.joining(", "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
Expand Down Expand Up @@ -110,7 +109,8 @@ public CreateStreamCommand createStreamCommand(
keyFieldName,
timestampColumn,
topic.getKafkaTopicName(),
Formats.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
io.confluent.ksql.execution.plan.Formats
.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
topic.getKeyFormat().getWindowInfo()
);
}
Expand Down Expand Up @@ -143,7 +143,8 @@ public CreateTableCommand createTableCommand(
keyFieldName,
timestampColumn,
topic.getKafkaTopicName(),
Formats.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
io.confluent.ksql.execution.plan.Formats
.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
topic.getKeyFormat().getWindowInfo()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.SqlValueCoercer;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
Expand Down Expand Up @@ -437,7 +437,7 @@ private byte[] serializeValue(
try {
return valueSerde.serializer().serialize(topicName, row);
} catch (final Exception e) {
if (dataSource.getKsqlTopic().getValueFormat().getFormat() == Format.AVRO) {
if (dataSource.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) {
final Throwable rootCause = ExceptionUtils.getRootCause(e);
if (rootCause instanceof RestClientException) {
switch (((RestClientException) rootCause).getStatus()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
Expand Down Expand Up @@ -205,11 +207,18 @@ public TransientQueryMetadata executeQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement
) {
final TransientQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.executeQuery(statement);
registerQuery(query);
return query;
try {
final TransientQueryMetadata query = EngineExecutor
.create(primaryContext, serviceContext, statement.getConfig(), statement.getOverrides())
.executeQuery(statement);
registerQuery(query);
return query;
} catch (final KsqlStatementException e) {
throw e;
} catch (final KsqlException e) {
// add the statement text to the KsqlException
throw new KsqlStatementException(e.getMessage(), statement.getStatementText(), e.getCause());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaResult;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.IdentifierUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -74,14 +74,20 @@ public <T extends Statement> ConfiguredStatement<T> inject(
final ConfiguredStatement<CreateSource> createStatement =
(ConfiguredStatement<CreateSource>) statement;

return (ConfiguredStatement<T>) forCreateStatement(createStatement).orElse(createStatement);
try {
return (ConfiguredStatement<T>) forCreateStatement(createStatement).orElse(createStatement);
} catch (final KsqlStatementException e) {
throw e;
} catch (final KsqlException e) {
throw new KsqlStatementException(e.getMessage(), statement.getStatementText(), e.getCause());
}
}

private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
final ConfiguredStatement<CreateSource> statement
) {
if (hasElements(statement)
|| statement.getStatement().getProperties().getValueFormat() != Format.AVRO) {
|| !statement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ private static Set<SerdeOption> build(
: SerdeOption.none();
}

if (!valueFormat.supportsUnwrapping()) {
if (!valueFormat.supportsWrapping()) {
throw new KsqlException("'" + CommonCreateConfigs.WRAP_SINGLE_VALUE
+ "' can not be used with format '"
+ valueFormat + "' as it does not support wrapping");
+ valueFormat.name() + "' as it does not support wrapping");
}

if (!singleField) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.KGroupedStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
Expand All @@ -28,7 +27,7 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeOption;
Expand Down Expand Up @@ -87,7 +86,7 @@ public SchemaKTable<?> aggregate(
step = ExecutionStepFactory.streamWindowedAggregate(
contextStacker,
sourceStep,
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
io.confluent.ksql.execution.plan.Formats.of(keyFormat, valueFormat, SerdeOption.none()),
nonAggregateColumns,
aggregations,
windowExpression.get().getKsqlWindowExpression()
Expand All @@ -97,7 +96,7 @@ public SchemaKTable<?> aggregate(
step = ExecutionStepFactory.streamAggregate(
contextStacker,
sourceStep,
Formats.of(keyFormat, valueFormat, SerdeOption.none()),
io.confluent.ksql.execution.plan.Formats.of(keyFormat, valueFormat, SerdeOption.none()),
nonAggregateColumns,
aggregations
);
Expand All @@ -115,7 +114,7 @@ public SchemaKTable<?> aggregate(

private static KeyFormat getKeyFormat(final WindowExpression windowExpression) {
return KeyFormat.windowed(
FormatInfo.of(Format.KAFKA.name()),
FormatInfo.of(FormatFactory.KAFKA.name()),
windowExpression.getKsqlWindowExpression().getWindowInfo()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.confluent.ksql.parser.tree.DropStatement;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand All @@ -45,7 +45,7 @@
* passed through. Furthermore, it will remove the DELETE TOPIC clause from
* the statement, indicating that the operation has already been done.
*
* <p>If the topic being deleted is {@link Format#AVRO},
* <p>If the topic being deleted is {@link FormatFactory#AVRO},
* this injector will also clean up the corresponding schema in the schema
* registry.
*/
Expand Down Expand Up @@ -108,7 +108,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
}

try {
if (source.getKsqlTopic().getValueFormat().getFormat() == Format.AVRO) {
if (source.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) {
SchemaRegistryUtil.deleteSubjectWithRetries(
schemaRegistryClient,
source.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.ValueFormat;
Expand All @@ -39,9 +39,9 @@ public static KsqlTopic create(final CreateSourceProperties properties) {

final KeyFormat keyFormat = windowType
.map(type -> KeyFormat
.windowed(FormatInfo.of(Format.KAFKA.name()), WindowInfo.of(type, windowSize)))
.windowed(FormatInfo.of(FormatFactory.KAFKA.name()), WindowInfo.of(type, windowSize)))
.orElseGet(() -> KeyFormat
.nonWindowed(FormatInfo.of(Format.KAFKA.name())));
.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name())));

final ValueFormat valueFormat = ValueFormat.of(properties.getFormatInfo());

Expand Down
Loading