From e243c7408dc42a6c16ceacaad6ee340d44fe790c Mon Sep 17 00:00:00 2001 From: Rohan Date: Fri, 20 Sep 2019 02:24:45 -0700 Subject: [PATCH] feat: move joins to plan builder (#3361) Moves code for joining kstreams/ktables out of SchemaKStream/Table and into execution plan builders. Also moves KsqlValueJoiner into ksql-streams, so that it can be used from the plan builders. Also adds a buildKeySerde API for building both windowed and unwindowed keys, since most of the plan builders don't have type information for the key. --- .../confluent/ksql/planner/plan/JoinNode.java | 56 +-- .../ksql/streams/StreamsFactories.java | 1 + .../ksql/structured/SchemaKStream.java | 181 ++++------ .../ksql/structured/SchemaKTable.java | 33 +- .../ksql/planner/plan/JoinNodeTest.java | 92 +---- .../ksql/streams/JoinedFactoryTest.java | 1 + .../ksql/structured/SchemaKStreamTest.java | 96 ++--- .../ksql/structured/SchemaKTableTest.java | 15 +- .../ksql/execution/plan/StreamStreamJoin.java | 42 ++- .../ksql/execution/plan/StreamTableJoin.java | 33 +- .../ksql/execution/plan/TableTableJoin.java | 26 +- .../streams/ExecutionStepFactory.java | 19 +- .../execution}/streams/JoinedFactory.java | 3 +- .../execution/streams/KsqlValueJoiner.java | 75 ++++ .../streams/StreamStreamJoinBuilder.java | 93 +++++ .../streams/StreamTableJoinBuilder.java | 79 +++++ .../streams/TableTableJoinBuilder.java | 45 +++ .../streams}/KsqlValueJoinerTest.java | 11 +- .../streams/StreamStreamJoinBuilderTest.java | 331 ++++++++++++++++++ .../streams/StreamTableJoinBuilderTest.java | 282 +++++++++++++++ .../streams/TableTableJoinBuilderTest.java | 171 +++++++++ 21 files changed, 1340 insertions(+), 345 deletions(-) rename {ksql-engine/src/main/java/io/confluent/ksql => ksql-streams/src/main/java/io/confluent/ksql/execution}/streams/JoinedFactory.java (94%) create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java create mode 100644 
ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java rename {ksql-engine/src/test/java/io/confluent/ksql/structured => ksql-streams/src/test/java/io/confluent/ksql/execution/streams}/KsqlValueJoinerTest.java (78%) create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index e88154d1bd59..f78d03bea4ed 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -16,7 +16,6 @@ package io.confluent.ksql.planner.plan; import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; @@ -25,9 +24,7 @@ import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; -import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; @@ -43,8 +40,6 @@ import 
java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serde; - public class JoinNode extends PlanNode { @@ -52,9 +47,6 @@ public enum JoinType { INNER, LEFT, OUTER } - private static final String LEFT_SERDE_CONTEXT_NAME = "left"; - private static final String RIGHT_SERDE_CONTEXT_NAME = "right"; - private final JoinType joinType; private final DataSourceNode left; private final DataSourceNode right; @@ -285,25 +277,6 @@ static ValueFormat getFormatForSource(final DataSourceNode sourceNode) { .getValueFormat(); } - Serde getSerDeForSource( - final DataSourceNode sourceNode, - final QueryContext.Stacker contextStacker - ) { - final ValueFormat valueFormat = getFormatForSource(sourceNode); - - final LogicalSchema logicalSchema = sourceNode.getSchema() - .withoutAlias(); - - return builder.buildValueSerde( - valueFormat.getFormatInfo(), - PhysicalSchema.from( - logicalSchema, - SerdeOption.none() - ), - contextStacker.getQueryContext() - ); - } - /** * The key field of the resultant joined stream. 
* @@ -336,7 +309,6 @@ static KeyField getOuterJoinedKeyField(final String leftAlias, final KeyField le } private static final class StreamToStreamJoiner extends Joiner { - private StreamToStreamJoiner( final KsqlQueryBuilder builder, final JoinNode joinNode, @@ -370,9 +342,9 @@ public SchemaKStream join() { joinNode.withinExpression.get().joinWindow(), getFormatForSource(joinNode.left), getFormatForSource(joinNode.right), - getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), - getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)), - contextStacker); + contextStacker, + builder + ); case OUTER: return leftStream.outerJoin( rightStream, @@ -381,9 +353,9 @@ public SchemaKStream join() { joinNode.withinExpression.get().joinWindow(), getFormatForSource(joinNode.left), getFormatForSource(joinNode.right), - getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), - getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)), - contextStacker); + contextStacker, + builder + ); case INNER: return leftStream.join( rightStream, @@ -392,9 +364,9 @@ public SchemaKStream join() { joinNode.withinExpression.get().joinWindow(), getFormatForSource(joinNode.left), getFormatForSource(joinNode.right), - getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), - getSerDeForSource(joinNode.right, contextStacker.push(RIGHT_SERDE_CONTEXT_NAME)), - contextStacker); + contextStacker, + builder + ); default: throw new KsqlException("Invalid join type encountered: " + joinNode.joinType); } @@ -432,8 +404,9 @@ public SchemaKStream join() { joinNode.schema, getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()), getFormatForSource(joinNode.left), - getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), - contextStacker); + contextStacker, + builder + ); case INNER: return leftStream.join( @@ -441,8 +414,9 @@ public SchemaKStream join() 
{ joinNode.schema, getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()), getFormatForSource(joinNode.left), - getSerDeForSource(joinNode.left, contextStacker.push(LEFT_SERDE_CONTEXT_NAME)), - contextStacker); + contextStacker, + builder + ); case OUTER: throw new KsqlException("Full outer joins between streams and tables are not supported."); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java b/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java index 6616f1cabd46..8343a7938fef 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java @@ -16,6 +16,7 @@ package io.confluent.ksql.streams; import io.confluent.ksql.execution.streams.GroupedFactory; +import io.confluent.ksql.execution.streams.JoinedFactory; import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.util.KsqlConfig; import java.util.Objects; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 09e048fcc39e..bba5f09cea67 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -41,6 +41,8 @@ import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; +import io.confluent.ksql.execution.plan.StreamStreamJoin; +import io.confluent.ksql.execution.plan.StreamTableJoin; import io.confluent.ksql.execution.plan.StreamToTable; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.StreamFilterBuilder; @@ -49,8 +51,9 @@ import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder; import 
io.confluent.ksql.execution.streams.StreamSinkBuilder; import io.confluent.ksql.execution.streams.StreamSourceBuilder; +import io.confluent.ksql.execution.streams.StreamStreamJoinBuilder; +import io.confluent.ksql.execution.streams.StreamTableJoinBuilder; import io.confluent.ksql.execution.streams.StreamToTableBuilder; -import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.DataSource; @@ -69,20 +72,17 @@ import io.confluent.ksql.util.IdentifierUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.SchemaUtil; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.Topology.AutoOffsetReset; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling @@ -437,26 +437,15 @@ public KeyField getKey() { } } - @SuppressWarnings("unchecked") public SchemaKStream leftJoin( final SchemaKTable schemaKTable, final LogicalSchema joinSchema, final KeyField keyField, final ValueFormat valueFormat, - final Serde leftValueSerDe, - final QueryContext.Stacker contextStacker + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder ) { - final KStream joinedKStream = - kstream.leftJoin( - schemaKTable.getKtable(), - new KsqlValueJoiner(this.getSchema(), schemaKTable.getSchema()), - streamsFactories.getJoinedFactory().create( - keySerde, - leftValueSerDe, - null, - 
StreamsUtil.buildOpName(contextStacker.getQueryContext())) - ); - final ExecutionStep> step = ExecutionStepFactory.streamTableJoin( + final StreamTableJoin step = ExecutionStepFactory.streamTableJoin( contextStacker, JoinType.LEFT, Formats.of(keyFormat, valueFormat, SerdeOption.none()), @@ -465,7 +454,14 @@ public SchemaKStream leftJoin( joinSchema ); return new SchemaKStream<>( - joinedKStream, + StreamTableJoinBuilder.build( + kstream, + schemaKTable.getKtable(), + step, + (fmt, schema, qctx) -> keySerde, + queryBuilder, + streamsFactories.getJoinedFactory() + ), step, keyFormat, keySerde, @@ -484,23 +480,10 @@ public SchemaKStream leftJoin( final JoinWindows joinWindows, final ValueFormat leftFormat, final ValueFormat rightFormat, - final Serde leftSerde, - final Serde rightSerde, - final QueryContext.Stacker contextStacker) { - - final KStream joinStream = - kstream - .leftJoin( - otherSchemaKStream.kstream, - new KsqlValueJoiner(this.getSchema(), otherSchemaKStream.getSchema()), - joinWindows, - streamsFactories.getJoinedFactory().create( - keySerde, - leftSerde, - rightSerde, - StreamsUtil.buildOpName(contextStacker.getQueryContext())) - ); - final ExecutionStep> step = ExecutionStepFactory.streamStreamJoin( + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder) { + + final StreamStreamJoin step = ExecutionStepFactory.streamStreamJoin( contextStacker, JoinType.LEFT, Formats.of(keyFormat, leftFormat, SerdeOption.none()), @@ -511,7 +494,14 @@ public SchemaKStream leftJoin( joinWindows ); return new SchemaKStream<>( - joinStream, + StreamStreamJoinBuilder.build( + kstream, + otherSchemaKStream.kstream, + step, + (fmt, schema, qctx) -> keySerde, + queryBuilder, + streamsFactories.getJoinedFactory() + ), step, keyFormat, keySerde, @@ -523,26 +513,15 @@ public SchemaKStream leftJoin( ); } - @SuppressWarnings("unchecked") public SchemaKStream join( final SchemaKTable schemaKTable, final LogicalSchema joinSchema, final KeyField keyField, 
final ValueFormat valueFormat, - final Serde joinSerDe, - final QueryContext.Stacker contextStacker + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder ) { - final KStream joinedKStream = - kstream.join( - schemaKTable.getKtable(), - new KsqlValueJoiner(getSchema(), schemaKTable.getSchema()), - streamsFactories.getJoinedFactory().create( - keySerde, - joinSerDe, - null, - StreamsUtil.buildOpName(contextStacker.getQueryContext())) - ); - final ExecutionStep> step = ExecutionStepFactory.streamTableJoin( + final StreamTableJoin step = ExecutionStepFactory.streamTableJoin( contextStacker, JoinType.INNER, Formats.of(keyFormat, valueFormat, SerdeOption.none()), @@ -551,7 +530,14 @@ public SchemaKStream join( joinSchema ); return new SchemaKStream<>( - joinedKStream, + StreamTableJoinBuilder.build( + kstream, + schemaKTable.getKtable(), + step, + (fmt, schema, qctx) -> keySerde, + queryBuilder, + streamsFactories.getJoinedFactory() + ), step, keyFormat, keySerde, @@ -570,22 +556,9 @@ public SchemaKStream join( final JoinWindows joinWindows, final ValueFormat leftFormat, final ValueFormat rightFormat, - final Serde leftSerde, - final Serde rightSerde, - final QueryContext.Stacker contextStacker) { - final KStream joinStream = - kstream - .join( - otherSchemaKStream.kstream, - new KsqlValueJoiner(this.getSchema(), otherSchemaKStream.getSchema()), - joinWindows, - streamsFactories.getJoinedFactory().create( - keySerde, - leftSerde, - rightSerde, - StreamsUtil.buildOpName(contextStacker.getQueryContext())) - ); - final ExecutionStep> step = ExecutionStepFactory.streamStreamJoin( + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder) { + final StreamStreamJoin step = ExecutionStepFactory.streamStreamJoin( contextStacker, JoinType.INNER, Formats.of(keyFormat, leftFormat, SerdeOption.none()), @@ -596,7 +569,14 @@ public SchemaKStream join( joinWindows ); return new SchemaKStream<>( - joinStream, + 
StreamStreamJoinBuilder.build( + kstream, + otherSchemaKStream.kstream, + step, + (fmt, schema, qctx) -> keySerde, + queryBuilder, + streamsFactories.getJoinedFactory() + ), step, keyFormat, keySerde, @@ -615,21 +595,9 @@ public SchemaKStream outerJoin( final JoinWindows joinWindows, final ValueFormat leftFormat, final ValueFormat rightFormat, - final Serde leftSerde, - final Serde rightSerde, - final QueryContext.Stacker contextStacker) { - final KStream joinStream = kstream - .outerJoin( - otherSchemaKStream.kstream, - new KsqlValueJoiner(this.getSchema(), otherSchemaKStream.getSchema()), - joinWindows, - streamsFactories.getJoinedFactory().create( - keySerde, - leftSerde, - rightSerde, - StreamsUtil.buildOpName(contextStacker.getQueryContext())) - ); - final ExecutionStep> step = ExecutionStepFactory.streamStreamJoin( + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder) { + final StreamStreamJoin step = ExecutionStepFactory.streamStreamJoin( contextStacker, JoinType.OUTER, Formats.of(keyFormat, leftFormat, SerdeOption.none()), @@ -640,7 +608,14 @@ public SchemaKStream outerJoin( joinWindows ); return new SchemaKStream<>( - joinStream, + StreamStreamJoinBuilder.build( + kstream, + otherSchemaKStream.kstream, + step, + (fmt, schema, qctx) -> keySerde, + queryBuilder, + streamsFactories.getJoinedFactory() + ), step, keyFormat, keySerde, @@ -917,40 +892,4 @@ String groupedKeyNameFor(final List groupByExpressions) { .map(Expression::toString) .collect(Collectors.joining(GROUP_BY_COLUMN_SEPARATOR)); } - - protected static class KsqlValueJoiner - implements ValueJoiner { - - private final LogicalSchema leftSchema; - private final LogicalSchema rightSchema; - - KsqlValueJoiner(final LogicalSchema leftSchema, final LogicalSchema rightSchema) { - this.leftSchema = Objects.requireNonNull(leftSchema, "leftSchema"); - this.rightSchema = Objects.requireNonNull(rightSchema, "rightSchema"); - } - - @Override - public GenericRow apply(final 
GenericRow left, final GenericRow right) { - final List columns = new ArrayList<>(); - if (left != null) { - columns.addAll(left.getColumns()); - } else { - fillWithNulls(columns, leftSchema.value().size()); - } - - if (right != null) { - columns.addAll(right.getColumns()); - } else { - fillWithNulls(columns, rightSchema.value().size()); - } - - return new GenericRow(columns); - } - - private static void fillWithNulls(final List columns, final int numToFill) { - for (int i = 0; i < numToFill; ++i) { - columns.add(null); - } - } - } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 7dac67d34d31..0032e11f56f2 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -28,11 +28,13 @@ import io.confluent.ksql.execution.plan.TableGroupBy; import io.confluent.ksql.execution.plan.TableMapValues; import io.confluent.ksql.execution.plan.TableSink; +import io.confluent.ksql.execution.plan.TableTableJoin; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.TableFilterBuilder; import io.confluent.ksql.execution.streams.TableGroupByBuilder; import io.confluent.ksql.execution.streams.TableMapValuesBuilder; import io.confluent.ksql.execution.streams.TableSinkBuilder; +import io.confluent.ksql.execution.streams.TableTableJoinBuilder; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.KeyField; @@ -209,7 +211,7 @@ public KStream getKstream() { return ktable.toStream(); } - public KTable getKtable() { + public KTable getKtable() { return ktable; } @@ -258,18 +260,13 @@ public SchemaKGroupedStream groupBy( functionRegistry); } - @SuppressWarnings("unchecked") public SchemaKTable join( final SchemaKTable 
schemaKTable, final LogicalSchema joinSchema, final KeyField keyField, final QueryContext.Stacker contextStacker ) { - final KTable joinedKTable = ktable.join( - schemaKTable.getKtable(), - new KsqlValueJoiner(this.getSchema(), schemaKTable.getSchema()) - ); - final ExecutionStep> step = ExecutionStepFactory.tableTableJoin( + final TableTableJoin step = ExecutionStepFactory.tableTableJoin( contextStacker, JoinType.INNER, sourceTableStep, @@ -277,7 +274,7 @@ public SchemaKTable join( joinSchema ); return new SchemaKTable<>( - joinedKTable, + TableTableJoinBuilder.build(ktable, schemaKTable.ktable, step), step, keyFormat, keySerde, @@ -289,19 +286,13 @@ public SchemaKTable join( ); } - @SuppressWarnings("unchecked") public SchemaKTable leftJoin( final SchemaKTable schemaKTable, final LogicalSchema joinSchema, final KeyField keyField, final QueryContext.Stacker contextStacker ) { - final KTable joinedKTable = - ktable.leftJoin( - schemaKTable.getKtable(), - new KsqlValueJoiner(this.getSchema(), schemaKTable.getSchema()) - ); - final ExecutionStep> step = ExecutionStepFactory.tableTableJoin( + final TableTableJoin step = ExecutionStepFactory.tableTableJoin( contextStacker, JoinType.LEFT, sourceTableStep, @@ -309,7 +300,7 @@ public SchemaKTable leftJoin( joinSchema ); return new SchemaKTable<>( - joinedKTable, + TableTableJoinBuilder.build(ktable, schemaKTable.ktable, step), step, keyFormat, keySerde, @@ -321,19 +312,13 @@ public SchemaKTable leftJoin( ); } - @SuppressWarnings("unchecked") public SchemaKTable outerJoin( final SchemaKTable schemaKTable, final LogicalSchema joinSchema, final KeyField keyField, final QueryContext.Stacker contextStacker ) { - final KTable joinedKTable = - ktable.outerJoin( - schemaKTable.getKtable(), - new KsqlValueJoiner(this.getSchema(), schemaKTable.getSchema()) - ); - final ExecutionStep> step = ExecutionStepFactory.tableTableJoin( + final TableTableJoin step = ExecutionStepFactory.tableTableJoin( contextStacker, JoinType.OUTER, 
sourceTableStep, @@ -341,7 +326,7 @@ public SchemaKTable outerJoin( joinSchema ); return new SchemaKTable<>( - joinedKTable, + TableTableJoinBuilder.build(ktable, schemaKTable.ktable, step), step, keyFormat, keySerde, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index 8041d0846338..e168431de129 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -360,9 +361,9 @@ public void shouldPerformStreamToStreamLeftJoin() { eq(WITHIN_EXPRESSION.get().joinWindow()), eq(VALUE_FORMAT), eq(OTHER_FORMAT), - any(), - any(), - eq(CONTEXT_STACKER)); + eq(CONTEXT_STACKER), + same(ksqlStreamBuilder) + ); } @Test @@ -392,9 +393,9 @@ public void shouldPerformStreamToStreamInnerJoin() { eq(WITHIN_EXPRESSION.get().joinWindow()), eq(VALUE_FORMAT), eq(OTHER_FORMAT), - any(), - any(), - eq(CONTEXT_STACKER)); + eq(CONTEXT_STACKER), + same(ksqlStreamBuilder) + ); } @Test @@ -424,9 +425,9 @@ public void shouldPerformStreamToStreamOuterJoin() { eq(WITHIN_EXPRESSION.get().joinWindow()), eq(VALUE_FORMAT), eq(OTHER_FORMAT), - any(), - any(), - eq(CONTEXT_STACKER)); + eq(CONTEXT_STACKER), + same(ksqlStreamBuilder) + ); } @Test @@ -565,8 +566,9 @@ public void shouldHandleJoinIfTableHasNoKeyAndJoinFieldIsRowKey() { eq(JOIN_SCHEMA), eq(leftJoinField), eq(VALUE_FORMAT), - any(), - eq(CONTEXT_STACKER)); + eq(CONTEXT_STACKER), + same(ksqlStreamBuilder) + ); } @Test @@ -594,8 +596,9 @@ public void shouldPerformStreamToTableLeftJoin() { 
eq(JOIN_SCHEMA), eq(leftJoinField), eq(VALUE_FORMAT), - any(), - eq(CONTEXT_STACKER)); + eq(CONTEXT_STACKER), + same(ksqlStreamBuilder) + ); } @Test @@ -623,8 +626,9 @@ public void shouldPerformStreamToTableInnerJoin() { eq(JOIN_SCHEMA), eq(leftJoinField), eq(VALUE_FORMAT), - any(), - eq(CONTEXT_STACKER)); + eq(CONTEXT_STACKER), + same(ksqlStreamBuilder) + ); } @Test @@ -910,64 +914,6 @@ public void shouldSelectLeftKeyField() { ); } - @Test - public void shouldBuildLeftRowSerde() { - // Given: - setupStream(left, leftSchemaKStream); - setupStream(right, rightSchemaKStream); - - final JoinNode joinNode = new JoinNode( - nodeId, - JoinNode.JoinType.LEFT, - left, - right, - LEFT_JOIN_FIELD_NAME, - RIGHT_JOIN_FIELD_NAME, - WITHIN_EXPRESSION - ); - - // When: - joinNode.buildStream(ksqlStreamBuilder); - - // Then: - final PhysicalSchema expected = PhysicalSchema - .from(LEFT_NODE_SCHEMA.withoutAlias(), SerdeOption.none()); - - verify(ksqlStreamBuilder).buildValueSerde( - any(), - eq(expected), - any()); - } - - @Test - public void shouldBuildRightRowSerde() { - // Given: - setupStream(left, leftSchemaKStream); - setupStream(right, rightSchemaKStream); - - final JoinNode joinNode = new JoinNode( - nodeId, - JoinNode.JoinType.LEFT, - left, - right, - LEFT_JOIN_FIELD_NAME, - RIGHT_JOIN_FIELD_NAME, - WITHIN_EXPRESSION - ); - - // When: - joinNode.buildStream(ksqlStreamBuilder); - - // Then: - final PhysicalSchema expected = PhysicalSchema - .from(RIGHT_NODE_SCHEMA.withoutAlias(), SerdeOption.none()); - - verify(ksqlStreamBuilder).buildValueSerde( - any(), - eq(expected), - any()); - } - @Test public void shouldNotUseSourceSerdeOptionsForInternalTopics() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java index 7754952d015e..61c77ae94ee6 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java +++ 
b/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.streams.JoinedFactory; import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index cfa08f98ff6c..16b1cc8f41e9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -53,6 +53,8 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.StreamFilter; import io.confluent.ksql.execution.streams.ExecutionStepFactory; +import io.confluent.ksql.execution.streams.JoinedFactory; +import io.confluent.ksql.execution.streams.KsqlValueJoiner; import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.function.InternalFunctionRegistry; @@ -80,8 +82,8 @@ import io.confluent.ksql.serde.KeySerde; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; +import io.confluent.ksql.execution.streams.JoinedFactory; import io.confluent.ksql.execution.streams.GroupedFactory; -import io.confluent.ksql.streams.JoinedFactory; import io.confluent.ksql.streams.StreamsFactories; import io.confluent.ksql.structured.SchemaKStream.Type; import io.confluent.ksql.testutils.AnalysisTestUtil; @@ -201,6 +203,7 @@ public void init() { when(mockGroupedFactory.create(anyString(), any(Serde.class), any(Serde.class))) .thenReturn(grouped); when(mockMaterializedFactory.create(any(), any(), anyString())).thenReturn(mockMaterialized); + 
when(mockJoinedFactory.create(any(), any(), any(), anyString())).thenReturn(joined); final KsqlStream secondKsqlStream = (KsqlStream) metaStore.getSource("ORDERS"); final KStream secondKStream = builder @@ -859,10 +862,13 @@ public void shouldPerformStreamToStreamLeftJoin() { when( mockKStream.leftJoin( any(KStream.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(JoinWindows.class), any(Joined.class)) ).thenReturn(mockKStream); + when(queryBuilder.buildValueSerde(any(), any(), any())) + .thenReturn(leftSerde) + .thenReturn(rightSerde); // When: final SchemaKStream joinedKStream = initialSchemaKStream @@ -873,16 +879,15 @@ public void shouldPerformStreamToStreamLeftJoin() { joinWindow, valueFormat, valueFormat, - leftSerde, - rightSerde, - childContextStacker + childContextStacker, + queryBuilder ); // Then: verifyCreateJoined(rightSerde); verify(mockKStream).leftJoin( eq(secondSchemaKStream.kstream), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), eq(joinWindow), same(joined) ); @@ -903,9 +908,9 @@ SchemaKStream join( JoinWindows joinWindows, ValueFormat leftFormat, ValueFormat rightFormat, - Serde leftSerde, - Serde rightSerde, - QueryContext.Stacker contextStacker); + QueryContext.Stacker contextStacker, + KsqlQueryBuilder queryBuilder + ); } @Test @@ -916,19 +921,19 @@ public void shouldBuildStepForStreamStreamJoin() { final JoinWindows joinWindow = JoinWindows.of(Duration.ofMillis(10L)); when(mockKStream.leftJoin( any(KStream.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(JoinWindows.class), any(Joined.class)) ).thenReturn(mockKStream); when(mockKStream.join( any(KStream.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(JoinWindows.class), any(Joined.class)) ).thenReturn(mockKStream); when(mockKStream.outerJoin( any(KStream.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), 
any(JoinWindows.class), any(Joined.class)) ).thenReturn(mockKStream); @@ -947,9 +952,8 @@ public void shouldBuildStepForStreamStreamJoin() { joinWindow, valueFormat, rightFormat, - leftSerde, - rightSerde, - childContextStacker + childContextStacker, + queryBuilder ); // Then: @@ -981,10 +985,13 @@ public void shouldPerformStreamToStreamInnerJoin() { when( mockKStream.join( any(KStream.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(JoinWindows.class), any(Joined.class)) ).thenReturn(mockKStream); + when(queryBuilder.buildValueSerde(any(), any(), any())) + .thenReturn(leftSerde) + .thenReturn(rightSerde); // When: final SchemaKStream joinedKStream = initialSchemaKStream @@ -995,15 +1002,15 @@ public void shouldPerformStreamToStreamInnerJoin() { joinWindow, valueFormat, valueFormat, - leftSerde, - rightSerde, - childContextStacker); + childContextStacker, + queryBuilder + ); // Then: verifyCreateJoined(rightSerde); verify(mockKStream).join( eq(secondSchemaKStream.kstream), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), eq(joinWindow), same(joined) ); @@ -1026,10 +1033,13 @@ public void shouldPerformStreamToStreamOuterJoin() { when( mockKStream.outerJoin( any(KStream.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(JoinWindows.class), any(Joined.class)) ).thenReturn(mockKStream); + when(queryBuilder.buildValueSerde(any(), any(), any())) + .thenReturn(leftSerde) + .thenReturn(rightSerde); // When: final SchemaKStream joinedKStream = initialSchemaKStream @@ -1040,16 +1050,15 @@ public void shouldPerformStreamToStreamOuterJoin() { joinWindow, valueFormat, valueFormat, - leftSerde, - rightSerde, - childContextStacker + childContextStacker, + queryBuilder ); // Then: verifyCreateJoined(rightSerde); verify(mockKStream).outerJoin( eq(secondSchemaKStream.kstream), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), eq(joinWindow), same(joined) ); @@ 
-1070,9 +1079,10 @@ public void shouldPerformStreamToTableLeftJoin() { when( mockKStream.leftJoin( any(KTable.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(Joined.class)) ).thenReturn(mockKStream); + when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(leftSerde); // When: final SchemaKStream joinedKStream = initialSchemaKStream @@ -1081,14 +1091,15 @@ public void shouldPerformStreamToTableLeftJoin() { joinSchema, validJoinKeyField, valueFormat, - leftSerde, - childContextStacker); + childContextStacker, + queryBuilder + ); // Then: verifyCreateJoined(null); verify(mockKStream).leftJoin( eq(schemaKTable.getKtable()), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), same(joined)); assertThat(joinedKStream, instanceOf(SchemaKStream.class)); assertEquals(SchemaKStream.Type.JOIN, joinedKStream.type); @@ -1107,9 +1118,10 @@ public void shouldPerformStreamToTableInnerJoin() { when( mockKStream.join( any(KTable.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(Joined.class)) ).thenReturn(mockKStream); + when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(leftSerde); // When: final SchemaKStream joinedKStream = initialSchemaKStream @@ -1118,14 +1130,15 @@ public void shouldPerformStreamToTableInnerJoin() { joinSchema, validJoinKeyField, valueFormat, - leftSerde, - childContextStacker); + childContextStacker, + queryBuilder + ); // Then: verifyCreateJoined(null); verify(mockKStream).join( eq(schemaKTable.getKtable()), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), same(joined) ); @@ -1143,8 +1156,9 @@ SchemaKStream join( LogicalSchema joinSchema, KeyField keyField, ValueFormat leftFormat, - Serde leftSerde, - QueryContext.Stacker contextStacker); + QueryContext.Stacker contextStacker, + KsqlQueryBuilder queryBuilder + ); } @Test @@ -1155,12 +1169,12 @@ public void shouldBuildStepForStreamTableJoin() { when( 
mockKStream.leftJoin( any(KTable.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(Joined.class)) ).thenReturn(mockKStream); when(mockKStream.join( any(KTable.class), - any(SchemaKStream.KsqlValueJoiner.class), + any(KsqlValueJoiner.class), any(Joined.class)) ).thenReturn(mockKStream); @@ -1175,8 +1189,8 @@ public void shouldBuildStepForStreamTableJoin() { joinSchema, validJoinKeyField, valueFormat, - leftSerde, - childContextStacker + childContextStacker, + queryBuilder ); // Then: @@ -1361,8 +1375,8 @@ private SchemaKStream buildSchemaKStreamForJoin( final GroupedFactory groupedFactory, final JoinedFactory joinedFactory) { return buildSchemaKStream( - ksqlStream.getSchema(), - ksqlStream.getKeyField(), kStream, + ksqlStream.getSchema().withAlias("test"), + ksqlStream.getKeyField().withAlias("test"), kStream, new StreamsFactories(groupedFactory, joinedFactory, mock(MaterializedFactory.class)) ); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index 58ba569d158b..79e2ca9d1ab0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -54,6 +54,7 @@ import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.execution.streams.StreamsUtil; +import io.confluent.ksql.execution.streams.KsqlValueJoiner; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; @@ -78,7 +79,7 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.execution.streams.GroupedFactory; -import io.confluent.ksql.streams.JoinedFactory; +import 
io.confluent.ksql.execution.streams.JoinedFactory; import io.confluent.ksql.streams.StreamsFactories; import io.confluent.ksql.structured.SchemaKStream.Type; import io.confluent.ksql.testutils.AnalysisTestUtil; @@ -555,7 +556,7 @@ public void shouldGroupKeysCorrectly() { @Test public void shouldPerformTableToTableLeftJoin() { expect(mockKTable.leftJoin(eq(secondSchemaKTable.getKtable()), - anyObject(SchemaKStream.KsqlValueJoiner.class))) + anyObject(KsqlValueJoiner.class))) .andReturn(EasyMock.niceMock(KTable.class)); replay(mockKTable); @@ -581,7 +582,7 @@ public void shouldPerformTableToTableLeftJoin() { @Test public void shouldPerformTableToTableInnerJoin() { expect(mockKTable.join(eq(secondSchemaKTable.getKtable()), - anyObject(SchemaKStream.KsqlValueJoiner.class))) + anyObject(KsqlValueJoiner.class))) .andReturn(EasyMock.niceMock(KTable.class)); replay(mockKTable); @@ -605,7 +606,7 @@ public void shouldPerformTableToTableInnerJoin() { @Test public void shouldPerformTableToTableOuterJoin() { expect(mockKTable.outerJoin(eq(secondSchemaKTable.getKtable()), - anyObject(SchemaKStream.KsqlValueJoiner.class))) + anyObject(KsqlValueJoiner.class))) .andReturn(EasyMock.niceMock(KTable.class)); replay(mockKTable); @@ -639,15 +640,15 @@ public void shouldBuildStepForTableTableJoin() { final KTable resultTable = EasyMock.niceMock(KTable.class); expect(mockKTable.outerJoin( eq(secondSchemaKTable.getKtable()), - anyObject(SchemaKStream.KsqlValueJoiner.class)) + anyObject(KsqlValueJoiner.class)) ).andReturn(resultTable); expect(mockKTable.join( eq(secondSchemaKTable.getKtable()), - anyObject(SchemaKStream.KsqlValueJoiner.class)) + anyObject(KsqlValueJoiner.class)) ).andReturn(resultTable); expect(mockKTable.leftJoin( eq(secondSchemaKTable.getKtable()), - anyObject(SchemaKStream.KsqlValueJoiner.class)) + anyObject(KsqlValueJoiner.class)) ).andReturn(resultTable); replay(mockKTable); diff --git 
a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java index 2222b9d47f1c..4fedb4a93382 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamStreamJoin.java @@ -16,20 +16,22 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import java.time.Duration; import java.util.List; import java.util.Objects; +import org.apache.kafka.streams.kstream.KStream; @Immutable -public class StreamStreamJoin implements ExecutionStep { +public class StreamStreamJoin implements ExecutionStep> { private final ExecutionStepProperties properties; private final JoinType joinType; private final Formats leftFormats; private final Formats rightFormats; - private final ExecutionStep left; - private final ExecutionStep right; + private final ExecutionStep> left; + private final ExecutionStep> right; private final Duration before; private final Duration after; @@ -38,8 +40,8 @@ public StreamStreamJoin( final JoinType joinType, final Formats leftFormats, final Formats rightFormats, - final ExecutionStep left, - final ExecutionStep right, + final ExecutionStep> left, + final ExecutionStep> right, final Duration before, final Duration after) { this.properties = Objects.requireNonNull(properties, "properties"); @@ -63,10 +65,38 @@ public List> getSources() { } @Override - public S build(final KsqlQueryBuilder streamsBuilder) { + public KStream build(final KsqlQueryBuilder streamsBuilder) { throw new UnsupportedOperationException(); } + public Formats getLeftFormats() { + return leftFormats; + } + + public Formats getRightFormats() { + return rightFormats; + } + + public ExecutionStep> getLeft() { + return left; + } + + 
public ExecutionStep> getRight() { + return right; + } + + public JoinType getJoinType() { + return joinType; + } + + public Duration getAfter() { + return after; + } + + public Duration getBefore() { + return before; + } + // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @Override public boolean equals(final Object o) { diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java index aa3002f24a12..f220273b6ba8 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamTableJoin.java @@ -16,25 +16,28 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import java.util.List; import java.util.Objects; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; @Immutable -public class StreamTableJoin implements ExecutionStep { +public class StreamTableJoin implements ExecutionStep> { private final ExecutionStepProperties properties; private final JoinType joinType; private final Formats formats; - private final ExecutionStep left; - private final ExecutionStep right; + private final ExecutionStep> left; + private final ExecutionStep> right; public StreamTableJoin( final ExecutionStepProperties properties, final JoinType joinType, final Formats formats, - final ExecutionStep left, - final ExecutionStep right) { + final ExecutionStep> left, + final ExecutionStep> right) { this.properties = Objects.requireNonNull(properties, "properties"); this.formats = Objects.requireNonNull(formats, "formats"); this.joinType = Objects.requireNonNull(joinType, "joinType"); @@ -53,10 +56,26 @@ public List> getSources() { } @Override - public S build(final KsqlQueryBuilder 
streamsBuilder) { + public KStream build(final KsqlQueryBuilder streamsBuilder) { throw new UnsupportedOperationException(); } + public Formats getFormats() { + return formats; + } + + public ExecutionStep> getLeft() { + return left; + } + + public ExecutionStep> getRight() { + return right; + } + + public JoinType getJoinType() { + return joinType; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -65,7 +84,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final StreamTableJoin that = (StreamTableJoin) o; + final StreamTableJoin that = (StreamTableJoin) o; return Objects.equals(properties, that.properties) && joinType == that.joinType && Objects.equals(formats, that.formats) diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java index d5de852dd14c..5ace3efc3213 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableTableJoin.java @@ -16,22 +16,24 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import java.util.List; import java.util.Objects; +import org.apache.kafka.streams.kstream.KTable; @Immutable -public class TableTableJoin implements ExecutionStep { +public class TableTableJoin implements ExecutionStep> { private final ExecutionStepProperties properties; private final JoinType joinType; - private final ExecutionStep left; - private final ExecutionStep right; + private final ExecutionStep> left; + private final ExecutionStep> right; public TableTableJoin( final ExecutionStepProperties properties, final JoinType joinType, - final ExecutionStep left, - final ExecutionStep right) { + final 
ExecutionStep> left, + final ExecutionStep> right) { this.properties = Objects.requireNonNull(properties, "properties"); this.joinType = Objects.requireNonNull(joinType, "joinType"); this.left = Objects.requireNonNull(left, "left"); @@ -48,8 +50,20 @@ public List> getSources() { return ImmutableList.of(left, right); } + public ExecutionStep> getLeft() { + return left; + } + + public ExecutionStep> getRight() { + return right; + } + + public JoinType getJoinType() { + return joinType; + } + @Override - public T build(final KsqlQueryBuilder builder) { + public KTable build(final KsqlQueryBuilder builder) { throw new UnsupportedOperationException(); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 9f1e3ca89d5d..65f60908040c 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -174,14 +174,13 @@ public static StreamMapValues> streamMapValues( ); } - public static StreamTableJoin, KTable> - streamTableJoin( - final QueryContext.Stacker stacker, - final JoinType joinType, - final Formats formats, - final ExecutionStep> left, - final ExecutionStep> right, - final LogicalSchema resultSchema + public static StreamTableJoin streamTableJoin( + final QueryContext.Stacker stacker, + final JoinType joinType, + final Formats formats, + final ExecutionStep> left, + final ExecutionStep> right, + final LogicalSchema resultSchema ) { final QueryContext queryContext = stacker.getQueryContext(); return new StreamTableJoin<>( @@ -193,7 +192,7 @@ public static StreamMapValues> streamMapValues( ); } - public static StreamStreamJoin> streamStreamJoin( + public static StreamStreamJoin streamStreamJoin( final QueryContext.Stacker stacker, final JoinType joinType, final Formats leftFormats, @@ 
-286,7 +285,7 @@ public static TableMapValues> tableMapValues( ); } - public static TableTableJoin> tableTableJoin( + public static TableTableJoin tableTableJoin( final QueryContext.Stacker stacker, final JoinType joinType, final ExecutionStep> left, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java similarity index 94% rename from ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java rename to ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java index b1985ce7f892..d3325401f215 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java @@ -13,9 +13,8 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.streams; +package io.confluent.ksql.execution.streams; -import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Joined; diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java new file mode 100644 index 000000000000..42b201b07280 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KsqlValueJoiner.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.kstream.ValueJoiner; + +public final class KsqlValueJoiner implements ValueJoiner { + private final LogicalSchema leftSchema; + private final LogicalSchema rightSchema; + + KsqlValueJoiner(final LogicalSchema leftSchema, final LogicalSchema rightSchema) { + this.leftSchema = Objects.requireNonNull(leftSchema, "leftSchema"); + this.rightSchema = Objects.requireNonNull(rightSchema, "rightSchema"); + } + + @Override + public GenericRow apply(final GenericRow left, final GenericRow right) { + final List columns = new ArrayList<>(); + if (left != null) { + columns.addAll(left.getColumns()); + } else { + fillWithNulls(columns, leftSchema.value().size()); + } + + if (right != null) { + columns.addAll(right.getColumns()); + } else { + fillWithNulls(columns, rightSchema.value().size()); + } + + return new GenericRow(columns); + } + + private static void fillWithNulls(final List columns, final int numToFill) { + for (int i = 0; i < numToFill; ++i) { + columns.add(null); + } + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KsqlValueJoiner that = (KsqlValueJoiner) o; + return Objects.equals(leftSchema, that.leftSchema) + && Objects.equals(rightSchema, that.rightSchema); + } + + @Override + 
public int hashCode() { + return Objects.hash(leftSchema, rightSchema); + } +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java new file mode 100644 index 000000000000..af65c4f71b15 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilder.java @@ -0,0 +1,93 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.Formats; +import io.confluent.ksql.execution.plan.StreamStreamJoin; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.serde.KeySerde; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; + +public final class StreamStreamJoinBuilder { + private static final String LEFT_SERDE_CTX = "left"; + private static final String RIGHT_SERDE_CTX = "right"; + + private StreamStreamJoinBuilder() { + } + + public static KStream build( + final KStream left, + final KStream right, + final StreamStreamJoin join, + final KeySerdeFactory keySerdeFactory, + final KsqlQueryBuilder queryBuilder, + final JoinedFactory joinedFactory) { + final Formats leftFormats = join.getLeftFormats(); + final QueryContext queryContext = join.getProperties().getQueryContext(); + final QueryContext.Stacker stacker = QueryContext.Stacker.of(queryContext); + final LogicalSchema leftSchema = join.getLeft().getProperties().getSchema(); + final PhysicalSchema leftPhysicalSchema = PhysicalSchema.from( + leftSchema.withoutAlias(), + leftFormats.getOptions() + ); + final Serde leftSerde = queryBuilder.buildValueSerde( + leftFormats.getValueFormat().getFormatInfo(), + leftPhysicalSchema, + stacker.push(LEFT_SERDE_CTX).getQueryContext() + ); + final Formats rightFormats = join.getRightFormats(); + final LogicalSchema rightSchema = join.getRight().getProperties().getSchema(); + final PhysicalSchema rightPhysicalSchema = PhysicalSchema.from( + rightSchema.withoutAlias(), + rightFormats.getOptions() + ); + final Serde rightSerde = 
queryBuilder.buildValueSerde( + rightFormats.getValueFormat().getFormatInfo(), + rightPhysicalSchema, + stacker.push(RIGHT_SERDE_CTX).getQueryContext() + ); + final KeySerde keySerde = keySerdeFactory.buildKeySerde( + leftFormats.getKeyFormat(), + leftPhysicalSchema, + queryContext + ); + final Joined joined = joinedFactory.create( + keySerde, + leftSerde, + rightSerde, + StreamsUtil.buildOpName(queryContext) + ); + final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema); + final JoinWindows joinWindows = JoinWindows.of(join.getBefore()).after(join.getAfter()); + switch (join.getJoinType()) { + case LEFT: + return left.leftJoin(right, joiner, joinWindows, joined); + case OUTER: + return left.outerJoin(right, joiner, joinWindows, joined); + case INNER: + return left.join(right, joiner, joinWindows, joined); + default: + throw new IllegalStateException("invalid join type"); + } + } +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java new file mode 100644 index 000000000000..4d176bf07963 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilder.java @@ -0,0 +1,79 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.Formats; +import io.confluent.ksql.execution.plan.StreamTableJoin; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.serde.KeySerde; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; + +public final class StreamTableJoinBuilder { + private static final String SERDE_CTX = "left"; + + private StreamTableJoinBuilder() { + } + + public static KStream build( + final KStream left, + final KTable right, + final StreamTableJoin join, + final KeySerdeFactory keySerdeFactory, + final KsqlQueryBuilder queryBuilder, + final JoinedFactory joinedFactory) { + final Formats leftFormats = join.getFormats(); + final QueryContext queryContext = join.getProperties().getQueryContext(); + final QueryContext.Stacker stacker = QueryContext.Stacker.of(queryContext); + final LogicalSchema leftSchema = join.getLeft().getProperties().getSchema(); + final PhysicalSchema leftPhysicalSchema = PhysicalSchema.from( + leftSchema.withoutAlias(), + leftFormats.getOptions() + ); + final Serde leftSerde = queryBuilder.buildValueSerde( + leftFormats.getValueFormat().getFormatInfo(), + leftPhysicalSchema, + stacker.push(SERDE_CTX).getQueryContext() + ); + final KeySerde keySerde = keySerdeFactory.buildKeySerde( + leftFormats.getKeyFormat(), + leftPhysicalSchema, + queryContext + ); + final Joined joined = joinedFactory.create( + keySerde, + leftSerde, + null, + StreamsUtil.buildOpName(queryContext) + ); + final LogicalSchema rightSchema = join.getRight().getProperties().getSchema(); + final KsqlValueJoiner joiner = new 
KsqlValueJoiner(leftSchema, rightSchema); + switch (join.getJoinType()) { + case LEFT: + return left.leftJoin(right, joiner, joined); + case INNER: + return left.join(right, joiner, joined); + default: + throw new IllegalStateException("invalid join type"); + } + } +} \ No newline at end of file diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java new file mode 100644 index 000000000000..e77ead5929cd --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableTableJoinBuilder.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.plan.TableTableJoin; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import org.apache.kafka.streams.kstream.KTable; + +public final class TableTableJoinBuilder { + private TableTableJoinBuilder() { + } + + public static KTable build( + final KTable left, + final KTable right, + final TableTableJoin join) { + final LogicalSchema leftSchema = join.getLeft().getProperties().getSchema(); + final LogicalSchema rightSchema = join.getRight().getProperties().getSchema(); + final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema); + switch (join.getJoinType()) { + case LEFT: + return left.leftJoin(right, joiner); + case INNER: + return left.join(right, joiner); + case OUTER: + return left.outerJoin(right, joiner); + default: + throw new IllegalStateException("invalid join type"); + } + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/KsqlValueJoinerTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java similarity index 78% rename from ksql-engine/src/test/java/io/confluent/ksql/structured/KsqlValueJoinerTest.java rename to ksql-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java index 5735b884ef53..5f75ee2877dd 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/KsqlValueJoinerTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/KsqlValueJoinerTest.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. 
*/ -package io.confluent.ksql.structured; +package io.confluent.ksql.execution.streams; import static org.junit.Assert.assertEquals; @@ -47,8 +47,7 @@ public void setUp() { @Test public void shouldJoinValueBothNonNull() { - final SchemaKStream.KsqlValueJoiner joiner = new SchemaKStream.KsqlValueJoiner(leftSchema, - rightSchema); + final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema); final GenericRow joined = joiner.apply(leftRow, rightRow); final List expected = Arrays.asList(12L, "foobar", 20L, "baz"); @@ -57,8 +56,7 @@ public void shouldJoinValueBothNonNull() { @Test public void shouldJoinValueRightEmpty() { - final SchemaKStream.KsqlValueJoiner joiner = new SchemaKStream.KsqlValueJoiner(leftSchema, - rightSchema); + final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema); final GenericRow joined = joiner.apply(leftRow, null); final List expected = Arrays.asList(12L, "foobar", null, null); @@ -67,8 +65,7 @@ public void shouldJoinValueRightEmpty() { @Test public void shouldJoinValueLeftEmpty() { - final SchemaKStream.KsqlValueJoiner joiner = new SchemaKStream.KsqlValueJoiner(leftSchema, - rightSchema); + final KsqlValueJoiner joiner = new KsqlValueJoiner(leftSchema, rightSchema); final GenericRow joined = joiner.apply(null, rightRow); final List expected = Arrays.asList(null, null, 20L, "baz"); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java new file mode 100644 index 000000000000..29c4bfd24112 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamStreamJoinBuilderTest.java @@ -0,0 +1,331 @@ +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import 
static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.Formats; +import io.confluent.ksql.execution.plan.JoinType; +import io.confluent.ksql.execution.plan.StreamStreamJoin; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.KeySerde; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.ValueFormat; +import java.time.Duration; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StreamStreamJoinBuilderTest { + private static final String LEFT = "LEFT"; + private static final String RIGHT = "RIGHT"; + private static final String ALIAS = "ALIAS"; + private static final LogicalSchema LEFT_SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.STRING) + .valueColumn("GREEN", SqlTypes.INTEGER) + .build() + .withAlias(LEFT) + .withMetaAndKeyColsInValue(); + private static final 
LogicalSchema RIGHT_SCHEMA = LogicalSchema.builder() + .valueColumn("RED", SqlTypes.BIGINT) + .valueColumn("ORANGE", SqlTypes.DOUBLE) + .build() + .withAlias(RIGHT) + .withMetaAndKeyColsInValue(); + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.STRING) + .valueColumn("GREEN", SqlTypes.STRING) + .valueColumn("RED", SqlTypes.BIGINT) + .valueColumn("ORANGE", SqlTypes.DOUBLE) + .build() + .withAlias(ALIAS) + .withMetaAndKeyColsInValue(); + private static final PhysicalSchema LEFT_PHYSICAL = + PhysicalSchema.from(LEFT_SCHEMA.withoutAlias(), SerdeOption.none()); + private static final PhysicalSchema RIGHT_PHYSICAL = + PhysicalSchema.from(RIGHT_SCHEMA.withoutAlias(), SerdeOption.none()); + private static final Formats LEFT_FMT = Formats.of( + KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), + ValueFormat.of(FormatInfo.of(Format.JSON)), + SerdeOption.none() + ); + private static final Formats RIGHT_FMT = Formats.of( + KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), + ValueFormat.of(FormatInfo.of(Format.AVRO)), + SerdeOption.none() + ); + private static final Duration BEFORE = Duration.ofMillis(1000); + private static final Duration AFTER = Duration.ofMillis(2000); + private static final JoinWindows WINDOWS = JoinWindows.of(BEFORE).after(AFTER); + private final QueryContext SRC_CTX = + new QueryContext.Stacker(new QueryId("qid")).push("src").getQueryContext(); + private final QueryContext CTX = + new QueryContext.Stacker(new QueryId("qid")).push("jo").push("in").getQueryContext(); + + @Mock + private KStream leftStream; + @Mock + private KStream rightStream; + @Mock + private KStream resultStream; + @Mock + private ExecutionStep> left; + @Mock + private ExecutionStep> right; + @Mock + private Joined joined; + @Mock + private JoinedFactory joinedFactory; + @Mock + private KsqlQueryBuilder queryBuilder; + @Mock + private KeySerdeFactory keySerdeFactory; + @Mock + private KeySerde keySerde; + @Mock + private Serde 
leftSerde; + @Mock + private Serde rightSerde; + + private StreamStreamJoin join; + + @Before + @SuppressWarnings("unchecked") + public void init() { + when(left.getProperties()).thenReturn( + new DefaultExecutionStepProperties(LEFT_SCHEMA, SRC_CTX)); + when(right.getProperties()).thenReturn( + new DefaultExecutionStepProperties(RIGHT_SCHEMA, SRC_CTX)); + when(keySerdeFactory.buildKeySerde(any(KeyFormat.class), any(), any())).thenReturn(keySerde); + when(queryBuilder.buildValueSerde(eq(FormatInfo.of(Format.JSON)), any(), any())) + .thenReturn(leftSerde); + when(queryBuilder.buildValueSerde(eq(FormatInfo.of(Format.AVRO)), any(), any())) + .thenReturn(rightSerde); + when(joinedFactory.create(any(Serde.class), any(), any(), any())).thenReturn(joined); + } + + @SuppressWarnings("unchecked") + private void givenLeftJoin() { + when(leftStream.leftJoin(any(KStream.class), any(), any(), any())).thenReturn(resultStream); + join = new StreamStreamJoin<>( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.LEFT, + LEFT_FMT, + RIGHT_FMT, + left, + right, + BEFORE, + AFTER + ); + } + + @SuppressWarnings("unchecked") + private void givenOuterJoin() { + when(leftStream.outerJoin(any(KStream.class), any(), any(), any())).thenReturn(resultStream); + join = new StreamStreamJoin<>( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.OUTER, + LEFT_FMT, + RIGHT_FMT, + left, + right, + BEFORE, + AFTER + ); + } + + @SuppressWarnings("unchecked") + private void givenInnerJoin() { + when(leftStream.join(any(KStream.class), any(), any(), any())).thenReturn(resultStream); + join = new StreamStreamJoin<>( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.INNER, + LEFT_FMT, + RIGHT_FMT, + left, + right, + BEFORE, + AFTER + ); + } + + @Test + public void shouldDoLeftJoin() { + // Given: + givenLeftJoin(); + + // When: + final KStream result = StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + 
); + + // Then: + verify(leftStream).leftJoin( + same(rightStream), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)), + eq(WINDOWS), + same(joined) + ); + verifyNoMoreInteractions(leftStream, rightStream, resultStream); + assertThat(result, is(resultStream)); + } + + @Test + public void shouldDoOuterJoin() { + // Given: + givenOuterJoin(); + + // When: + final KStream result = StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(leftStream).outerJoin( + same(rightStream), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)), + eq(WINDOWS), + same(joined) + ); + verifyNoMoreInteractions(leftStream, rightStream, resultStream); + assertThat(result, is(resultStream)); + } + + @Test + public void shouldDoInnerJoin() { + // Given: + givenInnerJoin(); + + // When: + final KStream result = StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(leftStream).join( + same(rightStream), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)), + eq(WINDOWS), + same(joined) + ); + verifyNoMoreInteractions(leftStream, rightStream, resultStream); + assertThat(result, is(resultStream)); + } + + @Test + public void shouldBuildJoinedCorrectly() { + // Given: + givenInnerJoin(); + + // When: + StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(joinedFactory).create(keySerde, leftSerde, rightSerde, "jo-in"); + } + + @Test + public void shouldBuildKeySerdeCorrectly() { + // Given: + givenInnerJoin(); + + // When: + StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(keySerdeFactory).buildKeySerde(LEFT_FMT.getKeyFormat(), LEFT_PHYSICAL, CTX); + } + + @Test + public void shouldBuildLeftSerdeCorrectly() { + // Given: + 
givenInnerJoin(); + + // When: + StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + final QueryContext leftCtx = QueryContext.Stacker.of(CTX).push("left").getQueryContext(); + verify(queryBuilder).buildValueSerde(FormatInfo.of(Format.JSON), LEFT_PHYSICAL, leftCtx); + } + + @Test + public void shouldBuildRightSerdeCorrectly() { + // Given: + givenInnerJoin(); + + // When: + StreamStreamJoinBuilder.build( + leftStream, + rightStream, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + final QueryContext rightCtx = QueryContext.Stacker.of(CTX).push("right").getQueryContext(); + verify(queryBuilder).buildValueSerde(FormatInfo.of(Format.AVRO), RIGHT_PHYSICAL, rightCtx); + } +} \ No newline at end of file diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java new file mode 100644 index 000000000000..e86940fa1567 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamTableJoinBuilderTest.java @@ -0,0 +1,282 @@ +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.Formats; +import
io.confluent.ksql.execution.plan.JoinType; +import io.confluent.ksql.execution.plan.StreamTableJoin; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.KeySerde; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.ValueFormat; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StreamTableJoinBuilderTest { + private static final String LEFT = "LEFT"; + private static final String RIGHT = "RIGHT"; + private static final String ALIAS = "ALIAS"; + private static final LogicalSchema LEFT_SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.STRING) + .valueColumn("GREEN", SqlTypes.INTEGER) + .build() + .withAlias(LEFT) + .withMetaAndKeyColsInValue(); + private static final LogicalSchema RIGHT_SCHEMA = LogicalSchema.builder() + .valueColumn("RED", SqlTypes.BIGINT) + .valueColumn("ORANGE", SqlTypes.DOUBLE) + .build() + .withAlias(RIGHT) + .withMetaAndKeyColsInValue(); + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.STRING) + .valueColumn("GREEN", SqlTypes.STRING) + .valueColumn("RED", SqlTypes.BIGINT) + .valueColumn("ORANGE", SqlTypes.DOUBLE) + .build() + .withAlias(ALIAS) + .withMetaAndKeyColsInValue(); + private 
static final PhysicalSchema LEFT_PHYSICAL = + PhysicalSchema.from(LEFT_SCHEMA.withoutAlias(), SerdeOption.none()); + private static final Formats LEFT_FMT = Formats.of( + KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), + ValueFormat.of(FormatInfo.of(Format.JSON)), + SerdeOption.none() + ); + private final QueryContext SRC_CTX = + new QueryContext.Stacker(new QueryId("qid")).push("src").getQueryContext(); + private final QueryContext CTX = + new QueryContext.Stacker(new QueryId("qid")).push("jo").push("in").getQueryContext(); + + @Mock + private KStream leftStream; + @Mock + private KTable rightTable; + @Mock + private KStream resultStream; + @Mock + private ExecutionStep> left; + @Mock + private ExecutionStep> right; + @Mock + private Joined joined; + @Mock + private JoinedFactory joinedFactory; + @Mock + private KsqlQueryBuilder queryBuilder; + @Mock + private KeySerdeFactory keySerdeFactory; + @Mock + private KeySerde keySerde; + @Mock + private Serde leftSerde; + + private StreamTableJoin join; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Before + @SuppressWarnings("unchecked") + public void init() { + when(left.getProperties()).thenReturn( + new DefaultExecutionStepProperties(LEFT_SCHEMA, SRC_CTX)); + when(right.getProperties()).thenReturn( + new DefaultExecutionStepProperties(RIGHT_SCHEMA, SRC_CTX)); + when(keySerdeFactory.buildKeySerde(any(KeyFormat.class), any(), any())).thenReturn(keySerde); + when(queryBuilder.buildValueSerde(eq(FormatInfo.of(Format.JSON)), any(), any())) + .thenReturn(leftSerde); + when(joinedFactory.create(any(Serde.class), any(), any(), any())).thenReturn(joined); + } + + @SuppressWarnings("unchecked") + private void givenLeftJoin() { + when(leftStream.leftJoin(any(KTable.class), any(), any())).thenReturn(resultStream); + join = new StreamTableJoin( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.LEFT, + LEFT_FMT, + left, + right + ); + } + + 
@SuppressWarnings("unchecked") + private void givenOuterJoin() { + join = new StreamTableJoin( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.OUTER, + LEFT_FMT, + left, + right + ); + } + + @SuppressWarnings("unchecked") + private void givenInnerJoin() { + when(leftStream.join(any(KTable.class), any(), any())).thenReturn(resultStream); + join = new StreamTableJoin( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.INNER, + LEFT_FMT, + left, + right + ); + } + + @Test + public void shouldDoLeftJoin() { + // Given: + givenLeftJoin(); + + // When: + final KStream result = StreamTableJoinBuilder.build( + leftStream, + rightTable, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(leftStream).leftJoin( + same(rightTable), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)), + same(joined) + ); + verifyNoMoreInteractions(leftStream, rightTable, resultStream); + assertThat(result, is(resultStream)); + } + + @Test + public void shouldFailOnOuterJoin() { + // Given: + givenOuterJoin(); + + // Then: + expectedException.expect(IllegalStateException.class); + + // When: + StreamTableJoinBuilder.build( + leftStream, + rightTable, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + } + + @Test + public void shouldDoInnerJoin() { + // Given: + givenInnerJoin(); + + // When: + final KStream result = StreamTableJoinBuilder.build( + leftStream, + rightTable, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(leftStream).join( + same(rightTable), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)), + same(joined) + ); + verifyNoMoreInteractions(leftStream, rightTable, resultStream); + assertThat(result, is(resultStream)); + } + + @Test + public void shouldBuildJoinedCorrectly() { + // Given: + givenInnerJoin(); + + // When: + StreamTableJoinBuilder.build( + leftStream, + rightTable, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: +
verify(joinedFactory).create(keySerde, leftSerde, null, "jo-in"); + } + + @Test + public void shouldBuildKeySerdeCorrectly() { + // Given: + givenInnerJoin(); + + // When: + StreamTableJoinBuilder.build( + leftStream, + rightTable, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + verify(keySerdeFactory).buildKeySerde(LEFT_FMT.getKeyFormat(), LEFT_PHYSICAL, CTX); + } + + @Test + public void shouldBuildLeftSerdeCorrectly() { + // Given: + givenInnerJoin(); + + // When: + StreamTableJoinBuilder.build( + leftStream, + rightTable, + join, + keySerdeFactory, + queryBuilder, + joinedFactory + ); + + // Then: + final QueryContext leftCtx = QueryContext.Stacker.of(CTX).push("left").getQueryContext(); + verify(queryBuilder).buildValueSerde(FormatInfo.of(Format.JSON), LEFT_PHYSICAL, leftCtx); + } +} \ No newline at end of file diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java new file mode 100644 index 000000000000..d49cc8e62061 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableTableJoinBuilderTest.java @@ -0,0 +1,171 @@ +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.JoinType; +import 
io.confluent.ksql.execution.plan.TableTableJoin; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KTable; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TableTableJoinBuilderTest { + private static final String LEFT = "LEFT"; + private static final String RIGHT = "RIGHT"; + private static final String ALIAS = "ALIAS"; + private static final LogicalSchema LEFT_SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.STRING) + .valueColumn("GREEN", SqlTypes.INTEGER) + .build() + .withAlias(LEFT) + .withMetaAndKeyColsInValue(); + private static final LogicalSchema RIGHT_SCHEMA = LogicalSchema.builder() + .valueColumn("RED", SqlTypes.BIGINT) + .valueColumn("ORANGE", SqlTypes.DOUBLE) + .build() + .withAlias(RIGHT) + .withMetaAndKeyColsInValue(); + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.STRING) + .valueColumn("GREEN", SqlTypes.STRING) + .valueColumn("RED", SqlTypes.BIGINT) + .valueColumn("ORANGE", SqlTypes.DOUBLE) + .build() + .withAlias(ALIAS) + .withMetaAndKeyColsInValue(); + private final QueryContext SRC_CTX = + new QueryContext.Stacker(new QueryId("qid")).push("src").getQueryContext(); + private final QueryContext CTX = + new QueryContext.Stacker(new QueryId("qid")).push("jo").push("in").getQueryContext(); + + @Mock + private KTable leftTable; + @Mock + private KTable rightTable; + @Mock + private KTable resultTable; + @Mock + private ExecutionStep> left; + @Mock + private ExecutionStep> right; + @Mock + private Joined joined; + @Mock + private JoinedFactory 
joinedFactory; + + private TableTableJoin join; + + @Before + @SuppressWarnings("unchecked") + public void init() { + when(left.getProperties()).thenReturn( + new DefaultExecutionStepProperties(LEFT_SCHEMA, SRC_CTX)); + when(right.getProperties()).thenReturn( + new DefaultExecutionStepProperties(RIGHT_SCHEMA, SRC_CTX)); + when(joinedFactory.create(any(Serde.class), any(), any(), any())).thenReturn(joined); + } + + @SuppressWarnings("unchecked") + private void givenLeftJoin() { + when(leftTable.leftJoin(any(KTable.class), any())).thenReturn(resultTable); + join = new TableTableJoin<>( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.LEFT, + left, + right + ); + } + + @SuppressWarnings("unchecked") + private void givenOuterJoin() { + when(leftTable.outerJoin(any(KTable.class), any())).thenReturn(resultTable); + join = new TableTableJoin<>( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.OUTER, + left, + right + ); + } + + @SuppressWarnings("unchecked") + private void givenInnerJoin() { + when(leftTable.join(any(KTable.class), any())).thenReturn(resultTable); + join = new TableTableJoin<>( + new DefaultExecutionStepProperties(SCHEMA, CTX), + JoinType.INNER, + left, + right + ); + } + + @Test + public void shouldDoLeftJoin() { + // Given: + givenLeftJoin(); + + // When: + final KTable result = TableTableJoinBuilder.build(leftTable, rightTable, join); + + // Then: + verify(leftTable).leftJoin( + same(rightTable), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)) + ); + verifyNoMoreInteractions(leftTable, rightTable, resultTable); + assertThat(result, is(resultTable)); + } + + @Test + public void shouldDoOuterJoin() { + // Given: + givenOuterJoin(); + + // When: + final KTable result = TableTableJoinBuilder.build(leftTable, rightTable, join); + + // Then: + verify(leftTable).outerJoin( + same(rightTable), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)) + ); + verifyNoMoreInteractions(leftTable, rightTable, resultTable); + 
assertThat(result, is(resultTable)); + } + + @Test + public void shouldDoInnerJoin() { + // Given: + givenInnerJoin(); + + // When: + final KTable result = TableTableJoinBuilder.build(leftTable, rightTable, join); + + // Then: + verify(leftTable).join( + same(rightTable), + eq(new KsqlValueJoiner(LEFT_SCHEMA, RIGHT_SCHEMA)) + ); + verifyNoMoreInteractions(leftTable, rightTable, resultTable); + assertThat(result, is(resultTable)); + } +} \ No newline at end of file