From f312fcc213ba393c0be5584a02793c191028adbf Mon Sep 17 00:00:00 2001 From: Rohan Date: Wed, 18 Sep 2019 13:09:35 -0700 Subject: [PATCH] feat: move selectKey impl to plan builder (#3362) Moves rekeying from SchemaKStream and into an execution plan builder. --- .../ksql/structured/GroupByMapper.java | 1 + .../ksql/structured/SchemaKStream.java | 24 +- .../ksql/structured/SchemaKTable.java | 1 + .../ksql/structured/GroupByMapperTest.java | 1 + .../ksql/structured/SchemaKTableTest.java | 1 + .../ksql/execution/plan/StreamSelectKey.java | 19 +- .../ksql/execution/util}/StructKeyUtil.java | 8 +- .../streams/ExecutionStepFactory.java | 3 +- .../streams/StreamSelectKeyBuilder.java | 70 +++++ .../streams/StreamSelectKeyBuilderTest.java | 241 ++++++++++++++++++ 10 files changed, 342 insertions(+), 27 deletions(-) rename {ksql-engine/src/main/java/io/confluent/ksql/structured => ksql-execution/src/main/java/io/confluent/ksql/execution/util}/StructKeyUtil.java (86%) create mode 100644 ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java create mode 100644 ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/GroupByMapper.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/GroupByMapper.java index c2d8e5e25fe9..8b89ac745589 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/GroupByMapper.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/GroupByMapper.java @@ -19,6 +19,7 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.codegen.ExpressionMetadata; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.util.StructKeyUtil; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 37d03456e82a..c06944acaae8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -38,14 +38,17 @@ import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.execution.plan.StreamFilter; import io.confluent.ksql.execution.plan.StreamMapValues; +import io.confluent.ksql.execution.plan.StreamSelectKey; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.execution.plan.StreamToTable; import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.StreamFilterBuilder; import io.confluent.ksql.execution.streams.StreamMapValuesBuilder; +import io.confluent.ksql.execution.streams.StreamSelectKeyBuilder; import io.confluent.ksql.execution.streams.StreamSourceBuilder; import io.confluent.ksql.execution.streams.StreamToTableBuilder; import io.confluent.ksql.execution.streams.StreamsUtil; +import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.KeyField; @@ -697,34 +700,19 @@ public SchemaKStream selectKey( ); } - final int keyIndexInValue = getSchema().valueColumnIndex(proposedKey.fullName()) - .orElseThrow(IllegalStateException::new); - - final KStream keyedKStream = kstream - .filter((key, value) -> value != null && extractColumn(keyIndexInValue, value) != null) - .selectKey((key, value) -> - StructKeyUtil.asStructKey(extractColumn(keyIndexInValue, value).toString())) - .mapValues((key, row) -> { - if (updateRowKey) { - final Object rowKey = key.get(key.schema().fields().get(0)); - row.getColumns().set(SchemaUtil.ROWKEY_INDEX, rowKey); - } - return row; - }); - final KeyField newKeyField = getSchema().isMetaColumn(fieldName) ? resultantKeyField.withName(Optional.empty()) : resultantKeyField; final KeySerde selectKeySerde = keySerde.rebind(StructKeyUtil.ROWKEY_SERIALIZED_SCHEMA); - final ExecutionStep> step = ExecutionStepFactory.streamSelectKey( + final StreamSelectKey step = ExecutionStepFactory.streamSelectKey( contextStacker, sourceStep, fieldName, updateRowKey ); - return (SchemaKStream) new SchemaKStream( - keyedKStream, + return new SchemaKStream<>( + StreamSelectKeyBuilder.build(kstream, step), step, keyFormat, selectKeySerde, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java index 67b4e6951176..6ee7d2763760 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKTable.java @@ -30,6 +30,7 @@ import io.confluent.ksql.execution.streams.StreamsUtil; import io.confluent.ksql.execution.streams.TableFilterBuilder; import io.confluent.ksql.execution.streams.TableMapValuesBuilder; +import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KeyField.LegacyField; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/GroupByMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/GroupByMapperTest.java index c8bdcca1f3dc..def4018422ae 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/GroupByMapperTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/GroupByMapperTest.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.QualifiedName; import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; +import io.confluent.ksql.execution.util.StructKeyUtil; import java.util.Collections; import org.apache.kafka.connect.data.Struct; import org.easymock.EasyMock; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index 17568bb9d9c6..e99e0d9d217a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -54,6 +54,7 @@ import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.execution.streams.MaterializedFactory; import io.confluent.ksql.execution.streams.StreamsUtil; +import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java index fb87dd91a581..cb11fa19739b 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSelectKey.java @@ -15,21 +15,24 @@ package io.confluent.ksql.execution.plan; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import java.util.Collections; import java.util.List; import java.util.Objects; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; @Immutable -public class StreamSelectKey implements ExecutionStep { +public class StreamSelectKey implements ExecutionStep> { private final ExecutionStepProperties properties; - private final ExecutionStep source; + private final ExecutionStep> source; private final String fieldName; private final boolean updateRowKey; public StreamSelectKey( final ExecutionStepProperties properties, - final ExecutionStep source, + final ExecutionStep> source, final String fieldName, final boolean updateRowKey) { this.properties = Objects.requireNonNull(properties, "properties"); @@ -48,8 +51,16 @@ public List> getSources() { return Collections.singletonList(source); } + public boolean isUpdateRowKey() { + return updateRowKey; + } + + public String getFieldName() { + return fieldName; + } + @Override - public S build(final KsqlQueryBuilder streamsBuilder) { + public KStream build(final KsqlQueryBuilder streamsBuilder) { throw new UnsupportedOperationException(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/StructKeyUtil.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java similarity index 86% rename from ksql-engine/src/main/java/io/confluent/ksql/structured/StructKeyUtil.java rename to ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java index 4d23b1d2dc24..100dd68c4a17 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/StructKeyUtil.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/StructKeyUtil.java @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.structured; +package io.confluent.ksql.execution.util; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.SchemaUtil; @@ -25,7 +25,7 @@ /** * Helper for dealing with Struct keys. */ -final class StructKeyUtil { +public final class StructKeyUtil { private static final Schema ROWKEY_STRUCT_SCHEMA = SchemaBuilder .struct() @@ -35,7 +35,7 @@ final class StructKeyUtil { private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD = ROWKEY_STRUCT_SCHEMA.fields().get(0); - static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from( + public static final PersistenceSchema ROWKEY_SERIALIZED_SCHEMA = PersistenceSchema.from( (ConnectSchema) ROWKEY_STRUCT_SCHEMA, false ); @@ -43,7 +43,7 @@ final class StructKeyUtil { private StructKeyUtil() { } - static Struct asStructKey(final String rowKey) { + public static Struct asStructKey(final String rowKey) { final Struct keyStruct = new Struct(ROWKEY_STRUCT_SCHEMA); keyStruct.put(ROWKEY_FIELD, rowKey); return keyStruct; diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java index 9d7d01d3845a..aa689dfd3c25 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/ExecutionStepFactory.java @@ -215,7 +215,8 @@ public static StreamStreamJoin> streamStreamJoin( ); } - public static StreamSelectKey> streamSelectKey( + @SuppressWarnings("unchecked") + public static StreamSelectKey streamSelectKey( final QueryContext.Stacker stacker, final ExecutionStep> source, final String fieldName, diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java new file mode 100644 index 000000000000..0d98604d026e --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilder.java @@ -0,0 +1,70 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.execution.util.StructKeyUtil; +import io.confluent.ksql.schema.ksql.Column; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.SchemaUtil; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; + +public final class StreamSelectKeyBuilder { + private StreamSelectKeyBuilder() { + } + + public static KStream build( + final KStream kstream, + final StreamSelectKey selectKey) { + final LogicalSchema sourceSchema = selectKey.getSources().get(0).getProperties().getSchema(); + final Column keyColumn = sourceSchema.findValueColumn(selectKey.getFieldName()) + .orElseThrow(IllegalArgumentException::new); + final int keyIndexInValue = sourceSchema.valueColumnIndex(keyColumn.fullName()) + .orElseThrow(IllegalStateException::new); + final boolean updateRowKey = selectKey.isUpdateRowKey(); + return kstream + .filter((key, value) -> + value != null && extractColumn(sourceSchema, keyIndexInValue, value) != null + ).selectKey((key, value) -> + StructKeyUtil.asStructKey( + extractColumn(sourceSchema, keyIndexInValue, value).toString() + ) + ).mapValues((key, row) -> { + if (updateRowKey) { + final Object rowKey = key.get(key.schema().fields().get(0)); + row.getColumns().set(SchemaUtil.ROWKEY_INDEX, rowKey); + } + return row; + }); + } + + private static Object extractColumn( + final LogicalSchema schema, + final int keyIndexInValue, + final GenericRow value + ) { + if (value.getColumns().size() != schema.value().size()) { + throw new IllegalStateException("Field count mismatch. " + + "Schema fields: " + schema + + ", row:" + value); + } + return value + .getColumns() + .get(keyIndexInValue); + } +} diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java new file mode 100644 index 000000000000..f9b292d4afb8 --- /dev/null +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java @@ -0,0 +1,241 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static io.confluent.ksql.execution.util.StructKeyUtil.asStructKey; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.plan.DefaultExecutionStepProperties; +import io.confluent.ksql.execution.plan.ExecutionStep; +import io.confluent.ksql.execution.plan.ExecutionStepProperties; +import io.confluent.ksql.execution.plan.StreamSelectKey; +import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.ValueMapperWithKey; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class StreamSelectKeyBuilderTest { + private static final String ALIAS = "ATL"; + private static final LogicalSchema SCHEMA = LogicalSchema.builder() + .valueColumn("BIG", SqlTypes.BIGINT) + .valueColumn("BOI", SqlTypes.STRING) + .build() + .withAlias(ALIAS) + .withMetaAndKeyColsInValue(); + private static final String KEY = "ATL.BOI"; + + @Mock + private KStream kstream; + @Mock + private KStream rekeyedKstream; + @Mock + private KStream filteredKStream; + @Mock + private KStream updatedKeyKStream; + @Mock + private ExecutionStep> sourceStep; + @Captor + private ArgumentCaptor> predicateCaptor; + @Captor + private ArgumentCaptor> keyValueMapperCaptor; + @Captor + private ArgumentCaptor> mapperCaptor; + + private final QueryContext queryContext = + new QueryContext.Stacker(new QueryId("hey")).push("ya").getQueryContext(); + private final ExecutionStepProperties properties = new DefaultExecutionStepProperties( + SCHEMA, + queryContext + ); + + private StreamSelectKey selectKey; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Before + @SuppressWarnings("unchecked") + public void init() { + when(sourceStep.getProperties()).thenReturn(properties); + when(kstream.filter(any())).thenReturn(filteredKStream); + when(filteredKStream.selectKey(any(KeyValueMapper.class))).thenReturn(rekeyedKstream); + when(rekeyedKstream.mapValues(any(ValueMapperWithKey.class))).thenReturn(updatedKeyKStream); + givenUpdateRowkey(); + } + + private void givenUpdateRowkey() { + selectKey = new StreamSelectKey<>( + properties, + sourceStep, + KEY, + true + ); + } + + private void givenUpdateRowkeyFalse() { + selectKey = new StreamSelectKey<>( + properties, + sourceStep, + KEY, + false + ); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldRekeyCorrectly() { + // When: + final KStream result = StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + final InOrder inOrder = Mockito.inOrder(kstream, filteredKStream, rekeyedKstream); + inOrder.verify(kstream).filter(any()); + inOrder.verify(filteredKStream).selectKey(any()); + inOrder.verify(rekeyedKstream).mapValues(any(ValueMapperWithKey.class)); + inOrder.verifyNoMoreInteractions(); + assertThat(result, is(updatedKeyKStream)); + } + + @Test + public void shouldFilterOutNullValues() { + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat(predicate.test(asStructKey("dre"), null), is(false)); + } + + @Test + public void shouldFilterOutNullKeyColumns() { + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat( + predicate.test(asStructKey("dre"), new GenericRow(0, "dre", 3000, null)), + is(false) + ); + } + + @Test + public void shouldNotFilterOutNonNullKeyColumns() { + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat( + predicate.test(asStructKey("dre"), new GenericRow(0, "dre", 3000, "bob")), + is(true) + ); + } + + @Test + public void shouldIgnoreNullNonKeyColumns() { + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + verify(kstream).filter(predicateCaptor.capture()); + final Predicate predicate = getPredicate(); + assertThat(predicate.test(asStructKey("dre"), new GenericRow(0, "dre", null, "bob")), is(true)); + } + + @Test + public void shouldComputeCorrectKey() { + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + final KeyValueMapper keyValueMapper = getKeyMapper(); + assertThat( + keyValueMapper.apply(asStructKey("dre"), new GenericRow(0, "dre", 3000, "bob")), + is(asStructKey("bob")) + ); + } + + @Test + public void shouldUpdateRowkeyIfUpdateRowkeyTrue() { + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + verify(rekeyedKstream).mapValues(mapperCaptor.capture()); + final ValueMapperWithKey mapper = getMapper(); + assertThat( + mapper.apply(asStructKey("bob"), new GenericRow(0, "dre", 3000, "bob")), + equalTo(new GenericRow(0, "bob", 3000, "bob")) + ); + } + + @Test + public void shouldNotUpdateRowkeyIfUpdateRowkeyTrue() { + // Given: + givenUpdateRowkeyFalse(); + + // When: + StreamSelectKeyBuilder.build(kstream, selectKey); + + // Then: + final ValueMapperWithKey mapper = getMapper(); + assertThat( + mapper.apply(asStructKey("bob"), new GenericRow(0, "dre", 3000, "bob")), + equalTo(new GenericRow(0, "dre", 3000, "bob")) + ); + } + + private KeyValueMapper getKeyMapper() { + verify(filteredKStream).selectKey(keyValueMapperCaptor.capture()); + return keyValueMapperCaptor.getValue(); + } + + private ValueMapperWithKey getMapper() { + verify(rekeyedKstream).mapValues(mapperCaptor.capture()); + return mapperCaptor.getValue(); + } + + private Predicate getPredicate() { + verify(kstream).filter(predicateCaptor.capture()); + return predicateCaptor.getValue(); + } +} \ No newline at end of file