Commit 2d66b31

Merge branch 'master' into command-topic-deleted
stevenpyzhang committed Sep 22, 2020
2 parents f63c88c + 2b29ee0
Showing 41 changed files with 248 additions and 264 deletions.

@@ -45,16 +45,16 @@
 import io.confluent.ksql.api.client.InsertAck;
 import io.confluent.ksql.api.client.InsertsPublisher;
 import io.confluent.ksql.api.client.KsqlArray;
+import io.confluent.ksql.api.client.KsqlObject;
 import io.confluent.ksql.api.client.QueryInfo;
 import io.confluent.ksql.api.client.QueryInfo.QueryType;
-import io.confluent.ksql.api.client.exception.KsqlClientException;
-import io.confluent.ksql.api.client.KsqlObject;
 import io.confluent.ksql.api.client.Row;
 import io.confluent.ksql.api.client.SourceDescription;
 import io.confluent.ksql.api.client.StreamInfo;
 import io.confluent.ksql.api.client.StreamedQueryResult;
 import io.confluent.ksql.api.client.TableInfo;
 import io.confluent.ksql.api.client.TopicInfo;
+import io.confluent.ksql.api.client.exception.KsqlClientException;
 import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
 import io.confluent.ksql.api.client.util.RowUtil;
 import io.confluent.ksql.engine.KsqlEngine;

@@ -19,7 +19,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.errorprone.annotations.Immutable;
 import io.confluent.ksql.parser.OutputRefinement;
-
 import java.util.Objects;

 /**

@@ -18,12 +18,9 @@

 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.is;

 import com.google.common.testing.EqualsTester;
 import com.google.common.testing.NullPointerTester;
-import java.util.Optional;
-
 import io.confluent.ksql.parser.OutputRefinement;
 import org.junit.Test;


@@ -200,14 +200,15 @@ private KeyFormat buildKeyFormat() {
     final Optional<KsqlWindowExpression> ksqlWindow = analysis.getWindowExpression()
         .map(WindowExpression::getKsqlWindowExpression);

+    final KeyFormat keyFormat = analysis
+        .getFrom()
+        .getDataSource()
+        .getKsqlTopic()
+        .getKeyFormat();
+
     return ksqlWindow
-        .map(w -> KeyFormat.windowed(
-            FormatInfo.of(FormatFactory.KAFKA.name()), w.getWindowInfo()))
-        .orElseGet(() -> analysis
-            .getFrom()
-            .getDataSource()
-            .getKsqlTopic()
-            .getKeyFormat());
+        .map(w -> KeyFormat.windowed(keyFormat.getFormatInfo(), w.getWindowInfo()))
+        .orElse(keyFormat);
   }

   private Format getValueFormat(final Sink sink) {
@@ -224,7 +225,6 @@ private FormatInfo getSourceInfo() {
         .getFormatInfo();
   }

-
   @Override
   protected AstNode visitQuery(
       final Query node,
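
Read as a unit, the reworked buildKeyFormat() keeps the source's own key format and only re-wraps it with window metadata when the query is windowed; the old code hardcoded the KAFKA format for every windowed key. A minimal standalone sketch of the same Optional pattern, using simplified stand-in records rather than the real io.confluent.ksql.serde classes:

    import java.util.Optional;

    public final class KeyFormatSketch {

      // Simplified stand-ins for io.confluent.ksql.serde FormatInfo/WindowInfo/KeyFormat.
      record FormatInfo(String name) {}
      record WindowInfo(String type) {}
      record KeyFormat(FormatInfo formatInfo, Optional<WindowInfo> windowInfo) {
        static KeyFormat nonWindowed(final FormatInfo format) {
          return new KeyFormat(format, Optional.empty());
        }
        static KeyFormat windowed(final FormatInfo format, final WindowInfo window) {
          return new KeyFormat(format, Optional.of(window));
        }
      }

      // Same shape as the patched buildKeyFormat(): reuse the source key format,
      // adding window info only when the query declares a window.
      static KeyFormat buildKeyFormat(final KeyFormat source, final Optional<WindowInfo> window) {
        return window
            .map(w -> KeyFormat.windowed(source.formatInfo(), w))
            .orElse(source);
      }

      public static void main(final String[] args) {
        final KeyFormat kafka = KeyFormat.nonWindowed(new FormatInfo("KAFKA"));
        System.out.println(buildKeyFormat(kafka, Optional.empty()));                        // unchanged
        System.out.println(buildKeyFormat(kafka, Optional.of(new WindowInfo("TUMBLING")))); // windowed
      }
    }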

@@ -40,7 +40,6 @@
 import io.confluent.ksql.parser.tree.WindowExpression;
 import io.confluent.ksql.serde.RefinementInfo;
 import io.confluent.ksql.util.KsqlException;
-
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;

@@ -19,29 +19,31 @@
 import com.google.common.collect.Iterables;
 import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
 import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
-import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
 import io.confluent.ksql.execution.timestamp.TimestampColumn;
 import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.name.SourceName;
 import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
+import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
 import io.confluent.ksql.parser.tree.CreateStream;
 import io.confluent.ksql.parser.tree.CreateTable;
 import io.confluent.ksql.parser.tree.TableElements;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import io.confluent.ksql.schema.ksql.SystemColumns;
 import io.confluent.ksql.serde.Format;
+import io.confluent.ksql.serde.FormatFactory;
+import io.confluent.ksql.serde.FormatInfo;
 import io.confluent.ksql.serde.GenericKeySerDe;
 import io.confluent.ksql.serde.GenericRowSerDe;
 import io.confluent.ksql.serde.KeySerdeFactory;
 import io.confluent.ksql.serde.SerdeOptions;
 import io.confluent.ksql.serde.SerdeOptionsFactory;
 import io.confluent.ksql.serde.ValueSerdeFactory;
+import io.confluent.ksql.serde.WindowInfo;
 import io.confluent.ksql.services.ServiceContext;
-import io.confluent.ksql.topic.TopicFactory;
 import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlException;
 import java.util.Objects;
@@ -82,31 +84,32 @@ public CreateStreamCommand createStreamCommand(
       final KsqlConfig ksqlConfig
   ) {
     final SourceName sourceName = statement.getName();
-    final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
+    final CreateSourceProperties props = statement.getProperties();
+    final String topicName = ensureTopicExists(props, serviceContext);
     final LogicalSchema schema = buildSchema(statement.getElements());
-    final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
-        ksqlConfig,
-        statement.getProperties(),
-        schema
-    );
+    final Optional<TimestampColumn> timestampColumn =
+        buildTimestampColumn(ksqlConfig, props, schema);
+
+    final FormatInfo keyFormat = SourcePropertiesUtil.getKeyFormat(props);
+    final FormatInfo valueFormat = SourcePropertiesUtil.getValueFormat(props);

     final SerdeOptions serdeOptions = serdeOptionsSupplier.build(
         schema,
-        topic.getKeyFormat().getFormat(),
-        topic.getValueFormat().getFormat(),
-        statement.getProperties().getSerdeOptions(),
+        FormatFactory.of(keyFormat),
+        FormatFactory.of(valueFormat),
+        props.getSerdeOptions(),
         ksqlConfig
     );

-    validateSerdesCanHandleSchemas(ksqlConfig, PhysicalSchema.from(schema, serdeOptions), topic);
+    validateSerdesCanHandleSchemas(ksqlConfig, schema, serdeOptions, keyFormat, valueFormat);

     return new CreateStreamCommand(
         sourceName,
         schema,
         timestampColumn,
-        topic.getKafkaTopicName(),
-        Formats.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
-        topic.getKeyFormat().getWindowInfo(),
+        topicName,
+        Formats.of(keyFormat, valueFormat, serdeOptions),
+        getWindowInfo(props),
         Optional.of(statement.isOrReplace())
     );
   }
@@ -116,47 +119,48 @@ public CreateTableCommand createTableCommand(
       final KsqlConfig ksqlConfig
   ) {
     final SourceName sourceName = statement.getName();
-    final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
+    final CreateSourceProperties props = statement.getProperties();
+    final String topicName = ensureTopicExists(props, serviceContext);
     final LogicalSchema schema = buildSchema(statement.getElements());
     if (schema.key().isEmpty()) {
-      final boolean usingSchemaInference = statement.getProperties().getSchemaId().isPresent();
+      final boolean usingSchemaInference = props.getSchemaId().isPresent();

       final String additional = usingSchemaInference
           ? System.lineSeparator()
           + "Use a partial schema to define the primary key and still load the value columns from "
           + "the Schema Registry, for example:"
           + System.lineSeparator()
-          + "\tCREATE TABLE " + statement.getName().text() + " (ID INT PRIMARY KEY) WITH (...);"
+          + "\tCREATE TABLE " + sourceName.text() + " (ID INT PRIMARY KEY) WITH (...);"
          : "";

       throw new KsqlException(
           "Tables require a PRIMARY KEY. Please define the PRIMARY KEY." + additional
       );
     }

-    final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
-        ksqlConfig,
-        statement.getProperties(),
-        schema
-    );
+    final Optional<TimestampColumn> timestampColumn =
+        buildTimestampColumn(ksqlConfig, props, schema);
+
+    final FormatInfo keyFormat = SourcePropertiesUtil.getKeyFormat(props);
+    final FormatInfo valueFormat = SourcePropertiesUtil.getValueFormat(props);

     final SerdeOptions serdeOptions = serdeOptionsSupplier.build(
         schema,
-        topic.getKeyFormat().getFormat(),
-        topic.getValueFormat().getFormat(),
-        statement.getProperties().getSerdeOptions(),
+        FormatFactory.of(keyFormat),
+        FormatFactory.of(valueFormat),
+        props.getSerdeOptions(),
         ksqlConfig
     );

-    validateSerdesCanHandleSchemas(ksqlConfig, PhysicalSchema.from(schema, serdeOptions), topic);
+    validateSerdesCanHandleSchemas(ksqlConfig, schema, serdeOptions, keyFormat, valueFormat);

     return new CreateTableCommand(
         sourceName,
         schema,
         timestampColumn,
-        topic.getKafkaTopicName(),
-        Formats.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
-        topic.getKeyFormat().getWindowInfo(),
+        topicName,
+        Formats.of(keyFormat, valueFormat, serdeOptions),
+        getWindowInfo(props),
         Optional.of(statement.isOrReplace())
     );
   }
@@ -175,7 +179,11 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
     return tableElements.toLogicalSchema();
   }

-  private static KsqlTopic buildTopic(
+  private static Optional<WindowInfo> getWindowInfo(final CreateSourceProperties props) {
+    return props.getWindowType().map(type -> WindowInfo.of(type, props.getWindowSize()));
+  }
+
+  private static String ensureTopicExists(
       final CreateSourceProperties properties,
       final ServiceContext serviceContext
   ) {
@@ -184,7 +192,7 @@ private static KsqlTopic buildTopic(
       throw new KsqlException("Kafka topic does not exist: " + kafkaTopicName);
     }

-    return TopicFactory.create(properties);
+    return kafkaTopicName;
   }

   private static Optional<TimestampColumn> buildTimestampColumn(
@@ -203,11 +211,15 @@ private static Optional<TimestampColumn> buildTimestampColumn(

   private void validateSerdesCanHandleSchemas(
       final KsqlConfig ksqlConfig,
-      final PhysicalSchema physicalSchema,
-      final KsqlTopic topic
+      final LogicalSchema schema,
+      final SerdeOptions serdeOptions,
+      final FormatInfo keyFormat,
+      final FormatInfo valueFormat
   ) {
+    final PhysicalSchema physicalSchema = PhysicalSchema.from(schema, serdeOptions);
+
     keySerdeFactory.create(
-        topic.getKeyFormat().getFormatInfo(),
+        keyFormat,
         physicalSchema.keySchema(),
         ksqlConfig,
         serviceContext.getSchemaRegistryClientFactory(),
@@ -216,7 +228,7 @@ private void validateSerdesCanHandleSchemas(
     ).close();

     valueSerdeFactory.create(
-        topic.getValueFormat().getFormatInfo(),
+        valueFormat,
         physicalSchema.valueSchema(),
         ksqlConfig,
         serviceContext.getSchemaRegistryClientFactory(),
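
With buildTopic() split apart above, the existence check now returns only the Kafka topic name, and windowing comes straight from the WITH-clause properties via the new getWindowInfo() helper. A rough sketch of that mapping, with hypothetical stand-in types in place of the real CreateSourceProperties and WindowInfo:

    import java.time.Duration;
    import java.util.Optional;

    public final class WindowInfoSketch {

      enum WindowType { SESSION, HOPPING, TUMBLING }

      // Simplified stand-in for io.confluent.ksql.serde.WindowInfo.
      record WindowInfo(WindowType type, Optional<Duration> size) {
        static WindowInfo of(final WindowType type, final Optional<Duration> size) {
          return new WindowInfo(type, size);
        }
      }

      // Simplified stand-in for CreateSourceProperties: just the two window getters.
      record SourceProps(Optional<WindowType> windowType, Optional<Duration> windowSize) {}

      // Same shape as the added getWindowInfo(props): no WINDOW_TYPE property
      // in the WITH clause means no WindowInfo at all.
      static Optional<WindowInfo> getWindowInfo(final SourceProps props) {
        return props.windowType().map(type -> WindowInfo.of(type, props.windowSize()));
      }

      public static void main(final String[] args) {
        System.out.println(getWindowInfo(new SourceProps(Optional.empty(), Optional.empty())));
        System.out.println(getWindowInfo(new SourceProps(
            Optional.of(WindowType.TUMBLING), Optional.of(Duration.ofSeconds(30)))));
      }
    }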

@@ -91,6 +91,7 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {

     return schemaKStream.into(
         getKsqlTopic().getKafkaTopicName(),
+        getKsqlTopic().getKeyFormat(),
         getKsqlTopic().getValueFormat(),
         serdeOptions,
         contextStacker,

@@ -90,8 +90,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
   private Optional<ConfiguredStatement<CreateSource>> forCreateStatement(
       final ConfiguredStatement<CreateSource> statement
   ) {
-    if (hasValueElements(statement)
-        || !supportsSchemaInference(statement)) {
+    if (hasValueElements(statement) || !valueFormatSupportsSchemaInference(statement)) {
       return Optional.empty();
     }

@@ -131,11 +130,12 @@ private static boolean hasValueElements(
         .anyMatch(e -> e.getNamespace().equals(Namespace.VALUE));
   }

-  private static boolean supportsSchemaInference(
+  private static boolean valueFormatSupportsSchemaInference(
       final ConfiguredStatement<CreateSource> statement
   ) {
-    final FormatInfo valueFormat =
-        SourcePropertiesUtil.getValueFormat(statement.getStatement().getProperties());
+    final FormatInfo valueFormat = SourcePropertiesUtil
+        .getValueFormat(statement.getStatement().getProperties());
+
     return FormatFactory.of(valueFormat).supportsSchemaInference();
   }

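
The rename to valueFormatSupportsSchemaInference() makes explicit that, even now that keys carry formats too, schema inference is decided by the value format alone. A hedged sketch of that check; the format names are real ksqlDB formats, but the lookup table is a simplification of what FormatFactory.of(...).supportsSchemaInference() actually consults:

    import java.util.Locale;
    import java.util.Map;

    public final class SchemaInferenceSketch {

      // Simplified stand-in for FormatFactory.of(format).supportsSchemaInference():
      // only the Schema Registry backed formats can supply a value schema.
      private static final Map<String, Boolean> SUPPORTS_INFERENCE = Map.of(
          "AVRO", true,
          "PROTOBUF", true,
          "JSON_SR", true,
          "JSON", false,
          "KAFKA", false,
          "DELIMITED", false
      );

      static boolean valueFormatSupportsSchemaInference(final String valueFormat) {
        return SUPPORTS_INFERENCE.getOrDefault(valueFormat.toUpperCase(Locale.ROOT), false);
      }

      public static void main(final String[] args) {
        System.out.println(valueFormatSupportsSchemaInference("avro"));      // true
        System.out.println(valueFormatSupportsSchemaInference("delimited")); // false
      }
    }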

@@ -77,11 +77,15 @@ private void registerForCreateSource(final ConfiguredStatement<? extends CreateSource> cs) {

     final LogicalSchema schema = cs.getStatement().getElements().toLogicalSchema();

-    final FormatInfo valueFormat =
-        SourcePropertiesUtil.getValueFormat(cs.getStatement().getProperties());
+    final FormatInfo keyFormat = SourcePropertiesUtil
+        .getKeyFormat(cs.getStatement().getProperties());
+
+    final FormatInfo valueFormat = SourcePropertiesUtil
+        .getValueFormat(cs.getStatement().getProperties());
+
     final SerdeOptions serdeOptions = SerdeOptionsFactory.buildForCreateStatement(
         schema,
-        FormatFactory.of(SourcePropertiesUtil.getKeyFormat(cs.getStatement().getProperties())),
+        FormatFactory.of(keyFormat),
         FormatFactory.of(valueFormat),
         cs.getStatement().getProperties().getSerdeOptions(),
         cs.getSessionConfig().getConfig(false)

@@ -86,18 +86,24 @@ public enum Type { SOURCE, PROJECT, FILTER, AGGREGATE, SINK, REKEY, JOIN }

   public SchemaKStream<K> into(
       final String kafkaTopicName,
+      final KeyFormat keyFormat,
       final ValueFormat valueFormat,
       final SerdeOptions options,
       final QueryContext.Stacker contextStacker,
       final Optional<TimestampColumn> timestampColumn
   ) {
+    if (!this.keyFormat.getWindowInfo().equals(keyFormat.getWindowInfo())) {
+      throw new IllegalArgumentException("Into can't change windowing");
+    }
+
     final StreamSink<K> step = ExecutionStepFactory.streamSink(
         contextStacker,
+        Formats.of(keyFormat, valueFormat, options),
         sourceStep,
         kafkaTopicName,
         timestampColumn
     );

     return new SchemaKStream<>(
         step,
         resolveSchema(step),

@@ -71,18 +71,24 @@ public SchemaKTable(
   @Override
   public SchemaKTable<K> into(
       final String kafkaTopicName,
+      final KeyFormat keyFormat,
       final ValueFormat valueFormat,
       final SerdeOptions options,
       final QueryContext.Stacker contextStacker,
       final Optional<TimestampColumn> timestampColumn
   ) {
+    if (!this.keyFormat.getWindowInfo().equals(keyFormat.getWindowInfo())) {
+      throw new IllegalArgumentException("Can't change windowing");
+    }
+
     final TableSink<K> step = ExecutionStepFactory.tableSink(
         contextStacker,
         sourceTableStep,
+        Formats.of(keyFormat, valueFormat, options),
         kafkaTopicName,
         timestampColumn
     );

     return new SchemaKTable<>(
         step,
         resolveSchema(step),
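
Both into() overloads now receive the sink's KeyFormat and refuse to change windowing, since a windowed and a non-windowed key are serialized differently. A small sketch of just that guard, comparing Optional window info the way the added check compares KeyFormat.getWindowInfo():

    import java.util.Optional;

    public final class WindowGuardSketch {

      record WindowInfo(String type) {}

      // Same shape as the guard added to SchemaKStream/SchemaKTable.into():
      // the sink's windowing must match the stream or table it is written from.
      static void checkWindowingUnchanged(
          final Optional<WindowInfo> source,
          final Optional<WindowInfo> sink
      ) {
        if (!source.equals(sink)) {
          throw new IllegalArgumentException("Into can't change windowing");
        }
      }

      public static void main(final String[] args) {
        checkWindowingUnchanged(Optional.empty(), Optional.empty()); // ok: both non-windowed
        try {
          checkWindowingUnchanged(Optional.of(new WindowInfo("SESSION")), Optional.empty());
        } catch (final IllegalArgumentException e) {
          System.out.println(e.getMessage()); // rejected: sink would drop windowing
        }
      }
    }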