From cbca611dd9d339d8f1e724958b0a5e519e2a5e64 Mon Sep 17 00:00:00 2001 From: rodesai Date: Tue, 10 Sep 2019 21:13:25 -0700 Subject: [PATCH] feat: move toTable KStream calls to plan builder This patch moves the KStream-to-KTable conversion into the plan builder and cleans up the corresponding code (and tests) in SchemaKStream and DataSourceNode --- .../ksql/planner/plan/DataSourceNode.java | 10 +- .../ksql/streams/GroupedFactory.java | 1 + .../confluent/ksql/streams/JoinedFactory.java | 1 + .../ksql/streams/StreamsFactories.java | 1 + .../ksql/structured/SchemaKGroupedStream.java | 4 +- .../ksql/structured/SchemaKGroupedTable.java | 4 +- .../ksql/structured/SchemaKStream.java | 52 ++-- .../ksql/structured/SchemaKTable.java | 2 +- .../ksql/planner/plan/DataSourceNodeTest.java | 16 +- .../structured/SchemaKGroupedStreamTest.java | 4 +- .../structured/SchemaKGroupedTableTest.java | 4 +- .../ksql/structured/SchemaKStreamTest.java | 59 +--- .../ksql/structured/SchemaKTableTest.java | 4 +- .../ksql/execution/plan/StreamToTable.java | 9 +- .../streams/MaterializedFactory.java | 2 +- .../streams/StreamToTableBuilder.java | 110 +++++++ .../ksql/execution}/streams/StreamsUtil.java | 4 +- .../streams/MaterializedFactoryTest.java | 3 +- .../streams/StreamToTableBuilderTest.java | 274 ++++++++++++++++++ 19 files changed, 440 insertions(+), 124 deletions(-) rename {ksql-engine/src/main/java/io/confluent/ksql => ksql-streams/src/main/java/io/confluent/ksql/execution}/streams/MaterializedFactory.java (98%) create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamToTableBuilder.java rename {ksql-engine/src/main/java/io/confluent/ksql => ksql-streams/src/main/java/io/confluent/ksql/execution}/streams/StreamsUtil.java (90%) rename {ksql-engine/src/test/java/io/confluent/ksql => ksql-streams/src/test/java/io/confluent/ksql/execution}/streams/MaterializedFactoryTest.java (96%) create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java index 6c92d4cf3e96..4d43e8c5a6e1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/DataSourceNode.java @@ -27,8 +27,6 @@ import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.structured.SchemaKStream; import java.util.List; @@ -145,12 +143,8 @@ public SchemaKStream buildStream(final KsqlQueryBuilder builder) { return schemaKStream.toTable( dataSource.getKsqlTopic().getKeyFormat(), dataSource.getKsqlTopic().getValueFormat(), - builder.buildValueSerde( - dataSource.getKsqlTopic().getValueFormat().getFormatInfo(), - PhysicalSchema.from(getSchema(), SerdeOption.none()), - reduceContextStacker.getQueryContext() - ), - reduceContextStacker); + reduceContextStacker, + builder); } interface SchemaKStreamFactory { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/GroupedFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/streams/GroupedFactory.java index c8e90976dc7c..0b4a6abd56d9 100644 ---
a/ksql-engine/src/main/java/io/confluent/ksql/streams/GroupedFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/streams/GroupedFactory.java @@ -15,6 +15,7 @@ package io.confluent.ksql.streams; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Grouped; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java index bcfd856cfcea..b1985ce7f892 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/streams/JoinedFactory.java @@ -15,6 +15,7 @@ package io.confluent.ksql.streams; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Joined; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java b/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java index 7a420cd20651..1d31351e5826 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsFactories.java @@ -15,6 +15,7 @@ package io.confluent.ksql.streams; +import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.util.KsqlConfig; import java.util.Objects; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java index 4cf8d2c111d3..221a3b9e1a2b 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java @@ -21,6 +21,8 @@ import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.streams.ExecutionStepFactory; +import io.confluent.ksql.execution.streams.MaterializedFactory; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.function.UdafAggregator; @@ -38,8 +40,6 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.serde.WindowInfo; -import io.confluent.ksql.streams.MaterializedFactory; -import io.confluent.ksql.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import java.time.Duration; import java.util.List; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java index a81dc4909eac..5470b996efbb 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedTable.java @@ -21,6 +21,8 @@ import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.streams.ExecutionStepFactory; +import io.confluent.ksql.execution.streams.MaterializedFactory; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.function.FunctionRegistry; import 
io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.function.TableAggregationFunction; @@ -33,8 +35,6 @@ import io.confluent.ksql.serde.KeySerde; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.streams.MaterializedFactory; -import io.confluent.ksql.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.util.List; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 6fa99e2b1bcb..9d1f76fb0b3f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -39,9 +39,12 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.StreamMapValues; import io.confluent.ksql.execution.plan.StreamSource; +import io.confluent.ksql.execution.plan.StreamToTable; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.StreamMapValuesBuilder; import io.confluent.ksql.execution.streams.StreamSourceBuilder; +import io.confluent.ksql.execution.streams.StreamToTableBuilder; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.model.DataSource; @@ -56,7 +59,6 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.streams.StreamsFactories; -import io.confluent.ksql.streams.StreamsUtil; import io.confluent.ksql.util.IdentifierUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.SchemaUtil; @@ -67,7 +69,6 @@ import java.util.Optional; import java.util.Set; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.Topology.AutoOffsetReset; import org.apache.kafka.streams.kstream.Grouped; @@ -75,11 +76,9 @@ import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.state.KeyValueStore; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling @SuppressWarnings("OptionalUsedAsFieldOrParameterType") @@ -223,41 +222,26 @@ public static SchemaKStream forSource( this.streamsFactories = requireNonNull(streamsFactories); } + @SuppressWarnings("unchecked") public SchemaKTable toTable( final KeyFormat keyFormat, final ValueFormat valueFormat, - final Serde valueSerde, - final QueryContext.Stacker contextStacker + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder ) { - final Materialized> materialized = - streamsFactories.getMaterializedFactory().create( - keySerde, - valueSerde, - StreamsUtil.buildOpName(contextStacker.getQueryContext())); - final KTable ktable = kstream - // 1. mapValues to transform null records into Optional.EMPTY. We eventually - // need to aggregate the KStream to produce the KTable. 
However the KStream aggregator - // filters out records with null keys or values. For tables, a null value for a key - // represents that the key was deleted. So we preserve these "tombstone" records by - // converting them to a not-null representation. - .mapValues(Optional::ofNullable) - - // 2. Group by the key, so that we can: - .groupByKey() - - // 3. Aggregate the KStream into a KTable using a custom aggregator that handles - // Optional.EMPTY - .aggregate( - () -> null, - (k, value, oldValue) -> value.orElse(null), - materialized); - final ExecutionStep> step = ExecutionStepFactory.streamToTable( - contextStacker, - Formats.of(keyFormat, valueFormat, Collections.emptySet()), - sourceStep - ); + final StreamToTable, KTable> step = + ExecutionStepFactory.streamToTable( + contextStacker, + Formats.of(keyFormat, valueFormat, Collections.emptySet()), + sourceStep + ); return new SchemaKTable<>( - ktable, + StreamToTableBuilder.build( + (KStream) kstream, + (StreamToTable) step, + queryBuilder, + streamsFactories.getMaterializedFactory() + ), step, keyFormat, keySerde, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 4ab1c388007d..d8e7a37565d7 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -27,6 +27,7 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.TableMapValues; import io.confluent.ksql.execution.streams.ExecutionStepFactory; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.execution.streams.TableMapValuesBuilder; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; @@ -40,7 +41,6 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.streams.StreamsFactories; -import io.confluent.ksql.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import java.util.ArrayList; import java.util.Collections; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index fcfc9a4c2fb8..e639af2a279b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -532,9 +532,8 @@ public void shouldBuildTableByConvertingFromStream() { assertThat(returned, is(table)); } - @Test @SuppressWarnings("unchecked") - public void shouldBuildReduceSerdeCorrectlyWhenBuildingTable() { + public void shouldPassBuilderWhenBuildingTable() { // Given: final DataSourceNode node = buildNodeWithMockSource(); @@ -542,16 +541,7 @@ public void shouldBuildReduceSerdeCorrectlyWhenBuildingTable() { node.buildStream(ksqlStreamBuilder); // Then: - verify(ksqlStreamBuilder).buildValueSerde( - eq(FormatInfo.of(Format.JSON)), - eq(PhysicalSchema.from(node.getSchema(), SerdeOption.none())), - queryContextCaptor.capture() - ); - assertThat( - queryContextCaptor.getValue().getContext(), - equalTo(ImmutableList.of("0", "reduce")) - ); - verify(stream).toTable(any(), any(), same(rowSerde), any()); + verify(stream).toTable(any(), any(), any(), same(ksqlStreamBuilder)); } @Test @@ -564,7 +554,7 @@ public void shouldBuildTableWithCorrectContext() { 
node.buildStream(ksqlStreamBuilder); // Then: - verify(stream).toTable(any(), any(), any(), stackerCaptor.capture()); + verify(stream).toTable(any(), any(), stackerCaptor.capture(), any()); assertThat( stackerCaptor.getValue().getQueryContext().getContext(), equalTo(ImmutableList.of("0", "reduce"))); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index 0da19d95805a..0af9f1e5937d 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -54,8 +54,8 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.serde.WindowInfo; -import io.confluent.ksql.streams.MaterializedFactory; -import io.confluent.ksql.streams.StreamsUtil; +import io.confluent.ksql.execution.streams.MaterializedFactory; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.util.KsqlConfig; import java.time.Duration; import java.util.Collections; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java index 67b035c65a12..b463b749688e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java @@ -65,8 +65,8 @@ import io.confluent.ksql.serde.KeySerde; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.streams.MaterializedFactory; -import io.confluent.ksql.streams.StreamsUtil; +import io.confluent.ksql.execution.streams.MaterializedFactory; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.testutils.AnalysisTestUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index c1d2d1ee3a8e..fbf173874d66 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -78,9 +78,9 @@ import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.streams.GroupedFactory; import io.confluent.ksql.streams.JoinedFactory; -import io.confluent.ksql.streams.MaterializedFactory; +import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.streams.StreamsFactories; -import io.confluent.ksql.streams.StreamsUtil; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.structured.SchemaKStream.Type; import io.confluent.ksql.testutils.AnalysisTestUtil; import io.confluent.ksql.util.KsqlConfig; @@ -734,8 +734,8 @@ public void shouldBuildStepForToTable() { final SchemaKTable result = initialSchemaKStream.toTable( keyFormat, valueFormat, - leftSerde, - childContextStacker + childContextStacker, + queryBuilder ); // Then: @@ -765,8 +765,8 @@ public void shouldConvertToTableWithCorrectProperties() { final SchemaKTable result = initialSchemaKStream.toTable( keyFormat, valueFormat, - leftSerde, - childContextStacker + childContextStacker, + queryBuilder ); // Then: @@ -776,53 +776,6 @@ 
public void shouldConvertToTableWithCorrectProperties() { assertThat(result.getKtable(), is(table)); } - @Test - public void shouldConvertToOptionalBeforeGroupingInToTable() { - // Given: - givenInitialSchemaKStreamUsesMocks(); - when(mockKStream.mapValues(any(ValueMapper.class))).thenReturn(mockKStream); - final KGroupedStream groupedStream = mock(KGroupedStream.class); - final KTable table = mock(KTable.class); - when(mockKStream.groupByKey()).thenReturn(groupedStream); - when(groupedStream.aggregate(any(), any(), any())).thenReturn(table); - - // When: - initialSchemaKStream.toTable(keyFormat, valueFormat, leftSerde, childContextStacker); - - // Then: - InOrder inOrder = Mockito.inOrder(mockKStream); - final ArgumentCaptor captor = ArgumentCaptor.forClass(ValueMapper.class); - inOrder.verify(mockKStream).mapValues(captor.capture()); - inOrder.verify(mockKStream).groupByKey(); - assertThat(captor.getValue().apply(null), equalTo(Optional.empty())); - final GenericRow nonNull = new GenericRow(1, 2, 3); - assertThat(captor.getValue().apply(nonNull), equalTo(Optional.of(nonNull))); - } - - @Test - public void shouldComputeAggregateCorrectlyInToTable() { - // Given: - givenInitialSchemaKStreamUsesMocks(); - when(mockKStream.mapValues(any(ValueMapper.class))).thenReturn(mockKStream); - final KGroupedStream groupedStream = mock(KGroupedStream.class); - final KTable table = mock(KTable.class); - when(mockKStream.groupByKey()).thenReturn(groupedStream); - when(groupedStream.aggregate(any(), any(), any())).thenReturn(table); - - // When: - initialSchemaKStream.toTable(keyFormat, valueFormat, leftSerde, childContextStacker); - - // Then: - final ArgumentCaptor initCaptor = ArgumentCaptor.forClass(Initializer.class); - final ArgumentCaptor captor = ArgumentCaptor.forClass(Aggregator.class); - verify(groupedStream) - .aggregate(initCaptor.capture(), captor.capture(), same(mockMaterialized)); - assertThat(initCaptor.getValue().apply(), is(nullValue())); - assertThat(captor.getValue().apply(null, Optional.empty(), null), is(nullValue())); - final GenericRow nonNull = new GenericRow(1, 2, 3); - assertThat(captor.getValue().apply(null, Optional.of(nonNull), null), is(nonNull)); - } - @SuppressWarnings("unchecked") @Test public void shouldPerformStreamToStreamLeftJoin() { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index b848ae337749..db36fe8e405f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -74,9 +74,9 @@ import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.streams.GroupedFactory; import io.confluent.ksql.streams.JoinedFactory; -import io.confluent.ksql.streams.MaterializedFactory; +import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.streams.StreamsFactories; -import io.confluent.ksql.streams.StreamsUtil; +import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.structured.SchemaKStream.Type; import io.confluent.ksql.testutils.AnalysisTestUtil; import io.confluent.ksql.util.KsqlConfig; diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java index 21e54bbf2809..b1b6bb63459e 100644 --- 
a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamToTable.java @@ -40,12 +40,19 @@ public ExecutionStepProperties getProperties() { return properties; } - @Override public List> getSources() { return ImmutableList.of(source); } + public ExecutionStep getSource() { + return source; + } + + public Formats getFormats() { + return formats; + } + @Override public T build(final KsqlQueryBuilder builder) { throw new UnsupportedOperationException(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/MaterializedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java similarity index 98% rename from ksql-engine/src/main/java/io/confluent/ksql/streams/MaterializedFactory.java rename to ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java index 46ea95698b0c..ada79f5fe82d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/streams/MaterializedFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.streams; +package io.confluent.ksql.execution.streams; import io.confluent.ksql.GenericRow; import io.confluent.ksql.util.KsqlConfig; diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamToTableBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamToTableBuilder.java new file mode 100644 index 000000000000..895fdcf62f0c --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamToTableBuilder.java @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.StreamToTable; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.KeySerde; +import io.confluent.ksql.serde.ValueFormat; +import java.util.Optional; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueStore; + +public final class StreamToTableBuilder { + private StreamToTableBuilder() { + } + + public static KTable build( + final KStream sourceStream, + final StreamToTable, KTable> streamToTable, + final KsqlQueryBuilder queryBuilder, + final MaterializedFactory materializedFactory) { + final QueryContext queryContext = streamToTable.getProperties().getQueryContext(); + final ExecutionStep> sourceStep = streamToTable.getSource(); + final PhysicalSchema physicalSchema = PhysicalSchema.from( + sourceStep.getProperties().getSchema(), + streamToTable.getFormats().getOptions() + ); + final ValueFormat valueFormat = streamToTable.getFormats().getValueFormat(); + final Serde valueSerde = queryBuilder.buildValueSerde( + valueFormat.getFormatInfo(), + physicalSchema, + queryContext + ); + final KeyFormat keyFormat = streamToTable.getFormats().getKeyFormat(); + final KeySerde keySerde = buildKeySerde( + keyFormat, + queryBuilder, + physicalSchema, + queryContext + ); + final Materialized> materialized = + materializedFactory.create( + keySerde, + valueSerde, + StreamsUtil.buildOpName(queryContext) + ); + return sourceStream + // 1. mapValues to transform null records into Optional.EMPTY. We eventually + // need to aggregate the KStream to produce the KTable. However the KStream aggregator + // filters out records with null keys or values. For tables, a null value for a key + // represents that the key was deleted. So we preserve these "tombstone" records by + // converting them to a not-null representation. + .mapValues(Optional::ofNullable) + + // 2. Group by the key, so that we can: + .groupByKey() + + // 3. 
Aggregate the KStream into a KTable using a custom aggregator that handles + // Optional.EMPTY + .aggregate( + () -> null, + (k, value, oldValue) -> value.orElse(null), + materialized); + } + + @SuppressWarnings("unchecked") + private static KeySerde buildKeySerde( + final KeyFormat keyFormat, + final KsqlQueryBuilder queryBuilder, + final PhysicalSchema physicalSchema, + final QueryContext queryContext + ) { + if (keyFormat.isWindowed()) { + return (KeySerde) queryBuilder.buildKeySerde( + keyFormat.getFormatInfo(), + keyFormat.getWindowInfo().get(), + physicalSchema, + queryContext + ); + } else { + return (KeySerde) queryBuilder.buildKeySerde( + keyFormat.getFormatInfo(), + physicalSchema, + queryContext + ); + } + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsUtil.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java similarity index 90% rename from ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsUtil.java rename to ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java index 1f55007fc9ce..e1f2d884a738 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/streams/StreamsUtil.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.streams; +package io.confluent.ksql.execution.streams; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.util.KsqlConfig; @@ -23,7 +23,7 @@ public final class StreamsUtil { private StreamsUtil() { } - static boolean useProvidedName(final KsqlConfig ksqlConfig) { + public static boolean useProvidedName(final KsqlConfig ksqlConfig) { return Objects.equals( ksqlConfig.getString(KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS), KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_ON diff --git a/ksql-engine/src/test/java/io/confluent/ksql/streams/MaterializedFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java similarity index 96% rename from ksql-engine/src/test/java/io/confluent/ksql/streams/MaterializedFactoryTest.java rename to ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index 63e1fddbe2cc..50ca781bdf24 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/streams/MaterializedFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.streams; +package io.confluent.ksql.execution.streams; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java new file mode 100644 index 000000000000..5952dbb4c25b --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamToTableBuilderTest.java @@ -0,0 +1,274 @@ +/* + * Copyright 2019 Confluent Inc. 
+ * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.Formats; +import io.confluent.ksql.execution.plan.StreamToTable; +import io.confluent.ksql.model.WindowType; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.KeySerde; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.ValueFormat; +import io.confluent.ksql.serde.WindowInfo; +import java.time.Duration; +import java.util.Optional; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.hamcrest.MatcherAssert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class StreamToTableBuilderTest { + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("PING", SqlTypes.STRING) + .valueColumn("PONG", SqlTypes.INTEGER) + .build() + .withAlias("PADDLE") + .withMetaAndKeyColsInValue(); + + @Mock + private KStream kStream; + @Mock + private MaterializedFactory materializedFactory; + @Mock + private Materialized materialized; + @Mock + private KGroupedStream kGroupedStream; + @Mock + private KTable kTable; + @Mock + private KsqlQueryBuilder ksqlQueryBuilder; + @Mock + private KeySerde keySerde; + @Mock + private KeySerde> windowedKeySerde; + @Mock + private Serde valueSerde; + @Mock + private ExecutionStep> source; + + 
private final QueryContext.Stacker stacker = new QueryContext.Stacker(new QueryId("qid")); + private final QueryContext queryContext = stacker.push("s2t").getQueryContext(); + private final ValueFormat valueFormat = ValueFormat.of(FormatInfo.of(Format.JSON)); + private final KeyFormat keyFormat = KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)); + private final KeyFormat windowedKeyFormat = KeyFormat.windowed( + FormatInfo.of(Format.KAFKA), + WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofSeconds(10))) + ); + private final PhysicalSchema physicalSchema = PhysicalSchema.from( + SCHEMA, + SerdeOption.none() + ); + + private StreamToTable, KTable> step; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + when(source.getProperties()).thenReturn( + new DefaultExecutionStepProperties(SCHEMA, stacker.push("source").getQueryContext()) + ); + when(materializedFactory.create(any(), any(), any())) + .thenReturn(materialized); + when(kStream.mapValues(any(ValueMapper.class))).thenReturn(kStream); + when(kStream.groupByKey()).thenReturn(kGroupedStream); + when(kGroupedStream.aggregate(any(), any(), any())).thenReturn(kTable); + when(ksqlQueryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde); + } + + private void givenWindowed() { + step = new StreamToTable<>( + source, + Formats.of(windowedKeyFormat, valueFormat, SerdeOption.none()), + new DefaultExecutionStepProperties(SCHEMA, queryContext) + ); + when(ksqlQueryBuilder.buildKeySerde(any(), any(), any(), any())).thenReturn(windowedKeySerde); + } + + private void givenUnwindowed() { + step = new StreamToTable<>( + source, + Formats.of(keyFormat, valueFormat, SerdeOption.none()), + new DefaultExecutionStepProperties(SCHEMA, queryContext) + ); + when(ksqlQueryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldConvertToTableCorrectly() { + // Given: + givenUnwindowed(); + + // When: + final KTable result = StreamToTableBuilder.build( + kStream, + step, + ksqlQueryBuilder, + materializedFactory + ); + + // Then: + final InOrder inOrder = Mockito.inOrder(kStream); + inOrder.verify(kStream).mapValues(any(ValueMapper.class)); + inOrder.verify(kStream).groupByKey(); + verify(kGroupedStream).aggregate(any(), any(), same(materialized)); + assertThat(result, is(kTable)); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldBuildKeySerdeCorrectlyForWindowedKey() { + // Given: + givenWindowed(); + + // When: + StreamToTableBuilder.build(kStream, step, ksqlQueryBuilder, materializedFactory); + + // Then: + verify(ksqlQueryBuilder).buildKeySerde( + windowedKeyFormat.getFormatInfo(), + windowedKeyFormat.getWindowInfo().get(), + physicalSchema, + queryContext + ); + verify(materializedFactory).create(same(windowedKeySerde), any(), any()); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldBuildKeySerdeCorrectlyForUnwindowedKey() { + // Given: + givenUnwindowed(); + + // When: + StreamToTableBuilder.build(kStream, step, ksqlQueryBuilder, materializedFactory); + + // Then: + verify(ksqlQueryBuilder).buildKeySerde( + keyFormat.getFormatInfo(), + physicalSchema, + queryContext + ); + verify(materializedFactory).create(same(keySerde), any(), any()); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldBuildValueSerdeCorrectly() { + // Given: + givenUnwindowed(); + + // When: + StreamToTableBuilder.build(kStream, step, 
ksqlQueryBuilder, materializedFactory); + + // Then: + verify(ksqlQueryBuilder).buildValueSerde( + valueFormat.getFormatInfo(), + physicalSchema, + queryContext + ); + verify(materializedFactory).create(any(), same(valueSerde), any()); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldUseCorrectNameForMaterialized() { + // Given: + givenUnwindowed(); + + // When: + StreamToTableBuilder.build(kStream, step, ksqlQueryBuilder, materializedFactory); + + // Then: + verify(materializedFactory).create(any(), any(), eq(StreamsUtil.buildOpName(queryContext))); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldConvertToOptionalBeforeGroupingInToTable() { + // Given: + givenUnwindowed(); + + // When: + StreamToTableBuilder.build(kStream, step, ksqlQueryBuilder, materializedFactory); + + // Then: + final ArgumentCaptor captor = ArgumentCaptor.forClass(ValueMapper.class); + verify(kStream).mapValues(captor.capture()); + MatcherAssert.assertThat(captor.getValue().apply(null), equalTo(Optional.empty())); + final GenericRow nonNull = new GenericRow(1, 2, 3); + MatcherAssert.assertThat(captor.getValue().apply(nonNull), equalTo(Optional.of(nonNull))); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldComputeAggregateCorrectlyInToTable() { + // Given: + givenUnwindowed(); + + // When: + StreamToTableBuilder.build(kStream, step, ksqlQueryBuilder, materializedFactory); + + // Then: + final ArgumentCaptor initCaptor = ArgumentCaptor.forClass(Initializer.class); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Aggregator.class); + verify(kGroupedStream).aggregate(initCaptor.capture(), captor.capture(), any()); + assertThat(initCaptor.getValue().apply(), is(nullValue())); + assertThat(captor.getValue().apply(null, Optional.empty(), null), is(nullValue())); + final GenericRow nonNull = new GenericRow(1, 2, 3); + assertThat(captor.getValue().apply(null, Optional.of(nonNull), null), is(nonNull)); + } +} \ No newline at end of file
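
Note for reviewers: the sketch below restates the stream-to-table conversion that StreamToTableBuilder.build performs in this patch, written directly against the public Kafka Streams API so the tombstone handling is easy to read in isolation. The class name, topic name, and String serdes here are illustrative assumptions for the sketch only; the patch itself supplies KSQL's key/value serdes through KsqlQueryBuilder and MaterializedFactory rather than Serdes.String().

import java.util.Optional;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public final class StreamToTableSketch {

  private StreamToTableSketch() {
  }

  // Illustrative only: the topic name and serdes are placeholders, not taken from the patch.
  public static KTable<String, String> toTable(final StreamsBuilder builder) {
    final KStream<String, String> stream = builder.stream(
        "example-topic", Consumed.with(Serdes.String(), Serdes.String()));

    return stream
        // Wrap each value so tombstones (null values) survive the aggregation below,
        // which would otherwise drop records whose value is null.
        .mapValues(Optional::ofNullable)
        // Group on the existing key; no repartitioning is required.
        .groupByKey()
        // Latest non-empty value wins; an empty Optional resets the key to null,
        // i.e. deletes it from the table.
        .aggregate(
            () -> null,
            (key, value, oldValue) -> value.orElse(null),
            Materialized.with(Serdes.String(), Serdes.String()));
  }
}

Passing the serdes through Materialized mirrors what MaterializedFactory.create does in the new builder, where the state store name additionally comes from StreamsUtil.buildOpName(queryContext).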