Skip to content

Commit

Permalink
Minor: Mark the keyField of structured and plan nodes optional. (#2604
Browse files Browse the repository at this point in the history
)

* Mark the `keyField` of structured and plan nodes optional.
  • Loading branch information
big-andy-coates authored Mar 26, 2019
1 parent a9efa39 commit cf29742
Show file tree
Hide file tree
Showing 25 changed files with 197 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
sqlExpression,
outputNode.getId().toString(),
outputNode.getSchema(),
Optional.ofNullable(schemaKTable.getKeyField()),
schemaKTable.getKeyField(),
outputNode.getTimestampExtractionPolicy(),
outputNode.getKsqlTopic(),
schemaKTable.getKeySerdeFactory()
Expand All @@ -250,7 +250,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
sqlExpression,
outputNode.getId().toString(),
outputNode.getSchema(),
Optional.ofNullable(schemaKStream.getKeyField()),
schemaKStream.getKeyField(),
outputNode.getTimestampExtractionPolicy(),
outputNode.getKsqlTopic(),
schemaKStream.getKeySerdeFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -119,8 +120,8 @@ public Schema getSchema() {
}

@Override
public Field getKeyField() {
return null;
public Optional<Field> getKeyField() {
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
import java.util.Optional;
import javax.annotation.concurrent.Immutable;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsBuilder;

@Immutable
public class FilterNode
extends PlanNode {
public class FilterNode extends PlanNode {

private final PlanNode source;
private final Expression predicate;
private final Schema schema;
private final Field keyField;

@JsonCreator
public FilterNode(@JsonProperty("id") final PlanNodeId id,
Expand All @@ -49,7 +49,6 @@ public FilterNode(@JsonProperty("id") final PlanNodeId id,
this.source = source;
this.schema = source.getSchema();
this.predicate = predicate;
this.keyField = source.getKeyField();
}

@JsonProperty("predicate")
Expand All @@ -63,8 +62,8 @@ public Schema getSchema() {
}

@Override
public Field getKeyField() {
return keyField;
public Optional<Field> getKeyField() {
return source.getKeyField();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
Expand Down Expand Up @@ -128,8 +129,8 @@ public Schema getSchema() {
}

@Override
public Field getKeyField() {
return this.keyField;
public Optional<Field> getKeyField() {
return Optional.of(keyField);
}

@Override
Expand Down Expand Up @@ -284,7 +285,7 @@ Joiner getJoiner(final DataSource.DataSourceType leftType,
}
}

private abstract static class Joiner {
private abstract static class Joiner<K> {
protected final StreamsBuilder builder;
protected final KsqlConfig ksqlConfig;
private final ServiceContext serviceContext;
Expand Down Expand Up @@ -316,9 +317,9 @@ private abstract static class Joiner {
this.contextStacker = Objects.requireNonNull(contextStacker, "contextStacker");
}

public abstract SchemaKStream join();
public abstract SchemaKStream<K> join();

protected SchemaKStream buildStream(final PlanNode node, final String keyFieldName) {
protected SchemaKStream<K> buildStream(final PlanNode node, final String keyFieldName) {

return maybeRePartitionByKey(
node.buildStream(
Expand All @@ -332,11 +333,13 @@ protected SchemaKStream buildStream(final PlanNode node, final String keyFieldNa
contextStacker);
}


protected SchemaKTable buildTable(final PlanNode node,
final String keyFieldName,
final String tableName) {
final SchemaKStream schemaKStream = node.buildStream(
@SuppressWarnings("unchecked")
protected SchemaKTable<K> buildTable(
final PlanNode node,
final String keyFieldName,
final String tableName
) {
final SchemaKStream<?> schemaKStream = node.buildStream(
builder,
ksqlConfig.cloneWithPropertyOverwrite(
Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")),
Expand All @@ -349,15 +352,15 @@ protected SchemaKTable buildTable(final PlanNode node,
throw new RuntimeException("Expected to find a Table, found a stream instead.");
}

if (schemaKStream.getKeyField() != null
if (schemaKStream.getKeyField().isPresent()
&& !keyFieldName.equals(SchemaUtil.ROWKEY_NAME)
&& !SchemaUtil.matchFieldName(schemaKStream.getKeyField(), keyFieldName)) {
&& !SchemaUtil.matchFieldName(schemaKStream.getKeyField().get(), keyFieldName)) {
throw new KsqlException(
String.format(
"Source table (%s) key column (%s) "
+ "is not the column used in the join criteria (%s).",
tableName,
schemaKStream.getKeyField().name(),
schemaKStream.getKeyField().get().name(),
keyFieldName
)
);
Expand All @@ -366,7 +369,8 @@ protected SchemaKTable buildTable(final PlanNode node,
return (SchemaKTable) schemaKStream;
}

static SchemaKStream maybeRePartitionByKey(
@SuppressWarnings("unchecked")
static <K> SchemaKStream<K> maybeRePartitionByKey(
final SchemaKStream stream,
final String targetKey,
final QueryContext.Stacker contextStacker) {
Expand Down Expand Up @@ -408,7 +412,7 @@ Field getJoinKey(final String alias, final String keyFieldName) {
}
}

private static final class StreamToStreamJoiner extends Joiner {
private static final class StreamToStreamJoiner<K> extends Joiner<K> {

private StreamToStreamJoiner(
final StreamsBuilder builder,
Expand All @@ -431,9 +435,9 @@ private StreamToStreamJoiner(
contextStacker);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"})
@Override
public SchemaKStream join() {
public SchemaKStream<K> join() {
if (joinNode.withinExpression == null) {
throw new KsqlException("Stream-Stream joins must have a WITHIN clause specified. None was "
+ "provided. To learn about how to specify a WITHIN clause with a "
Expand All @@ -442,17 +446,18 @@ public SchemaKStream join() {
+ "#create-stream-as-select");
}

final SchemaKStream leftStream = buildStream(joinNode.getLeft(),
joinNode.getLeftKeyFieldName());
final SchemaKStream rightStream = buildStream(joinNode.getRight(),
joinNode.getRightKeyFieldName());
final SchemaKStream<K> leftStream = buildStream(
joinNode.getLeft(), joinNode.getLeftKeyFieldName());

final SchemaKStream<K> rightStream = buildStream(
joinNode.getRight(), joinNode.getRightKeyFieldName());

switch (joinNode.joinType) {
case LEFT:
return leftStream.leftJoin(rightStream,
joinNode.schema,
getJoinKey(joinNode.leftAlias,
leftStream.getKeyField().name()),
leftStream.getKeyField().get().name()),
joinNode.withinExpression.joinWindow(),
getSerDeForNode(
joinNode.left,
Expand All @@ -465,7 +470,7 @@ public SchemaKStream join() {
return leftStream.outerJoin(rightStream,
joinNode.schema,
getJoinKey(joinNode.leftAlias,
leftStream.getKeyField().name()),
leftStream.getKeyField().get().name()),
joinNode.withinExpression.joinWindow(),
getSerDeForNode(
joinNode.left,
Expand All @@ -478,7 +483,7 @@ public SchemaKStream join() {
return leftStream.join(rightStream,
joinNode.schema,
getJoinKey(joinNode.leftAlias,
leftStream.getKeyField().name()),
leftStream.getKeyField().get().name()),
joinNode.withinExpression.joinWindow(),
getSerDeForNode(
joinNode.left,
Expand All @@ -493,7 +498,7 @@ public SchemaKStream join() {
}
}

private static final class StreamToTableJoiner extends Joiner {
private static final class StreamToTableJoiner<K> extends Joiner<K> {

private StreamToTableJoiner(
final StreamsBuilder builder,
Expand All @@ -516,27 +521,27 @@ private StreamToTableJoiner(
contextStacker);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"})
@Override
public SchemaKStream join() {
public SchemaKStream<K> join() {
if (joinNode.withinExpression != null) {
throw new KsqlException("A window definition was provided for a Stream-Table join. These "
+ "joins are not windowed. Please drop the window definition (ie."
+ " the WITHIN clause) and try to execute your join again.");
}

final SchemaKTable rightTable = buildTable(joinNode.getRight(),
final SchemaKTable<K> rightTable = buildTable(joinNode.getRight(),
joinNode.getRightKeyFieldName(),
joinNode.getRightAlias());
final SchemaKStream leftStream = buildStream(joinNode.getLeft(),
final SchemaKStream<K> leftStream = buildStream(joinNode.getLeft(),
joinNode.getLeftKeyFieldName());

switch (joinNode.joinType) {
case LEFT:
return leftStream.leftJoin(rightTable,
joinNode.schema,
getJoinKey(joinNode.leftAlias,
leftStream.getKeyField().name()),
leftStream.getKeyField().get().name()),
getSerDeForNode(
joinNode.left,
contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
Expand All @@ -546,7 +551,7 @@ public SchemaKStream join() {
return leftStream.join(rightTable,
joinNode.schema,
getJoinKey(joinNode.leftAlias,
leftStream.getKeyField().name()),
leftStream.getKeyField().get().name()),
getSerDeForNode(
joinNode.left,
contextStacker.push(LEFT_SERDE_CONTEXT_NAME)),
Expand All @@ -561,7 +566,7 @@ public SchemaKStream join() {
}
}

private static final class TableToTableJoiner extends Joiner {
private static final class TableToTableJoiner<K> extends Joiner<K> {

TableToTableJoiner(
final StreamsBuilder builder,
Expand All @@ -584,20 +589,20 @@ private static final class TableToTableJoiner extends Joiner {
contextStacker);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"})
@Override
public SchemaKTable join() {
public SchemaKTable<K> join() {
if (joinNode.withinExpression != null) {
throw new KsqlException("A window definition was provided for a Table-Table join. These "
+ "joins are not windowed. Please drop the window definition "
+ "(i.e. the WITHIN clause) and try to execute your Table-Table "
+ "join again.");
}

final SchemaKTable leftTable = buildTable(joinNode.getLeft(),
final SchemaKTable<K> leftTable = buildTable(joinNode.getLeft(),
joinNode.getLeftKeyFieldName(),
joinNode.getLeftAlias());
final SchemaKTable rightTable = buildTable(joinNode.getRight(),
final SchemaKTable<K> rightTable = buildTable(joinNode.getRight(),
joinNode.getRightKeyFieldName(),
joinNode.getRightAlias());

Expand All @@ -606,19 +611,19 @@ public SchemaKTable join() {
return leftTable.leftJoin(
rightTable,
joinNode.schema,
getJoinKey(joinNode.leftAlias, leftTable.getKeyField().name()),
getJoinKey(joinNode.leftAlias, leftTable.getKeyField().get().name()),
contextStacker);
case INNER:
return leftTable.join(
rightTable,
joinNode.schema,
getJoinKey(joinNode.leftAlias, leftTable.getKeyField().name()),
getJoinKey(joinNode.leftAlias, leftTable.getKeyField().get().name()),
contextStacker);
case OUTER:
return leftTable.outerJoin(
rightTable,
joinNode.schema,
getJoinKey(joinNode.leftAlias, leftTable.getKeyField().name()),
getJoinKey(joinNode.leftAlias, leftTable.getKeyField().get().name()),
contextStacker);
default:
throw new KsqlException("Invalid join type encountered: " + joinNode.joinType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public QueryId getQueryId(final QueryIdGenerator queryIdGenerator) {
}

@Override
public Field getKeyField() {
return null;
public Optional<Field> getKeyField() {
return Optional.empty();
}

@Override
Expand Down
Loading

0 comments on commit cf29742

Please sign in to comment.