From cf29742512378106ccbd50c47b8ebb2d2204afc6 Mon Sep 17 00:00:00 2001
From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
Date: Tue, 26 Mar 2019 09:23:18 +0000
Subject: [PATCH] Minor: Mark the `keyField` of structured and plan nodes optional. (#2604)

* Mark the `keyField` of structured and plan nodes optional.
---
 .../ksql/physical/PhysicalPlanBuilder.java | 4 +-
 .../ksql/planner/plan/AggregateNode.java | 5 +-
 .../ksql/planner/plan/FilterNode.java | 11 ++-
 .../confluent/ksql/planner/plan/JoinNode.java | 83 ++++++++++---------
 .../ksql/planner/plan/KsqlBareOutputNode.java | 4 +-
 .../plan/KsqlStructuredDataOutputNode.java | 15 ++--
 .../confluent/ksql/planner/plan/PlanNode.java | 3 +-
 .../ksql/planner/plan/ProjectNode.java | 3 +-
 .../plan/StructuredDataSourceNode.java | 4 +-
 .../ksql/structured/QueuedSchemaKStream.java | 21 -----
 .../ksql/structured/SchemaKGroupedStream.java | 9 +-
 .../ksql/structured/SchemaKGroupedTable.java | 5 +-
 .../ksql/structured/SchemaKStream.java | 67 ++++++++-------
 .../ksql/structured/SchemaKTable.java | 13 +--
 .../ksql/planner/LogicalPlannerTest.java | 4 +-
 .../ksql/planner/plan/AggregateNodeTest.java | 10 ++-
 .../ksql/planner/plan/JoinNodeTest.java | 7 +-
 .../KsqlStructuredDataOutputNodeTest.java | 10 +--
 .../ksql/planner/plan/ProjectNodeTest.java | 3 +-
 .../plan/StructuredDataSourceNodeTest.java | 3 +-
 .../structured/SchemaKGroupedStreamTest.java | 3 +-
 .../structured/SchemaKGroupedTableTest.java | 5 +-
 .../ksql/structured/SchemaKStreamTest.java | 49 ++++++-----
 .../ksql/structured/SchemaKTableTest.java | 21 ++---
 .../model/StructuredDataSourceMatchers.java | 10 +++
 25 files changed, 197 insertions(+), 175 deletions(-)

diff --git a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
index 14076fb77d11..49c4c413e9b1 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
@@ -239,7 +239,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
         sqlExpression,
         outputNode.getId().toString(),
         outputNode.getSchema(),
-        Optional.ofNullable(schemaKTable.getKeyField()),
+        schemaKTable.getKeyField(),
         outputNode.getTimestampExtractionPolicy(),
         outputNode.getKsqlTopic(),
         schemaKTable.getKeySerdeFactory()
@@ -250,7 +250,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
         sqlExpression,
         outputNode.getId().toString(),
         outputNode.getSchema(),
-        Optional.ofNullable(schemaKStream.getKeyField()),
+        schemaKStream.getKeyField(),
         outputNode.getTimestampExtractionPolicy(),
         outputNode.getKsqlTopic(),
         schemaKStream.getKeySerdeFactory()
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
index 6daee04abba8..6fddc54f6ba1 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java
@@ -57,6 +57,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.serialization.Serde;
@@ -119,8 +120,8 @@ public Schema getSchema() {
   }
 
   @Override
-  public Field getKeyField() {
-    return null;
+  public Optional<Field> getKeyField() {
+    return Optional.empty();
   }
 
   @Override
diff --git
a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java index e0253a0ae6ba..df9aa28b3fdb 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/FilterNode.java @@ -27,18 +27,18 @@ import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.KsqlConfig; import java.util.List; +import java.util.Optional; import javax.annotation.concurrent.Immutable; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.streams.StreamsBuilder; @Immutable -public class FilterNode - extends PlanNode { +public class FilterNode extends PlanNode { + private final PlanNode source; private final Expression predicate; private final Schema schema; - private final Field keyField; @JsonCreator public FilterNode(@JsonProperty("id") final PlanNodeId id, @@ -49,7 +49,6 @@ public FilterNode(@JsonProperty("id") final PlanNodeId id, this.source = source; this.schema = source.getSchema(); this.predicate = predicate; - this.keyField = source.getKeyField(); } @JsonProperty("predicate") @@ -63,8 +62,8 @@ public Schema getSchema() { } @Override - public Field getKeyField() { - return keyField; + public Optional getKeyField() { + return source.getKeyField(); } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 49a42d53b41d..033ef23be3ff 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serde; @@ -128,8 +129,8 @@ public Schema getSchema() { } @Override - public Field getKeyField() { - return this.keyField; + public Optional getKeyField() { + return Optional.of(keyField); } @Override @@ -284,7 +285,7 @@ Joiner getJoiner(final DataSource.DataSourceType leftType, } } - private abstract static class Joiner { + private abstract static class Joiner { protected final StreamsBuilder builder; protected final KsqlConfig ksqlConfig; private final ServiceContext serviceContext; @@ -316,9 +317,9 @@ private abstract static class Joiner { this.contextStacker = Objects.requireNonNull(contextStacker, "contextStacker"); } - public abstract SchemaKStream join(); + public abstract SchemaKStream join(); - protected SchemaKStream buildStream(final PlanNode node, final String keyFieldName) { + protected SchemaKStream buildStream(final PlanNode node, final String keyFieldName) { return maybeRePartitionByKey( node.buildStream( @@ -332,11 +333,13 @@ protected SchemaKStream buildStream(final PlanNode node, final String keyFieldNa contextStacker); } - - protected SchemaKTable buildTable(final PlanNode node, - final String keyFieldName, - final String tableName) { - final SchemaKStream schemaKStream = node.buildStream( + @SuppressWarnings("unchecked") + protected SchemaKTable buildTable( + final PlanNode node, + final String keyFieldName, + final String tableName + ) { + final SchemaKStream schemaKStream = node.buildStream( builder, ksqlConfig.cloneWithPropertyOverwrite( 
Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")), @@ -349,15 +352,15 @@ protected SchemaKTable buildTable(final PlanNode node, throw new RuntimeException("Expected to find a Table, found a stream instead."); } - if (schemaKStream.getKeyField() != null + if (schemaKStream.getKeyField().isPresent() && !keyFieldName.equals(SchemaUtil.ROWKEY_NAME) - && !SchemaUtil.matchFieldName(schemaKStream.getKeyField(), keyFieldName)) { + && !SchemaUtil.matchFieldName(schemaKStream.getKeyField().get(), keyFieldName)) { throw new KsqlException( String.format( "Source table (%s) key column (%s) " + "is not the column used in the join criteria (%s).", tableName, - schemaKStream.getKeyField().name(), + schemaKStream.getKeyField().get().name(), keyFieldName ) ); @@ -366,7 +369,8 @@ protected SchemaKTable buildTable(final PlanNode node, return (SchemaKTable) schemaKStream; } - static SchemaKStream maybeRePartitionByKey( + @SuppressWarnings("unchecked") + static SchemaKStream maybeRePartitionByKey( final SchemaKStream stream, final String targetKey, final QueryContext.Stacker contextStacker) { @@ -408,7 +412,7 @@ Field getJoinKey(final String alias, final String keyFieldName) { } } - private static final class StreamToStreamJoiner extends Joiner { + private static final class StreamToStreamJoiner extends Joiner { private StreamToStreamJoiner( final StreamsBuilder builder, @@ -431,9 +435,9 @@ private StreamToStreamJoiner( contextStacker); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"}) @Override - public SchemaKStream join() { + public SchemaKStream join() { if (joinNode.withinExpression == null) { throw new KsqlException("Stream-Stream joins must have a WITHIN clause specified. None was " + "provided. 
To learn about how to specify a WITHIN clause with a " @@ -442,17 +446,18 @@ public SchemaKStream join() { + "#create-stream-as-select"); } - final SchemaKStream leftStream = buildStream(joinNode.getLeft(), - joinNode.getLeftKeyFieldName()); - final SchemaKStream rightStream = buildStream(joinNode.getRight(), - joinNode.getRightKeyFieldName()); + final SchemaKStream leftStream = buildStream( + joinNode.getLeft(), joinNode.getLeftKeyFieldName()); + + final SchemaKStream rightStream = buildStream( + joinNode.getRight(), joinNode.getRightKeyFieldName()); switch (joinNode.joinType) { case LEFT: return leftStream.leftJoin(rightStream, joinNode.schema, getJoinKey(joinNode.leftAlias, - leftStream.getKeyField().name()), + leftStream.getKeyField().get().name()), joinNode.withinExpression.joinWindow(), getSerDeForNode( joinNode.left, @@ -465,7 +470,7 @@ public SchemaKStream join() { return leftStream.outerJoin(rightStream, joinNode.schema, getJoinKey(joinNode.leftAlias, - leftStream.getKeyField().name()), + leftStream.getKeyField().get().name()), joinNode.withinExpression.joinWindow(), getSerDeForNode( joinNode.left, @@ -478,7 +483,7 @@ public SchemaKStream join() { return leftStream.join(rightStream, joinNode.schema, getJoinKey(joinNode.leftAlias, - leftStream.getKeyField().name()), + leftStream.getKeyField().get().name()), joinNode.withinExpression.joinWindow(), getSerDeForNode( joinNode.left, @@ -493,7 +498,7 @@ public SchemaKStream join() { } } - private static final class StreamToTableJoiner extends Joiner { + private static final class StreamToTableJoiner extends Joiner { private StreamToTableJoiner( final StreamsBuilder builder, @@ -516,19 +521,19 @@ private StreamToTableJoiner( contextStacker); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"}) @Override - public SchemaKStream join() { + public SchemaKStream join() { if (joinNode.withinExpression != null) { throw new KsqlException("A window definition was provided for a Stream-Table join. These " + "joins are not windowed. Please drop the window definition (ie." 
+ " the WITHIN clause) and try to execute your join again."); } - final SchemaKTable rightTable = buildTable(joinNode.getRight(), + final SchemaKTable rightTable = buildTable(joinNode.getRight(), joinNode.getRightKeyFieldName(), joinNode.getRightAlias()); - final SchemaKStream leftStream = buildStream(joinNode.getLeft(), + final SchemaKStream leftStream = buildStream(joinNode.getLeft(), joinNode.getLeftKeyFieldName()); switch (joinNode.joinType) { @@ -536,7 +541,7 @@ public SchemaKStream join() { return leftStream.leftJoin(rightTable, joinNode.schema, getJoinKey(joinNode.leftAlias, - leftStream.getKeyField().name()), + leftStream.getKeyField().get().name()), getSerDeForNode( joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), @@ -546,7 +551,7 @@ public SchemaKStream join() { return leftStream.join(rightTable, joinNode.schema, getJoinKey(joinNode.leftAlias, - leftStream.getKeyField().name()), + leftStream.getKeyField().get().name()), getSerDeForNode( joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), @@ -561,7 +566,7 @@ public SchemaKStream join() { } } - private static final class TableToTableJoiner extends Joiner { + private static final class TableToTableJoiner extends Joiner { TableToTableJoiner( final StreamsBuilder builder, @@ -584,9 +589,9 @@ private static final class TableToTableJoiner extends Joiner { contextStacker); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"}) @Override - public SchemaKTable join() { + public SchemaKTable join() { if (joinNode.withinExpression != null) { throw new KsqlException("A window definition was provided for a Table-Table join. These " + "joins are not windowed. Please drop the window definition " @@ -594,10 +599,10 @@ public SchemaKTable join() { + "join again."); } - final SchemaKTable leftTable = buildTable(joinNode.getLeft(), + final SchemaKTable leftTable = buildTable(joinNode.getLeft(), joinNode.getLeftKeyFieldName(), joinNode.getLeftAlias()); - final SchemaKTable rightTable = buildTable(joinNode.getRight(), + final SchemaKTable rightTable = buildTable(joinNode.getRight(), joinNode.getRightKeyFieldName(), joinNode.getRightAlias()); @@ -606,19 +611,19 @@ public SchemaKTable join() { return leftTable.leftJoin( rightTable, joinNode.schema, - getJoinKey(joinNode.leftAlias, leftTable.getKeyField().name()), + getJoinKey(joinNode.leftAlias, leftTable.getKeyField().get().name()), contextStacker); case INNER: return leftTable.join( rightTable, joinNode.schema, - getJoinKey(joinNode.leftAlias, leftTable.getKeyField().name()), + getJoinKey(joinNode.leftAlias, leftTable.getKeyField().get().name()), contextStacker); case OUTER: return leftTable.outerJoin( rightTable, joinNode.schema, - getJoinKey(joinNode.leftAlias, leftTable.getKeyField().name()), + getJoinKey(joinNode.leftAlias, leftTable.getKeyField().get().name()), contextStacker); default: throw new KsqlException("Invalid join type encountered: " + joinNode.joinType); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlBareOutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlBareOutputNode.java index 7e01cc455a4e..212374c49204 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlBareOutputNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlBareOutputNode.java @@ -51,8 +51,8 @@ public QueryId getQueryId(final QueryIdGenerator queryIdGenerator) { } @Override - public Field getKeyField() { - return null; + public Optional getKeyField() { + 
return Optional.empty(); } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java index 82954e957a44..2b2aed948e39 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java @@ -40,6 +40,7 @@ import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import org.apache.kafka.clients.admin.TopicDescription; @@ -51,7 +52,7 @@ public class KsqlStructuredDataOutputNode extends OutputNode { private final String kafkaTopicName; private final KsqlTopic ksqlTopic; - private final Field keyField; + private final Optional keyField; private final boolean doCreateInto; private final Map outputProperties; @@ -61,7 +62,7 @@ public KsqlStructuredDataOutputNode( @JsonProperty("source") final PlanNode source, @JsonProperty("schema") final Schema schema, @JsonProperty("timestamp") final TimestampExtractionPolicy timestampExtractionPolicy, - @JsonProperty("key") final Field keyField, + @JsonProperty("key") final Optional keyField, @JsonProperty("ksqlTopic") final KsqlTopic ksqlTopic, @JsonProperty("topicName") final String kafkaTopicName, @JsonProperty("outputProperties") final Map outputProperties, @@ -69,7 +70,7 @@ public KsqlStructuredDataOutputNode( @JsonProperty("doCreateInto") final boolean doCreateInto) { super(id, source, schema, limit, timestampExtractionPolicy); this.kafkaTopicName = kafkaTopicName; - this.keyField = keyField; + this.keyField = Objects.requireNonNull(keyField, "keyField"); this.ksqlTopic = ksqlTopic; this.outputProperties = outputProperties; this.doCreateInto = doCreateInto; @@ -96,7 +97,7 @@ public QueryId getQueryId(final QueryIdGenerator queryIdGenerator) { } @Override - public Field getKeyField() { + public Optional getKeyField() { return keyField; } @@ -216,7 +217,7 @@ private SchemaKStream createOutputStream( keyFieldName ))); - outputNodeBuilder.withKeyField(keyField); + outputNodeBuilder.withKeyField(Optional.of(keyField)); return result.selectKey(keyField, false, contextStacker); } return result; @@ -335,7 +336,7 @@ public static class Builder { private PlanNode source; private Schema schema; private TimestampExtractionPolicy timestampExtractionPolicy; - private Field keyField; + private Optional keyField; private KsqlTopic ksqlTopic; private String kafkaTopicName; private Map outputProperties; @@ -390,7 +391,7 @@ Builder withKsqlTopic(final KsqlTopic ksqlTopic) { return this; } - Builder withKeyField(final Field keyField) { + Builder withKeyField(final Optional keyField) { this.keyField = keyField; return this; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java index a5e849e924ea..994b45508a0d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java @@ -28,6 +28,7 @@ import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.KsqlConfig; import java.util.List; +import java.util.Optional; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.streams.StreamsBuilder; 
@@ -56,7 +57,7 @@ public DataSourceType getNodeOutputType() { public abstract Schema getSchema(); - public abstract Field getKeyField(); + public abstract Optional getKeyField(); public abstract List getSources(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java index b7784f329490..b03d6538fa0a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/ProjectNode.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import javax.annotation.concurrent.Immutable; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -82,7 +83,7 @@ protected int getPartitions(final KafkaTopicClient kafkaTopicClient) { } @Override - public Field getKeyField() { + public Optional getKeyField() { return source.getKeyField(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/StructuredDataSourceNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/StructuredDataSourceNode.java index f4622af22f84..7aaaa8b84287 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/StructuredDataSourceNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/StructuredDataSourceNode.java @@ -129,8 +129,8 @@ public Schema getSchema() { } @Override - public Field getKeyField() { - return structuredDataSource.getKeyField().orElse(null); + public Optional getKeyField() { + return structuredDataSource.getKeyField(); } public StructuredDataSource getStructuredDataSource() { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java index 6f1c833f649e..be0c68ec181f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/QueuedSchemaKStream.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.streams.kstream.KStream; public class QueuedSchemaKStream extends SchemaKStream { @@ -96,24 +95,4 @@ public SchemaKGroupedStream groupBy( final QueryContext.Stacker contextStacker) { throw new UnsupportedOperationException(); } - - @Override - public Field getKeyField() { - return super.getKeyField(); - } - - @Override - public Schema getSchema() { - return super.getSchema(); - } - - @Override - public KStream getKstream() { - return super.getKstream(); - } - - @Override - public List getSourceSchemaKStreams() { - return super.getSourceSchemaKStreams(); - } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java index 57d8a6fa67de..0f8450c3fb5b 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -48,7 +49,7 @@ public 
class SchemaKGroupedStream { final Schema schema; final KGroupedStream kgroupedStream; - final Field keyField; + final Optional keyField; final List sourceSchemaKStreams; final KsqlConfig ksqlConfig; final FunctionRegistry functionRegistry; @@ -57,7 +58,7 @@ public class SchemaKGroupedStream { SchemaKGroupedStream( final Schema schema, final KGroupedStream kgroupedStream, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final KsqlConfig ksqlConfig, final FunctionRegistry functionRegistry @@ -76,7 +77,7 @@ public class SchemaKGroupedStream { SchemaKGroupedStream( final Schema schema, final KGroupedStream kgroupedStream, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final KsqlConfig ksqlConfig, final FunctionRegistry functionRegistry, @@ -91,7 +92,7 @@ public class SchemaKGroupedStream { this.materializedFactory = materializedFactory; } - public Field getKeyField() { + public Optional getKeyField() { return keyField; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java index c7e966f8fc9c..75ccfdf14901 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -45,7 +46,7 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream { SchemaKGroupedTable( final Schema schema, final KGroupedTable kgroupedTable, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final KsqlConfig ksqlConfig, final FunctionRegistry functionRegistry @@ -63,7 +64,7 @@ public class SchemaKGroupedTable extends SchemaKGroupedStream { SchemaKGroupedTable( final Schema schema, final KGroupedTable kgroupedTable, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final KsqlConfig ksqlConfig, final FunctionRegistry functionRegistry, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 97b22c384a39..36f445fca885 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -62,7 +63,7 @@ public enum Type { SOURCE, PROJECT, FILTER, AGGREGATE, SINK, REKEY, JOIN } final Schema schema; final KStream kstream; - final Field keyField; + final Optional keyField; final List sourceSchemaKStreams; final Type type; final KsqlConfig ksqlConfig; @@ -75,7 +76,7 @@ public enum Type { SOURCE, PROJECT, FILTER, AGGREGATE, SINK, REKEY, JOIN } public SchemaKStream( final Schema schema, final KStream kstream, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final SerdeFactory keySerdeFactory, final Type type, @@ -99,7 +100,7 @@ public SchemaKStream( SchemaKStream( final Schema schema, final KStream kstream, - final Field keyField, + final 
Optional keyField, final List sourceSchemaKStreams, final SerdeFactory keySerdeFactory, final Type type, @@ -110,7 +111,7 @@ public SchemaKStream( ) { this.schema = schema; this.kstream = kstream; - this.keyField = keyField; + this.keyField = Objects.requireNonNull(keyField, "keyField"); this.sourceSchemaKStreams = sourceSchemaKStreams; this.type = type; this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); @@ -207,7 +208,7 @@ public SchemaKStream select( class Selection { private final Schema schema; - private final Field key; + private final Optional key; private final SelectValueMapper selectValueMapper; Selection( @@ -225,13 +226,15 @@ class Selection { processingLogger); } - private Field findKeyField(final List selectExpressions) { - if (getKeyField() == null) { - return null; + private Optional findKeyField(final List selectExpressions) { + if (!getKeyField().isPresent()) { + return Optional.empty(); } - if (getKeyField().index() == -1) { + + final Field keyField = getKeyField().get(); + if (keyField.index() == -1) { // The key "field" isn't an actual field in the schema - return getKeyField(); + return Optional.of(keyField); } for (int i = 0; i < selectExpressions.size(); i++) { final String toName = selectExpressions.get(i).getName(); @@ -247,20 +250,20 @@ private Field findKeyField(final List selectExpressions) { if (toExpression instanceof DereferenceExpression) { final DereferenceExpression dereferenceExpression = (DereferenceExpression) toExpression; - if (SchemaUtil.matchFieldName(getKeyField(), dereferenceExpression.toString())) { - return new Field(toName, i, getKeyField().schema()); + if (SchemaUtil.matchFieldName(keyField, dereferenceExpression.toString())) { + return Optional.of(new Field(toName, i, keyField.schema())); } } else if (toExpression instanceof QualifiedNameReference) { final QualifiedNameReference qualifiedNameReference = (QualifiedNameReference) toExpression; if (SchemaUtil.matchFieldName( - getKeyField(), + keyField, qualifiedNameReference.getName().getSuffix())) { - return new Field(toName, i, getKeyField().schema()); + return Optional.of(new Field(toName, i, keyField.schema())); } } } - return null; + return Optional.empty(); } private Schema buildSchema( @@ -287,7 +290,7 @@ public Schema getProjectedSchema() { return schema; } - public Field getKey() { + public Optional getKey() { return key; } @@ -319,7 +322,7 @@ public SchemaKStream leftJoin( return new SchemaKStream( joinSchema, joinedKStream, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, schemaKTable), keySerdeFactory, Type.JOIN, @@ -355,7 +358,7 @@ public SchemaKStream leftJoin( return new SchemaKStream<>( joinSchema, joinStream, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, otherSchemaKStream), keySerdeFactory, Type.JOIN, @@ -387,7 +390,7 @@ public SchemaKStream join( return new SchemaKStream<>( joinSchema, joinedKStream, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, schemaKTable), keySerdeFactory, Type.JOIN, @@ -422,7 +425,7 @@ public SchemaKStream join( return new SchemaKStream<>( joinSchema, joinStream, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, otherSchemaKStream), keySerdeFactory, Type.JOIN, @@ -455,7 +458,7 @@ public SchemaKStream outerJoin( return new SchemaKStream<>( joinSchema, joinStream, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, otherSchemaKStream), keySerdeFactory, Type.JOIN, @@ -470,8 +473,14 @@ public SchemaKStream outerJoin( public SchemaKStream selectKey( final Field newKeyField, final boolean 
updateRowKey, - final QueryContext.Stacker contextStacker) { - if (keyField != null && keyField.name().equals(newKeyField.name())) { + final QueryContext.Stacker contextStacker + ) { + final boolean namesMatch = keyField + .map(Field::name) + .map(name -> name.equals(newKeyField.name())) + .orElse(false); + + if (namesMatch) { return this; } @@ -489,7 +498,7 @@ && extractColumn(newKeyField, value) != null) return new SchemaKStream<>( schema, keyedKStream, - newKeyField, + Optional.of(newKeyField), Collections.singletonList(this), Serdes::String, Type.REKEY, @@ -522,8 +531,8 @@ private boolean rekeyRequired(final List groupByExpressions) { return true; } - final Field keyField = getKeyField(); - if (keyField == null) { + final Optional keyField = getKeyField(); + if (!keyField.isPresent()) { return true; } @@ -532,7 +541,7 @@ private boolean rekeyRequired(final List groupByExpressions) { return true; } - final String keyFieldName = SchemaUtil.getFieldNameWithNoAlias(keyField); + final String keyFieldName = SchemaUtil.getFieldNameWithNoAlias(keyField.get()); return !groupByField.equals(keyFieldName); } @@ -578,14 +587,14 @@ public SchemaKGroupedStream groupBy( return new SchemaKGroupedStream( schema, kgroupedStream, - newKeyField, + Optional.of(newKeyField), Collections.singletonList(this), ksqlConfig, functionRegistry ); } - public Field getKeyField() { + public Optional getKeyField() { return keyField; } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 6758df76b43a..6bed36ff1772 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -48,7 +49,7 @@ public class SchemaKTable extends SchemaKStream { public SchemaKTable( final Schema schema, final KTable ktable, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final SerdeFactory keySerdeFactory, final Type type, @@ -73,7 +74,7 @@ public SchemaKTable( SchemaKTable( final Schema schema, final KTable ktable, - final Field keyField, + final Optional keyField, final List sourceSchemaKStreams, final SerdeFactory keySerdeFactory, final Type type, @@ -209,7 +210,7 @@ public SchemaKGroupedStream groupBy( return new SchemaKGroupedTable( schema, kgroupedTable, - newKeyField, + Optional.of(newKeyField), Collections.singletonList(this), ksqlConfig, functionRegistry); @@ -230,7 +231,7 @@ public SchemaKTable join( return new SchemaKTable<>( joinSchema, joinedKTable, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, schemaKTable), keySerdeFactory, Type.JOIN, @@ -256,7 +257,7 @@ public SchemaKTable leftJoin( return new SchemaKTable<>( joinSchema, joinedKTable, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, schemaKTable), keySerdeFactory, Type.JOIN, @@ -282,7 +283,7 @@ public SchemaKTable outerJoin( return new SchemaKTable<>( joinSchema, joinedKTable, - joinKey, + Optional.of(joinKey), ImmutableList.of(this, schemaKTable), keySerdeFactory, Type.JOIN, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java index 
cf3467ac925c..9fe4170995c3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.planner; +import static io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.FieldMatchers.hasName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -22,6 +23,7 @@ import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.StructuredDataSource; +import io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.OptionalMatchers; import io.confluent.ksql.planner.plan.AggregateNode; import io.confluent.ksql.planner.plan.FilterNode; import io.confluent.ksql.planner.plan.JoinNode; @@ -103,7 +105,7 @@ public void testSimpleLeftJoinFilterLogicalPlan() { assertThat(logicalPlan.getSources().get(0), instanceOf(ProjectNode.class)); final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0); - assertThat(projectNode.getKeyField().name(), equalTo("t1.col1".toUpperCase())); + assertThat(projectNode.getKeyField(), OptionalMatchers.of(hasName("T1.COL1"))); assertThat(projectNode.getSchema().fields().size(), equalTo(5)); assertThat(projectNode.getSources().get(0), instanceOf(FilterNode.class)); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java index 044457e62eef..99d5b7d38a3e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.planner.plan; +import static io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.FieldMatchers.hasName; import static io.confluent.ksql.planner.plan.PlanTestUtil.MAPVALUES_NODE; import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE; import static io.confluent.ksql.planner.plan.PlanTestUtil.getNodeByName; @@ -43,6 +44,7 @@ import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLoggerUtil; import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.OptionalMatchers; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.structured.QueryContext; @@ -351,21 +353,21 @@ public void shouldCreateLoggerForStatestore() { @Test public void shouldGroupByFunction() { // Given: - final SchemaKStream stream = buildQuery("SELECT UCASE(col1), sum(col3), count(col3) FROM test1 " + final SchemaKStream stream = buildQuery("SELECT UCASE(col1), sum(col3), count(col3) FROM test1 " + "GROUP BY UCASE(col1);"); // Then: - assertThat(stream.getKeyField().name(), is("UCASE(KSQL_INTERNAL_COL_0)")); + assertThat(stream.getKeyField(), OptionalMatchers.of(hasName("UCASE(KSQL_INTERNAL_COL_0)"))); } @Test public void shouldGroupByArithmetic() { // Given: - final SchemaKStream stream = buildQuery("SELECT col0 + 10, sum(col3), count(col3) FROM test1 " + final SchemaKStream stream = buildQuery("SELECT col0 + 10, sum(col3), count(col3) FROM test1 " + "GROUP BY col0 + 10;"); // Then: - assertThat(stream.getKeyField().name(), is("(KSQL_INTERNAL_COL_0 + 10)")); + 
assertThat(stream.getKeyField(), OptionalMatchers.of(hasName("(KSQL_INTERNAL_COL_0 + 10)"))); } private SchemaKStream buildQuery(final String queryString) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index eb95c5a26f72..b7db81d07c5b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -150,6 +150,7 @@ public void setUp() { EasyMock.expect(mockKsqlConfig.cloneWithPropertyOverwrite( Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))) .andStubReturn(mockKsqlConfigClonedWithOffsetReset); + EasyMock.expect(rightSchemaKTable.getKeyField()).andReturn(Optional.empty()).anyTimes(); EasyMock.replay(serviceContext, mockKsqlConfig); @@ -557,10 +558,10 @@ private static Optional getNonKeyColumn(final Schema schema, final Strin ); } - @SuppressWarnings("unchecked") @Test public void shouldFailJoinIfTableCriteriaColumnIsNotKey() { setupStream(left, CONTEXT_STACKER, leftSchemaKStream, leftSchema, 2); + EasyMock.reset(rightSchemaKTable); setupTable(right, rightSchemaKTable, rightSchema, 2); expectKeyField(rightSchemaKTable, rightKeyFieldName); replay(left, right, leftSchemaKStream, rightSchemaKTable); @@ -835,6 +836,7 @@ public void shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() { public void shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() { setupTable(left, leftSchemaKTable, leftSchema, 2); expectKeyField(leftSchemaKTable, leftKeyFieldName); + EasyMock.reset(rightSchemaKTable); setupTable(right, rightSchemaKTable, rightSchema, 2); expectKeyField(rightSchemaKTable, rightKeyFieldName); replay(left, right, leftSchemaKTable, rightSchemaKTable); @@ -1115,10 +1117,9 @@ private void setupStreamWithoutSerde( expectBuildStream(node, contextStacker, stream, schema); } - private static void expectKeyField(final SchemaKStream stream, final String keyFieldName) { final Field field = niceMock(Field.class); - expect(stream.getKeyField()).andStubReturn(field); + expect(stream.getKeyField()).andStubReturn(Optional.of(field)); expect(field.name()).andStubReturn(keyFieldName); replay(field); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index d28c297ac99e..6ea04152bbb7 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -22,6 +22,7 @@ import static io.confluent.ksql.planner.plan.PlanTestUtil.verifyProcessorNode; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -167,7 +168,7 @@ private void createOutputNode(final Map props, final boolean cre sourceNode, schema, new LongColumnTimestampExtractionPolicy("timestamp"), - schema.field("key"), + Optional.of(schema.field("key")), new KsqlTopic(SINK_TOPIC_NAME, SINK_KAFKA_TOPIC_NAME, new KsqlJsonTopicSerDe(), true), SINK_KAFKA_TOPIC_NAME, props, @@ -245,8 +246,7 @@ public void shouldPartitionByFieldNameInPartitionByProperty() { stream = 
buildStream(); // Then: - final Field keyField = stream.getKeyField(); - assertThat(keyField, equalTo(new Field("field2", 1, Schema.OPTIONAL_STRING_SCHEMA))); + assertThat(stream.getKeyField(), is(Optional.of(new Field("field2", 1, Schema.OPTIONAL_STRING_SCHEMA)))); assertThat(stream.getSchema().fields(), equalTo(schema.fields())); } @@ -458,7 +458,7 @@ public void shouldUseCorrectLoggerNameForSerializer() { sourceNode, schema, new LongColumnTimestampExtractionPolicy("timestamp"), - schema.field("key"), + Optional.ofNullable(schema.field("key")), mockTopic(topicSerde), "output", Collections.emptyMap(), @@ -507,7 +507,7 @@ private KsqlStructuredDataOutputNode getKsqlStructuredDataOutputNodeForTable tableSourceNode, schema, new MetadataTimestampExtractionPolicy(), - schema.field("key"), + Optional.ofNullable(schema.field("key")), new KsqlTopic(SINK_TOPIC_NAME, SINK_KAFKA_TOPIC_NAME, new KsqlJsonTopicSerDe(), true), SINK_KAFKA_TOPIC_NAME, props, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java index d5cfc6e68206..f76ec48c483f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -143,7 +144,7 @@ public void shouldCreateProjectionWithFieldNameExpressionPairs() { @SuppressWarnings("unchecked") private void mockSourceNode() { when(source.getKeyField()) - .thenReturn(new Field("field1", 0, Schema.OPTIONAL_STRING_SCHEMA)); + .thenReturn(Optional.of(new Field("field1", 0, Schema.OPTIONAL_STRING_SCHEMA))); when(source.buildStream( any(StreamsBuilder.class), any(KsqlConfig.class), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/StructuredDataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/StructuredDataSourceNodeTest.java index 161110a4f0b0..7e4a3ca45177 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/StructuredDataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/StructuredDataSourceNodeTest.java @@ -272,7 +272,8 @@ public void shouldBeOfTypeSchemaKStreamWhenDataSourceIsKsqlStream() { @Test public void shouldExtracKeyField() { - assertThat(realStream.getKeyField(), equalTo(new Field("key", 4, Schema.OPTIONAL_STRING_SCHEMA))); + assertThat(realStream.getKeyField(), + equalTo(Optional.of(new Field("key", 4, Schema.OPTIONAL_STRING_SCHEMA)))); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index f14606ccba23..7cd6712a4226 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.connect.data.Field; @@ -67,7 +68,7 @@ public class SchemaKGroupedStreamTest { @Mock private 
KGroupedStream groupedStream; @Mock - private Field keyField; + private Optional keyField; @Mock private List sourceStreams; @Mock diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java index aae92a8a19d2..b10b49f0d4f7 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java @@ -55,6 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.common.serialization.Serde; @@ -112,7 +113,7 @@ private SchemaKGroupedTable buildSchemaKGroupedTableFromQuery( final SchemaKTable initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), kTable, - ksqlTable.getKeyField().get(), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -207,7 +208,7 @@ private SchemaKGroupedTable buildSchemaKGroupedTable( return new SchemaKGroupedTable( schema, kGroupedTable, - schema.fields().get(0), + Optional.of(schema.fields().get(0)), Collections.emptyList(), ksqlConfig, functionRegistry, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 711042c492db..fa2a99bf9869 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -15,8 +15,9 @@ package io.confluent.ksql.structured; +import static io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.FieldMatchers.hasIndex; +import static io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.FieldMatchers.hasName; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -38,6 +39,7 @@ import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.metastore.model.KsqlTopic; +import io.confluent.ksql.metastore.model.StructuredDataSourceMatchers.OptionalMatchers; import io.confluent.ksql.parser.tree.DereferenceExpression; import io.confluent.ksql.parser.tree.Expression; import io.confluent.ksql.parser.tree.FunctionCall; @@ -64,6 +66,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; @@ -169,7 +172,7 @@ public void init() { schemaKTable = new SchemaKTable<>( ksqlTable.getSchema(), kTable, - ksqlTable.getKeyField().get(), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -236,7 +239,7 @@ public void shouldUpdateKeyIfRenamed() { processingLogContext); assertThat( projectedSchemaKStream.getKeyField(), - equalTo(new Field("NEWKEY", 0, Schema.OPTIONAL_INT64_SCHEMA))); + equalTo(Optional.of(new Field("NEWKEY", 0, Schema.OPTIONAL_INT64_SCHEMA)))); } @Test @@ -266,7 +269,7 @@ public void shouldUpdateKeyIfMovedToDifferentIndex() { 
processingLogContext); assertThat( projectedSchemaKStream.getKeyField(), - equalTo(new Field("COL0", 1, Schema.OPTIONAL_INT64_SCHEMA))); + equalTo(Optional.of(new Field("COL0", 1, Schema.OPTIONAL_INT64_SCHEMA)))); } @Test @@ -279,7 +282,7 @@ public void shouldDropKeyIfNotSelected() { selectExpressions, childContextStacker, processingLogContext); - assertThat(projectedSchemaKStream.getKeyField(), nullValue()); + assertThat(projectedSchemaKStream.getKeyField(), is(Optional.empty())); } @Test @@ -345,11 +348,11 @@ public void testFilter() { @Test public void testSelectKey() { givenInitialKStreamOf("SELECT col0, col2, col3 FROM test1 WHERE col0 > 100;"); - final SchemaKStream rekeyedSchemaKStream = initialSchemaKStream.selectKey( + final SchemaKStream rekeyedSchemaKStream = initialSchemaKStream.selectKey( initialSchemaKStream.getSchema().fields().get(3), true, childContextStacker); - assertThat(rekeyedSchemaKStream.getKeyField().name().toUpperCase(), equalTo("TEST1.COL1")); + assertThat(rekeyedSchemaKStream.getKeyField(), OptionalMatchers.of(hasName("TEST1.COL1"))); } @Test @@ -369,8 +372,8 @@ public void testGroupByKey() { childContextStacker); // Then: - assertThat(groupedSchemaKStream.getKeyField().name(), is("COL0")); - assertThat(groupedSchemaKStream.getKeyField().index(), is(2)); + assertThat(groupedSchemaKStream.getKeyField(), OptionalMatchers.of(hasName("COL0"))); + assertThat(groupedSchemaKStream.getKeyField(), OptionalMatchers.of(hasIndex(2))); } @Test @@ -392,8 +395,8 @@ public void testGroupByMultipleColumns() { childContextStacker); // Then: - assertThat(groupedSchemaKStream.getKeyField().name(), is("TEST1.COL1|+|TEST1.COL0")); - assertThat(groupedSchemaKStream.getKeyField().index(), is(-1)); + assertThat(groupedSchemaKStream.getKeyField(), OptionalMatchers.of(hasName("TEST1.COL1|+|TEST1.COL0"))); + assertThat(groupedSchemaKStream.getKeyField(), OptionalMatchers.of(hasIndex(-1))); } @Test @@ -410,8 +413,8 @@ public void testGroupByMoreComplexExpression() { childContextStacker); // Then: - assertThat(groupedSchemaKStream.getKeyField().name(), is("UCASE(TEST1.COL1)")); - assertThat(groupedSchemaKStream.getKeyField().index(), is(-1)); + assertThat(groupedSchemaKStream.getKeyField(), OptionalMatchers.of(hasName("UCASE(TEST1.COL1)"))); + assertThat(groupedSchemaKStream.getKeyField(), OptionalMatchers.of(hasIndex(-1))); } @Test @@ -506,7 +509,7 @@ public void shouldPerformStreamToStreamLeftJoin() { assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(initialSchemaKStream, secondSchemaKStream), joinedKStream.sourceSchemaKStreams); } @@ -548,7 +551,7 @@ public void shouldPerformStreamToStreamInnerJoin() { assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(initialSchemaKStream, secondSchemaKStream), joinedKStream.sourceSchemaKStreams); } @@ -589,7 +592,7 @@ public void shouldPerformStreamToStreamOuterJoin() { assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, 
joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(initialSchemaKStream, secondSchemaKStream), joinedKStream.sourceSchemaKStreams); } @@ -625,7 +628,7 @@ public void shouldPerformStreamToTableLeftJoin() { assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(initialSchemaKStream, schemaKTable), joinedKStream.sourceSchemaKStreams); } @@ -663,7 +666,7 @@ public void shouldPerformStreamToTableInnerJoin() { assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(initialSchemaKStream, schemaKTable), joinedKStream.sourceSchemaKStreams); } @@ -677,7 +680,7 @@ public void shouldSummarizeExecutionPlanCorrectly() { final SchemaKStream schemaKtream = new SchemaKStream( simpleSchema, mock(KStream.class), - simpleSchema.field("key"), + Optional.of(simpleSchema.field("key")), ImmutableList.of(parentSchemaKStream), Serdes::String, Type.SOURCE, @@ -698,7 +701,7 @@ public void shouldSummarizeExecutionPlanCorrectlyForRoot() { final SchemaKStream schemaKtream = new SchemaKStream( simpleSchema, mock(KStream.class), - simpleSchema.field("key"), + Optional.of(simpleSchema.field("key")), Collections.emptyList(), Serdes::String, Type.SOURCE, @@ -724,7 +727,7 @@ public void shouldSummarizeExecutionPlanCorrectlyWhenMultipleParents() { final SchemaKStream schemaKtream = new SchemaKStream( simpleSchema, mock(KStream.class), - simpleSchema.field("key"), + Optional.of(simpleSchema.field("key")), ImmutableList.of(parentSchemaKStream1, parentSchemaKStream2), Serdes::String, Type.SOURCE, @@ -767,7 +770,7 @@ private SchemaKStream buildSchemaKStream( return new SchemaKStream( schema, kStream, - ksqlStream.getKeyField().orElse(null), + ksqlStream.getKeyField(), new ArrayList<>(), Serdes::String, Type.SOURCE, @@ -833,7 +836,7 @@ private PlanNode givenInitialKStreamOf(final String selectQuery) { initialSchemaKStream = new SchemaKStream( logicalPlan.getTheSourceNode().getSchema(), kStream, - ksqlStream.getKeyField().orElse(null), + ksqlStream.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index ab4ae0f0cde7..0699d3fa3939 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -59,6 +59,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.connect.data.Field; @@ -130,7 +131,7 @@ private SchemaKTable buildSchemaKTable( return new SchemaKTable( schema, kTable, - 
ksqlTable.getKeyField().orElse(null), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, Type.SOURCE, @@ -178,7 +179,7 @@ public void testSelectSchemaKStream() { initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), kTable, - ksqlTable.getKeyField().orElse(null), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -217,7 +218,7 @@ public void testSelectWithExpression() { initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), kTable, - ksqlTable.getKeyField().orElse(null), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -259,7 +260,7 @@ public void testFilter() { initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), kTable, - ksqlTable.getKeyField().orElse(null), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -302,7 +303,7 @@ public void testGroupBy() { initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), kTable, - ksqlTable.getKeyField().orElse(null), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -329,7 +330,7 @@ public void testGroupBy() { childContextStacker); assertThat(groupedSchemaKTable, instanceOf(SchemaKGroupedTable.class)); - assertThat(groupedSchemaKTable.getKeyField().name(), equalTo("TEST2.COL2|+|TEST2.COL1")); + assertThat(groupedSchemaKTable.getKeyField().get().name(), equalTo("TEST2.COL2|+|TEST2.COL1")); } @Test @@ -377,7 +378,7 @@ public void shouldGroupKeysCorrectly() { initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), mockKTable, - ksqlTable.getKeyField().orElse(null), + ksqlTable.getKeyField(), new ArrayList<>(), Serdes::String, SchemaKStream.Type.SOURCE, @@ -433,7 +434,7 @@ public void shouldPerformTableToTableLeftJoin() { assertThat(joinedKStream, instanceOf(SchemaKTable.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(firstSchemaKTable, secondSchemaKTable), joinedKStream.sourceSchemaKStreams); } @@ -455,7 +456,7 @@ public void shouldPerformTableToTableInnerJoin() { assertThat(joinedKStream, instanceOf(SchemaKTable.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(firstSchemaKTable, secondSchemaKTable), joinedKStream.sourceSchemaKStreams); } @@ -477,7 +478,7 @@ public void shouldPerformTableToTableOuterJoin() { assertThat(joinedKStream, instanceOf(SchemaKTable.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); assertEquals(joinSchema, joinedKStream.schema); - assertEquals(joinSchema.fields().get(0), joinedKStream.keyField); + assertEquals(Optional.of(joinSchema.fields().get(0)), joinedKStream.keyField); assertEquals(Arrays.asList(firstSchemaKTable, secondSchemaKTable), joinedKStream.sourceSchemaKStreams); diff --git a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceMatchers.java b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceMatchers.java index 
b41986d9473e..d460634a2f60 100644 --- a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceMatchers.java +++ b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceMatchers.java @@ -71,6 +71,16 @@ protected String featureValueOf(final Field actual) { }; } + public static Matcher hasIndex(final int index) { + return new FeatureMatcher + (is(index), "field with index", "index") { + @Override + protected Integer featureValueOf(final Field actual) { + return actual.index(); + } + }; + } + public static Matcher hasSchema(final Schema schema) { return new FeatureMatcher (is(schema), "field with schema", "schema") {