From 0e09cd57830a414ce661ac5b8727fe6bf4aaccdb Mon Sep 17 00:00:00 2001
From: rodesai
Date: Sun, 15 Sep 2019 21:46:20 -0700
Subject: [PATCH 1/3] feat: move setup of the sink to plan builders

This patch moves the setup of stream and table sinks (SchemaKStream.into and
SchemaKTable.into) into execution plan builders. It also moves the computation
of the rowkey/rowtime column indexes to exclude from the sink output into the
plan builders.
---
 .../plan/KsqlStructuredDataOutputNode.java    |  59 +-----
 .../ksql/structured/SchemaKGroupedStream.java |   9 +
 .../ksql/structured/SchemaKStream.java        |  30 ++-
 .../ksql/structured/SchemaKTable.java         |  34 ++-
 .../ksql/planner/plan/DataSourceNodeTest.java |   3 +-
 .../KsqlStructuredDataOutputNodeTest.java     |  95 +--------
 .../ksql/execution/plan/StreamSink.java       |  15 +-
 .../ksql/execution/plan/TableSink.java        |  14 +-
 .../ksql/execution/util/SinkSchemaUtil.java   |  55 +++++
 .../execution/util/SinkSchemaUtilTest.java    |  90 ++++++++
 .../streams/ExecutionStepFactory.java         |   4 +-
 .../execution/streams/KeySerdeFactory.java    |  28 +++
 .../execution/streams/StreamSinkBuilder.java  |  74 +++++++
 .../execution/streams/TableSinkBuilder.java   |  75 +++++++
 .../streams/StreamSinkBuilderTest.java        | 192 +++++++++++++++++
 .../streams/TableSinkBuilderTest.java         | 197 ++++++++++++++++++
 16 files changed, 776 insertions(+), 198 deletions(-)
 create mode 100644 ksql-execution/src/main/java/io/confluent/ksql/execution/util/SinkSchemaUtil.java
 create mode 100644 ksql-execution/src/test/java/io/confluent/ksql/execution/util/SinkSchemaUtilTest.java
 create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KeySerdeFactory.java
 create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSinkBuilder.java
 create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSinkBuilder.java
 create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
 create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java

diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java
index 53d5c9ddf306..4542f865a85a 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.java
@@ -19,35 +19,21 @@
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Streams;
-import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
-import io.confluent.ksql.function.FunctionRegistry;
 import io.confluent.ksql.metastore.model.KeyField;
 import io.confluent.ksql.query.QueryId;
-import io.confluent.ksql.schema.ksql.Column;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
-import io.confluent.ksql.schema.ksql.PhysicalSchema;
-import io.confluent.ksql.serde.KeySerde;
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.structured.SchemaKStream;
-import io.confluent.ksql.structured.SchemaKStream.Type;
 import io.confluent.ksql.structured.SchemaKTable;
-import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.QueryIdGenerator;
 import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
-import
java.util.List; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.connect.data.ConnectSchema; -import org.apache.kafka.streams.kstream.KStream; public class KsqlStructuredDataOutputNode extends OutputNode { @@ -56,7 +42,6 @@ public class KsqlStructuredDataOutputNode extends OutputNode { private final Optional partitionByField; private final boolean doCreateInto; private final Set serdeOptions; - private final Set implicitAndKeyFieldIndexes; public KsqlStructuredDataOutputNode( final PlanNodeId id, @@ -88,7 +73,6 @@ public KsqlStructuredDataOutputNode( this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic"); this.partitionByField = Objects.requireNonNull(partitionByField, "partitionByField"); this.doCreateInto = doCreateInto; - this.implicitAndKeyFieldIndexes = implicitAndKeyColumnIndexesInValueSchema(schema); validatePartitionByField(); } @@ -134,20 +118,13 @@ public SchemaKStream buildStream(final KsqlQueryBuilder builder) { contextStacker ); - final Serde outputRowSerde = builder.buildValueSerde( - getKsqlTopic().getValueFormat().getFormatInfo(), - PhysicalSchema.from(getSchema(), serdeOptions), - contextStacker.getQueryContext() - ); - return result.into( getKsqlTopic().getKafkaTopicName(), - outputRowSerde, getSchema(), getKsqlTopic().getValueFormat(), serdeOptions, - implicitAndKeyFieldIndexes, - contextStacker + contextStacker, + builder ); } @@ -190,36 +167,4 @@ private void validatePartitionByField() { throw new IllegalArgumentException("keyField must match partition by field"); } } - - @SuppressWarnings("UnstableApiUsage") - private static Set implicitAndKeyColumnIndexesInValueSchema(final LogicalSchema schema) { - final ConnectSchema valueSchema = schema.valueConnectSchema(); - - final Stream cols = Streams.concat( - schema.metadata().stream(), - schema.key().stream() - ); - - return cols - .map(Column::name) - .map(valueSchema::field) - .filter(Objects::nonNull) - .map(org.apache.kafka.connect.data.Field::index) - .collect(Collectors.toSet()); - } - - interface SinkFactory { - - SchemaKStream create( - KStream kstream, - LogicalSchema schema, - KeySerde keySerde, - KeyField keyField, - List sourceSchemaKStreams, - Type type, - KsqlConfig ksqlConfig, - FunctionRegistry functionRegistry, - QueryContext queryContext - ); - } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java index 221a3b9e1a2b..2f73d240cec1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java @@ -232,6 +232,15 @@ private KTable aggregateWindowed( } private KeyFormat getKeyFormat(final WindowExpression windowExpression) { + if (ksqlConfig.getBoolean(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG)) { + return KeyFormat.windowed( + FormatInfo.of(Format.KAFKA), + WindowInfo.of( + WindowType.TUMBLING, + Optional.of(Duration.ofMillis(Long.MAX_VALUE)) + ) + ); + } return KeyFormat.windowed( FormatInfo.of(Format.KAFKA), windowExpression.getKsqlWindowExpression().getWindowInfo() diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java 
index 8303bd03b01e..09e048fcc39e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -39,6 +39,7 @@ import io.confluent.ksql.execution.plan.StreamGroupByKey; import io.confluent.ksql.execution.plan.StreamMapValues; import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.plan.StreamSink; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamToTable; import io.confluent.ksql.execution.streams.ExecutionStepFactory; @@ -46,6 +47,7 @@ import io.confluent.ksql.execution.streams.StreamGroupByBuilder; import io.confluent.ksql.execution.streams.StreamMapValuesBuilder; import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder; +import io.confluent.ksql.execution.streams.StreamSinkBuilder; import io.confluent.ksql.execution.streams.StreamSourceBuilder; import io.confluent.ksql.execution.streams.StreamToTableBuilder; import io.confluent.ksql.execution.streams.StreamsUtil; @@ -80,7 +82,6 @@ import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; @@ -273,35 +274,28 @@ public SchemaKStream withKeyField(final KeyField resultKeyField) { ); } + @SuppressWarnings("unchecked") public SchemaKStream into( final String kafkaTopicName, - final Serde topicValueSerDe, final LogicalSchema outputSchema, final ValueFormat valueFormat, final Set options, - final Set rowkeyIndexes, - final QueryContext.Stacker contextStacker + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder queryBuilder ) { - kstream - .mapValues(row -> { - if (row == null) { - return null; - } - final List columns = new ArrayList<>(); - for (int i = 0; i < row.getColumns().size(); i++) { - if (!rowkeyIndexes.contains(i)) { - columns.add(row.getColumns().get(i)); - } - } - return new GenericRow(columns); - }).to(kafkaTopicName, Produced.with(keySerde, topicValueSerDe)); - final ExecutionStep> step = ExecutionStepFactory.streamSink( + final StreamSink step = ExecutionStepFactory.streamSink( contextStacker, outputSchema, Formats.of(keyFormat, valueFormat, options), sourceStep, kafkaTopicName ); + StreamSinkBuilder.build( + kstream, + step, + (fmt, schema, ctx) -> keySerde, + queryBuilder + ); return new SchemaKStream<>( kstream, step, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 037c5e7edb12..a9832894499e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -27,10 +27,12 @@ import io.confluent.ksql.execution.plan.TableFilter; import io.confluent.ksql.execution.plan.TableGroupBy; import io.confluent.ksql.execution.plan.TableMapValues; +import io.confluent.ksql.execution.plan.TableSink; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.TableFilterBuilder; import io.confluent.ksql.execution.streams.TableGroupByBuilder; import io.confluent.ksql.execution.streams.TableMapValuesBuilder; +import io.confluent.ksql.execution.streams.TableSinkBuilder; import io.confluent.ksql.execution.util.StructKeyUtil; 
import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.KeyField; @@ -44,7 +46,6 @@ import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.streams.StreamsFactories; import io.confluent.ksql.util.KsqlConfig; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -54,7 +55,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Produced; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class SchemaKTable extends SchemaKStream { @@ -117,38 +117,28 @@ public SchemaKTable( } @Override + @SuppressWarnings("unchecked") public SchemaKTable into( final String kafkaTopicName, - final Serde topicValueSerDe, final LogicalSchema outputSchema, final ValueFormat valueFormat, final Set options, - final Set rowkeyIndexes, - final QueryContext.Stacker contextStacker + final QueryContext.Stacker contextStacker, + final KsqlQueryBuilder builder ) { - - ktable.toStream() - .mapValues(row -> { - if (row == null) { - return null; - } - final List columns = new ArrayList<>(); - for (int i = 0; i < row.getColumns().size(); i++) { - if (!rowkeyIndexes.contains(i)) { - columns.add(row.getColumns().get(i)); - } - } - return new GenericRow(columns); - } - ).to(kafkaTopicName, Produced.with(keySerde, topicValueSerDe)); - - final ExecutionStep> step = ExecutionStepFactory.tableSink( + final TableSink step = ExecutionStepFactory.tableSink( contextStacker, outputSchema, sourceTableStep, Formats.of(keyFormat, valueFormat, options), kafkaTopicName ); + TableSinkBuilder.build( + ktable, + step, + (fmt, schema, ctx) -> keySerde, + builder + ); return new SchemaKTable<>( ktable, step, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index e639af2a279b..85eb414b43b0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -190,7 +190,8 @@ public void before() { new QueryContext.Stacker(queryId) .push(inv.getArgument(0).toString())); - when(ksqlStreamBuilder.buildKeySerde(any(), any(), any())).thenReturn((KeySerde)keySerde); + when(ksqlStreamBuilder.buildKeySerde(any(), any(), any())) + .thenReturn((KeySerde)keySerde); when(ksqlStreamBuilder.buildKeySerde(any(), any(), any(), any())).thenReturn((KeySerde)keySerde); when(ksqlStreamBuilder.buildValueSerde(any(), any(), any())).thenReturn(rowSerde); when(ksqlStreamBuilder.getFunctionRegistry()).thenReturn(functionRegistry); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index aa27a3ca2ea1..ae0b0a080f40 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -91,15 +91,11 @@ public class KsqlStructuredDataOutputNodeTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); - @Mock - private KsqlConfig ksqlConfig; @Mock private QueryIdGenerator queryIdGenerator; @Mock private KsqlQueryBuilder ksqlStreamBuilder; @Mock - private FunctionRegistry 
functionRegistry; - @Mock private PlanNode sourceNode; @Mock private SchemaKStream sourceStream; @@ -112,18 +108,10 @@ public class KsqlStructuredDataOutputNodeTest { @Mock private SchemaKStream sinkStreamWithKeySelected; @Mock - private KStream kstream; - @Mock private KsqlTopic ksqlTopic; - @Mock - private Serde rowSerde; - @Captor - private ArgumentCaptor queryContextCaptor; @Captor private ArgumentCaptor stackerCaptor; - private final Set serdeOptions = SerdeOption.none(); - private KsqlStructuredDataOutputNode outputNode; private LogicalSchema schema; private Optional partitionBy; @@ -145,14 +133,13 @@ public void before() { when(sourceStream.withKeyField(any())) .thenReturn(resultStream); - when(resultStream.into(any(), any(), any(), any(), any(), any(), any())) + when(resultStream.into(any(), any(), any(), any(), any(), any())) .thenReturn((SchemaKStream) sinkStream); when(resultStream.selectKey(any(), anyBoolean(), any())) .thenReturn((SchemaKStream) resultWithKeySelected); - when(resultWithKeySelected.into(any(), any(), any(), any(), any(), any(), any())) + when(resultWithKeySelected.into(any(), any(), any(), any(), any(), any())) .thenReturn((SchemaKStream) sinkStreamWithKeySelected); - when(ksqlStreamBuilder.buildValueSerde(any(), any(), any())).thenReturn(rowSerde); when(ksqlStreamBuilder.buildNodeContext(any())).thenAnswer(inv -> new QueryContext.Stacker(QUERY_ID) .push(inv.getArgument(0).toString())); @@ -334,27 +321,7 @@ public void shouldBuildOutputNodeForInsertIntoAvroFromNonAvro() { outputNode.buildStream(ksqlStreamBuilder); // Then: - verify(ksqlStreamBuilder).buildValueSerde( - eq(valueFormat.getFormatInfo()), - any(), - any() - ); - } - - @Test - public void shouldBuildRowSerdeCorrectly() { - // When: - outputNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(ksqlStreamBuilder).buildValueSerde( - eq(FormatInfo.of(Format.JSON)), - eq(PhysicalSchema.from(SCHEMA, serdeOptions)), - queryContextCaptor.capture() - ); - - assertThat(QueryLoggerUtil.queryLoggerName(queryContextCaptor.getValue()), - is("output-test.0")); + verify(resultStream).into(any(), any(), eq(valueFormat), any(), any(), any()); } @Test @@ -365,12 +332,11 @@ public void shouldCallInto() { // Then: verify(resultStream).into( eq(SINK_KAFKA_TOPIC_NAME), - same(rowSerde), eq(SCHEMA), eq(JSON_FORMAT), eq(SerdeOption.none()), - eq(ImmutableSet.of()), - stackerCaptor.capture() + stackerCaptor.capture(), + same(ksqlStreamBuilder) ); assertThat( stackerCaptor.getValue().getQueryContext().getContext(), @@ -379,57 +345,6 @@ public void shouldCallInto() { assertThat(result, sameInstance(sinkStream)); } - @Test - public void shouldCallIntoWithIndexesToRemoveImplicitsAndRowKey() { - // Given: - final LogicalSchema schema = SCHEMA.withMetaAndKeyColsInValue(); - givenNodeWithSchema(schema); - - // When: - outputNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(resultStream).into( - eq(SINK_KAFKA_TOPIC_NAME), - same(rowSerde), - any(), - any(), - any(), - eq(ImmutableSet.of(0, 1)), - any() - ); - } - - @Test - public void shouldCallIntoWithIndexesToRemoveImplicitsAndRowKeyRegardlessOfLocation() { - // Given: - final LogicalSchema schema = LogicalSchema.builder() - .valueColumn("field1", SqlTypes.STRING) - .valueColumn("field2", SqlTypes.STRING) - .valueColumn("ROWKEY", SqlTypes.STRING) - .valueColumn("field3", SqlTypes.STRING) - .valueColumn("timestamp", SqlTypes.BIGINT) - .valueColumn("ROWTIME", SqlTypes.BIGINT) - .valueColumn("key", SqlTypes.STRING) - .build(); - - givenNodeWithSchema(schema); - - // 
When: - outputNode.buildStream(ksqlStreamBuilder); - - // Then: - verify(resultStream).into( - eq(SINK_KAFKA_TOPIC_NAME), - same(rowSerde), - any(), - any(), - any(), - eq(ImmutableSet.of(2, 5)), - any() - ); - } - private void givenInsertIntoNode() { this.createInto = false; buildNode(); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java index 09167d8e7b72..6224726c9ab1 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java @@ -15,21 +15,23 @@ package io.confluent.ksql.execution.plan; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import java.util.Collections; import java.util.List; import java.util.Objects; +import org.apache.kafka.streams.kstream.KStream; @Immutable -public class StreamSink implements ExecutionStep { +public class StreamSink implements ExecutionStep> { private final ExecutionStepProperties properties; - private final ExecutionStep source; + private final ExecutionStep> source; private final Formats formats; private final String topicName; public StreamSink( final ExecutionStepProperties properties, - final ExecutionStep source, + final ExecutionStep> source, final Formats formats, final String topicName) { this.properties = Objects.requireNonNull(properties, "properties"); @@ -52,11 +54,16 @@ public List> getSources() { return Collections.singletonList(source); } + public Formats getFormats() { + return formats; + } + @Override - public S build(final KsqlQueryBuilder streamsBuilder) { + public KStream build(final KsqlQueryBuilder streamsBuilder) { throw new UnsupportedOperationException(); } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java index 2e4f18e8ae8c..5d0972dda8d6 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java @@ -15,21 +15,23 @@ package io.confluent.ksql.execution.plan; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import java.util.Collections; import java.util.List; import java.util.Objects; +import org.apache.kafka.streams.kstream.KTable; @Immutable -public class TableSink implements ExecutionStep { +public class TableSink implements ExecutionStep> { private final ExecutionStepProperties properties; - private final ExecutionStep source; + private final ExecutionStep> source; private final Formats formats; private final String topicName; public TableSink( final ExecutionStepProperties properties, - final ExecutionStep source, + final ExecutionStep> source, final Formats formats, final String topicName ) { @@ -53,8 +55,12 @@ public List> getSources() { return Collections.singletonList(source); } + public Formats getFormats() { + return formats; + } + @Override - public T build(final KsqlQueryBuilder builder) { + public KTable build(final KsqlQueryBuilder builder) { throw new UnsupportedOperationException(); } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/SinkSchemaUtil.java 
b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/SinkSchemaUtil.java
new file mode 100644
index 000000000000..dbd6946f4513
--- /dev/null
+++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/SinkSchemaUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.util;
+
+import com.google.common.collect.Streams;
+import io.confluent.ksql.execution.plan.ExecutionStep;
+import io.confluent.ksql.schema.ksql.Column;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.connect.data.ConnectSchema;
+
+public final class SinkSchemaUtil {
+  private SinkSchemaUtil() {
+  }
+
+  public static LogicalSchema sinkSchema(final ExecutionStep<?> step) {
+    final LogicalSchema schema = step.getSources().get(0).getProperties().getSchema();
+    return schema.withoutMetaAndKeyColsInValue();
+  }
+
+  public static Set<Integer> implicitAndKeyColumnIndexesInValueSchema(
+      final ExecutionStep<?> step
+  ) {
+    final LogicalSchema schema = step.getSources().get(0).getProperties().getSchema();
+    final ConnectSchema valueSchema = schema.valueConnectSchema();
+
+    final Stream<Column> cols = Streams.concat(
+        schema.metadata().stream(),
+        schema.key().stream()
+    );
+
+    return cols
+        .map(Column::name)
+        .map(valueSchema::field)
+        .filter(Objects::nonNull)
+        .map(org.apache.kafka.connect.data.Field::index)
+        .collect(Collectors.toSet());
+  }
+}
diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/SinkSchemaUtilTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/SinkSchemaUtilTest.java
new file mode 100644
index 000000000000..2ac41e54b4ef
--- /dev/null
+++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/SinkSchemaUtilTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */ + +package io.confluent.ksql.execution.util; + +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import java.util.Set; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class SinkSchemaUtilTest { + @Mock + private ExecutionStep step; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private void givenStepWithSchema(final LogicalSchema schema) { + when(step.getSources()).thenReturn(ImmutableList.of(step)); + when(step.getProperties()).thenReturn( + new DefaultExecutionStepProperties(schema, mock(QueryContext.class)) + ); + } + + @Test + public void shouldComputeIndexesToRemoveImplicitsAndRowKey() { + // Given: + givenStepWithSchema(LogicalSchema.builder() + .valueColumn("field1", SqlTypes.STRING) + .valueColumn("field2", SqlTypes.STRING) + .valueColumn("field3", SqlTypes.STRING) + .valueColumn("timestamp", SqlTypes.BIGINT) + .valueColumn("key", SqlTypes.STRING) + .build() + .withMetaAndKeyColsInValue() + ); + + // When: + final Set indices = SinkSchemaUtil.implicitAndKeyColumnIndexesInValueSchema(step); + + // Then: + assertThat(indices, contains(0, 1)); + } + + @Test + public void shouldComputeIndexesToRemoveImplicitsAndRowKeyRegardlessOfLocation() { + // Given: + givenStepWithSchema(LogicalSchema.builder() + .valueColumn("field1", SqlTypes.STRING) + .valueColumn("field2", SqlTypes.STRING) + .valueColumn("ROWKEY", SqlTypes.STRING) + .valueColumn("field3", SqlTypes.STRING) + .valueColumn("timestamp", SqlTypes.BIGINT) + .valueColumn("ROWTIME", SqlTypes.BIGINT) + .valueColumn("key", SqlTypes.STRING) + .build() + ); + + // When: + final Set indices = SinkSchemaUtil.implicitAndKeyColumnIndexesInValueSchema(step); + + // Then: + assertThat(indices, contains(2, 5)); + } +} \ No newline at end of file diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 9895350f76c9..9f1e3ca89d5d 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -123,7 +123,7 @@ public static StreamToTable, KTable> s ); } - public static StreamSink> streamSink( + public static StreamSink streamSink( final QueryContext.Stacker stacker, final LogicalSchema outputSchema, final Formats formats, @@ -235,7 +235,7 @@ public static StreamSelectKey streamSelectKey( ); } - public static TableSink> tableSink( + public static TableSink tableSink( final QueryContext.Stacker stacker, final LogicalSchema outputSchema, final ExecutionStep> source, diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KeySerdeFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KeySerdeFactory.java new file mode 100644 index 000000000000..722dca64ce8a --- /dev/null +++ 
b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KeySerdeFactory.java
new file mode 100644
index 000000000000..722dca64ce8a
--- /dev/null
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/KeySerdeFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License; you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams;
+
+import io.confluent.ksql.execution.context.QueryContext;
+import io.confluent.ksql.schema.ksql.PhysicalSchema;
+import io.confluent.ksql.serde.KeyFormat;
+import io.confluent.ksql.serde.KeySerde;
+
+public interface KeySerdeFactory<K> {
+  KeySerde<K> buildKeySerde(
+      KeyFormat keyFormat,
+      PhysicalSchema physicalSchema,
+      QueryContext queryContext
+  );
+}
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSinkBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSinkBuilder.java
new file mode 100644
index 000000000000..c6fcb6c811b8
--- /dev/null
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSinkBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams;
+
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
+import io.confluent.ksql.execution.context.QueryContext;
+import io.confluent.ksql.execution.plan.Formats;
+import io.confluent.ksql.execution.plan.StreamSink;
+import io.confluent.ksql.execution.util.SinkSchemaUtil;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.PhysicalSchema;
+import io.confluent.ksql.serde.KeySerde;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+
+public final class StreamSinkBuilder {
+  private StreamSinkBuilder() {
+  }
+
+  public static <K> void build(
+      final KStream<K, GenericRow> kstream,
+      final StreamSink<K> streamSink,
+      final KeySerdeFactory<K> keySerdeFactory,
+      final KsqlQueryBuilder queryBuilder) {
+    final QueryContext queryContext = streamSink.getProperties().getQueryContext();
+    final LogicalSchema schema = SinkSchemaUtil.sinkSchema(streamSink);
+    final Formats formats = streamSink.getFormats();
+    final PhysicalSchema physicalSchema = PhysicalSchema.from(schema, formats.getOptions());
+    final KeySerde<K> keySerde = keySerdeFactory.buildKeySerde(
+        formats.getKeyFormat(),
+        physicalSchema,
+        queryContext
+    );
+    final Serde<GenericRow> valueSerde = queryBuilder.buildValueSerde(
+        formats.getValueFormat().getFormatInfo(),
+        physicalSchema,
+        queryContext
+    );
+    final Set<Integer> rowkeyIndexes =
+        SinkSchemaUtil.implicitAndKeyColumnIndexesInValueSchema(streamSink);
+    final String kafkaTopicName = streamSink.getTopicName();
+    kstream
+        .mapValues(row -> {
+          if (row == null) {
+            return null;
+          }
+          final List<Object> columns = new ArrayList<>();
+          for (int i = 0; i < row.getColumns().size(); i++) {
+            if (!rowkeyIndexes.contains(i)) {
+              columns.add(row.getColumns().get(i));
+            }
+          }
+          return new GenericRow(columns);
+        }).to(kafkaTopicName, Produced.with(keySerde, valueSerde));
+  }
+}
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSinkBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSinkBuilder.java
new file mode 100644
index 000000000000..e7884b489d79
--- /dev/null
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableSinkBuilder.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.execution.streams;
+
+import io.confluent.ksql.GenericRow;
+import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
+import io.confluent.ksql.execution.context.QueryContext;
+import io.confluent.ksql.execution.plan.Formats;
+import io.confluent.ksql.execution.plan.TableSink;
+import io.confluent.ksql.execution.util.SinkSchemaUtil;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.PhysicalSchema;
+import io.confluent.ksql.serde.KeySerde;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+
+public final class TableSinkBuilder {
+  private TableSinkBuilder() {
+  }
+
+  public static <K> void build(
+      final KTable<K, GenericRow> ktable,
+      final TableSink<K> tableSink,
+      final KeySerdeFactory<K> keySerdeFactory,
+      final KsqlQueryBuilder queryBuilder) {
+    final QueryContext queryContext = tableSink.getProperties().getQueryContext();
+    final LogicalSchema schema = SinkSchemaUtil.sinkSchema(tableSink);
+    final Formats formats = tableSink.getFormats();
+    final PhysicalSchema physicalSchema = PhysicalSchema.from(schema, formats.getOptions());
+    final KeySerde<K> keySerde = keySerdeFactory.buildKeySerde(
+        formats.getKeyFormat(),
+        physicalSchema,
+        queryContext
+    );
+    final Serde<GenericRow> valueSerde = queryBuilder.buildValueSerde(
+        formats.getValueFormat().getFormatInfo(),
+        physicalSchema,
+        queryContext
+    );
+    final Set<Integer> rowkeyIndexes =
+        SinkSchemaUtil.implicitAndKeyColumnIndexesInValueSchema(tableSink);
+    final String kafkaTopicName = tableSink.getTopicName();
+    ktable.toStream()
+        .mapValues(row -> {
+          if (row == null) {
+            return null;
+          }
+          final List<Object> columns = new ArrayList<>();
+          for (int i = 0; i < row.getColumns().size(); i++) {
+            if (!rowkeyIndexes.contains(i)) {
+              columns.add(row.getColumns().get(i));
+            }
+          }
+          return new GenericRow(columns);
+        }
+        ).to(kafkaTopicName, Produced.with(keySerde, valueSerde));
+  }
+}
\ No newline at end of file
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
new file mode 100644
index 000000000000..ceb2557005aa
--- /dev/null
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.Formats; +import io.confluent.ksql.execution.plan.StreamSink; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.KeySerde; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.ValueFormat; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class StreamSinkBuilderTest { + private static final String TOPIC = "TOPIC"; + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.BIGINT) + .valueColumn("GREEN", SqlTypes.STRING) + .build() + .withMetaAndKeyColsInValue(); + private static final PhysicalSchema PHYSICAL_SCHEMA = + PhysicalSchema.from(SCHEMA.withoutMetaAndKeyColsInValue(), SerdeOption.none()); + private static final KeyFormat KEY_FORMAT = KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)); + private static final ValueFormat VALUE_FORMAT = ValueFormat.of(FormatInfo.of(Format.JSON)); + + @Mock + private KsqlQueryBuilder queryBuilder; + @Mock + private KeySerdeFactory keySerdeFactory; + @Mock + private KStream stream; + @Mock + private ExecutionStep> source; + @Mock + private KeySerde keySerde; + @Mock + private Serde valSerde; + @Captor + private ArgumentCaptor> mapperCaptor; + + private final QueryContext queryContext = + new QueryContext.Stacker(new QueryId("qid")).push("sink").getQueryContext(); + + private StreamSink sink; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + when(keySerdeFactory.buildKeySerde(any(), any(), any())).thenReturn(keySerde); + when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valSerde); + when(stream.mapValues(any(ValueMapper.class))).thenReturn(stream); + when(source.getProperties()).thenReturn( + new DefaultExecutionStepProperties(SCHEMA, mock(QueryContext.class)) + ); + sink = new 
StreamSink<>( + new DefaultExecutionStepProperties(SCHEMA, queryContext), + source, + Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), + TOPIC + ); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldWriteOutStream() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + final InOrder inOrder = Mockito.inOrder(stream); + inOrder.verify(stream).mapValues(any(ValueMapper.class)); + inOrder.verify(stream).to(anyString(), any()); + verifyNoMoreInteractions(stream); + } + + @Test + public void shouldWriteOutStreamToCorrectTopic() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).to(eq(TOPIC), any()); + } + + @Test + public void shouldBuildKeySerdeCorrectly() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(keySerdeFactory).buildKeySerde(KEY_FORMAT, PHYSICAL_SCHEMA, queryContext); + } + + @Test + public void shouldBuildValueSerdeCorrectly() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(queryBuilder).buildValueSerde( + VALUE_FORMAT.getFormatInfo(), + PHYSICAL_SCHEMA, + queryContext + ); + } + + @Test + public void shouldWriteOutStreamWithCorrectSerdes() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).to(anyString(), eq(Produced.with(keySerde, valSerde))); + } + + @Test + public void shouldRemoveKeyAndTimeFieldsFromValue() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).mapValues(mapperCaptor.capture()); + final ValueMapper mapper = mapperCaptor.getValue(); + assertThat( + mapper.apply(new GenericRow(123, "456", 789, "101112")), + equalTo(new GenericRow(789, "101112")) + ); + } + + @Test + public void shouldIgnoreNullRowsWhenRemovingKeyAndTimeFieldsFromValue() { + // When: + StreamSinkBuilder.build(stream, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).mapValues(mapperCaptor.capture()); + final ValueMapper mapper = mapperCaptor.getValue(); + assertThat(mapper.apply(null), is(nullValue())); + } +} \ No newline at end of file diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java new file mode 100644 index 000000000000..63879fc43e84 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java @@ -0,0 +1,197 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.execution.streams; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.Formats; +import io.confluent.ksql.execution.plan.TableSink; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; +import io.confluent.ksql.serde.KeySerde; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.ValueFormat; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class TableSinkBuilderTest { + private static final String TOPIC = "TOPIC"; + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("BLUE", SqlTypes.BIGINT) + .valueColumn("GREEN", SqlTypes.STRING) + .build() + .withMetaAndKeyColsInValue(); + private static final PhysicalSchema PHYSICAL_SCHEMA = + PhysicalSchema.from(SCHEMA.withoutMetaAndKeyColsInValue(), SerdeOption.none()); + private static final KeyFormat KEY_FORMAT = KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)); + private static final ValueFormat VALUE_FORMAT = ValueFormat.of(FormatInfo.of(Format.JSON)); + + @Mock + private KsqlQueryBuilder queryBuilder; + @Mock + private KeySerdeFactory keySerdeFactory; + @Mock + private KTable table; + @Mock + private KStream stream; + @Mock + private ExecutionStep> source; + @Mock + private KeySerde keySerde; + @Mock + private Serde valSerde; + @Captor + private ArgumentCaptor> mapperCaptor; + + private final QueryContext queryContext = + new QueryContext.Stacker(new QueryId("qid")).push("sink").getQueryContext(); + + private TableSink sink; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + when(keySerdeFactory.buildKeySerde(any(), any(), any())).thenReturn(keySerde); + when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valSerde); + when(table.toStream()).thenReturn(stream); + when(stream.mapValues(any(ValueMapper.class))).thenReturn(stream); + 
when(source.getProperties()).thenReturn( + new DefaultExecutionStepProperties(SCHEMA, mock(QueryContext.class)) + ); + sink = new TableSink<>( + new DefaultExecutionStepProperties(SCHEMA, queryContext), + source, + Formats.of(KEY_FORMAT, VALUE_FORMAT, SerdeOption.none()), + TOPIC + ); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldWriteOutTable() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + final InOrder inOrder = Mockito.inOrder(table, stream); + inOrder.verify(table).toStream(); + inOrder.verify(stream).mapValues(any(ValueMapper.class)); + inOrder.verify(stream).to(anyString(), any()); + verifyNoMoreInteractions(stream); + } + + @Test + public void shouldWriteOutTableToCorrectTopic() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).to(eq(TOPIC), any()); + } + + @Test + public void shouldBuildKeySerdeCorrectly() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(keySerdeFactory).buildKeySerde(KEY_FORMAT, PHYSICAL_SCHEMA, queryContext); + } + + @Test + public void shouldBuildValueSerdeCorrectly() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(queryBuilder).buildValueSerde( + VALUE_FORMAT.getFormatInfo(), + PHYSICAL_SCHEMA, + queryContext + ); + } + + @Test + public void shouldWriteOutTableWithCorrectSerdes() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).to(anyString(), eq(Produced.with(keySerde, valSerde))); + } + + @Test + public void shouldRemoveKeyAndTimeFieldsFromValue() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).mapValues(mapperCaptor.capture()); + final ValueMapper mapper = mapperCaptor.getValue(); + assertThat( + mapper.apply(new GenericRow(123, "456", 789, "101112")), + equalTo(new GenericRow(789, "101112")) + ); + } + + @Test + public void shouldIgnoreNullRowsWhenRemovingKeyAndTimeFieldsFromValue() { + // When: + TableSinkBuilder.build(table, sink, keySerdeFactory, queryBuilder); + + // Then: + verify(stream).mapValues(mapperCaptor.capture()); + final ValueMapper mapper = mapperCaptor.getValue(); + assertThat(mapper.apply(null), is(nullValue())); + } +} \ No newline at end of file From c7773232a43135f52f6e29927a97ce48624ab77e Mon Sep 17 00:00:00 2001 From: rodesai Date: Thu, 19 Sep 2019 12:48:12 -0700 Subject: [PATCH 2/3] feedback --- .../execution/streams/StreamSinkBuilderTest.java | 15 +++++---------- .../execution/streams/TableSinkBuilderTest.java | 9 +++------ 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java index ceb2557005aa..a5fcca2f6ef4 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java @@ -34,7 +34,6 @@ import io.confluent.ksql.execution.plan.ExecutionStep; import io.confluent.ksql.execution.plan.Formats; import io.confluent.ksql.execution.plan.StreamSink; -import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ 
-50,16 +49,16 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class StreamSinkBuilderTest { private static final String TOPIC = "TOPIC"; private static final LogicalSchema SCHEMA = LogicalSchema.builder() @@ -86,15 +85,11 @@ public class StreamSinkBuilderTest { private Serde valSerde; @Captor private ArgumentCaptor> mapperCaptor; - - private final QueryContext queryContext = - new QueryContext.Stacker(new QueryId("qid")).push("sink").getQueryContext(); + @Mock + private QueryContext queryContext; private StreamSink sink; - @Rule - public final MockitoRule mockitoRule = MockitoJUnit.rule(); - @Before @SuppressWarnings("unchecked") public void setup() { diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java index 63879fc43e84..0e5ee719308b 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableSinkBuilderTest.java @@ -51,16 +51,16 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class TableSinkBuilderTest { private static final String TOPIC = "TOPIC"; private static final LogicalSchema SCHEMA = LogicalSchema.builder() @@ -95,9 +95,6 @@ public class TableSinkBuilderTest { private TableSink sink; - @Rule - public final MockitoRule mockitoRule = MockitoJUnit.rule(); - @Before @SuppressWarnings("unchecked") public void setup() { From cc60b19fb24eb754222cc7570d772b561d42b152 Mon Sep 17 00:00:00 2001 From: rodesai Date: Thu, 19 Sep 2019 13:00:52 -0700 Subject: [PATCH 3/3] checkstyle --- .../src/main/java/io/confluent/ksql/structured/SchemaKTable.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index a9832894499e..7dac67d34d31 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -51,7 +51,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable;
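
Editor's usage sketch (not part of the patch series): the SchemaKStream.into hunk
in patch 1 shows the calling convention the new sink builders expect -- the plan
node describes the sink as a StreamSink execution step, then hands it to
StreamSinkBuilder together with a KeySerdeFactory. The fragment below restates
that wiring in one place. It is a sketch only: it assumes the KSQL classes
introduced above are on the classpath, that it sits inside a method generic over
the key type K (as SchemaKStream.into does), and that contextStacker,
outputSchema, keyFormat, valueFormat, options, sourceStep, kafkaTopicName,
kstream, keySerde, and queryBuilder are supplied by the calling node.

    // Describe the sink as an execution plan step for the output topic.
    final StreamSink<K> step = ExecutionStepFactory.streamSink(
        contextStacker,
        outputSchema,
        Formats.of(keyFormat, valueFormat, options),
        sourceStep,
        kafkaTopicName
    );
    // Wire the Kafka Streams topology for the sink. KeySerdeFactory is a
    // functional interface, so the caller's existing key serde can be
    // supplied as a lambda, exactly as SchemaKStream.into does above.
    StreamSinkBuilder.build(
        kstream,
        step,
        (fmt, schema, ctx) -> keySerde,
        queryBuilder
    );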